mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
Remove multiple paths from NodeEnvironment (#72599)
This commit converts the multiple paths and locks internal to NodeEnvironment into a singular data path. relates #71205
This commit is contained in:
parent
fad5e44b99
commit
b6436c51cd
2 changed files with 98 additions and 179 deletions
|
@ -64,7 +64,6 @@ import java.nio.file.StandardCopyOption;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -144,9 +143,9 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
|
||||
private final Logger logger = LogManager.getLogger(NodeEnvironment.class);
|
||||
private final NodePath[] nodePaths;
|
||||
private final NodePath nodePath;
|
||||
private final Path sharedDataPath;
|
||||
private final Lock[] locks;
|
||||
private final Lock lock;
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final Map<ShardId, InternalShardLock> shardLocks = new HashMap<>();
|
||||
|
@ -177,8 +176,8 @@ public final class NodeEnvironment implements Closeable {
|
|||
|
||||
public static class NodeLock implements Releasable {
|
||||
|
||||
private final Lock[] locks;
|
||||
private final NodePath[] nodePaths;
|
||||
private final Lock lock;
|
||||
private final NodePath nodePath;
|
||||
|
||||
|
||||
public NodeLock(final Logger logger,
|
||||
|
@ -195,18 +194,18 @@ public final class NodeEnvironment implements Closeable {
|
|||
final Environment environment,
|
||||
final CheckedFunction<Path, Boolean, IOException> pathFunction,
|
||||
final Function<Path, Path> subPathMapping) throws IOException {
|
||||
nodePaths = new NodePath[1];
|
||||
locks = new Lock[1];
|
||||
try {
|
||||
Path dataDir = environment.dataFile();
|
||||
Path dir = subPathMapping.apply(dataDir);
|
||||
if (pathFunction.apply(dir) == false) {
|
||||
lock = null;
|
||||
nodePath = null;
|
||||
return;
|
||||
}
|
||||
try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
|
||||
logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
|
||||
locks[0] = luceneDir.obtainLock(NODE_LOCK_FILENAME);
|
||||
nodePaths[0] = new NodePath(dir);
|
||||
lock = luceneDir.obtainLock(NODE_LOCK_FILENAME);
|
||||
nodePath = new NodePath(dir);
|
||||
} catch (IOException e) {
|
||||
logger.trace(() -> new ParameterizedMessage(
|
||||
"failed to obtain node lock on {}", dir.toAbsolutePath()), e);
|
||||
|
@ -221,17 +220,12 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
|
||||
public NodePath getNodePath() {
|
||||
return nodePaths[0];
|
||||
return nodePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (int i = 0; i < locks.length; i++) {
|
||||
if (locks[i] != null) {
|
||||
IOUtils.closeWhileHandlingException(locks[i]);
|
||||
}
|
||||
locks[i] = null;
|
||||
}
|
||||
IOUtils.closeWhileHandlingException(lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,10 +252,10 @@ public final class NodeEnvironment implements Closeable {
|
|||
throw new IllegalStateException(message, e);
|
||||
}
|
||||
|
||||
this.locks = nodeLock.locks;
|
||||
this.nodePaths = nodeLock.nodePaths;
|
||||
this.lock = nodeLock.lock;
|
||||
this.nodePath = nodeLock.nodePath;
|
||||
|
||||
logger.debug("using node location {}", Arrays.toString(nodePaths));
|
||||
logger.debug("using node location {}", nodePath);
|
||||
|
||||
maybeLogPathDetails();
|
||||
maybeLogHeapDetails();
|
||||
|
@ -269,7 +263,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
applySegmentInfosTrace(settings);
|
||||
assertCanWrite();
|
||||
|
||||
ensureAtomicMoveSupported(nodePaths);
|
||||
ensureAtomicMoveSupported(nodePath);
|
||||
|
||||
if (upgradeLegacyNodeFolders(logger, settings, environment, nodeLock)) {
|
||||
assertCanWrite();
|
||||
|
@ -277,13 +271,13 @@ public final class NodeEnvironment implements Closeable {
|
|||
|
||||
if (DiscoveryNode.canContainData(settings) == false) {
|
||||
if (DiscoveryNode.isMasterNode(settings) == false) {
|
||||
ensureNoIndexMetadata(nodePaths);
|
||||
ensureNoIndexMetadata(nodePath);
|
||||
}
|
||||
|
||||
ensureNoShardData(nodePaths);
|
||||
ensureNoShardData(nodePath);
|
||||
}
|
||||
|
||||
this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
|
||||
this.nodeMetadata = loadNodeMetadata(settings, logger, nodePath);
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
|
@ -430,7 +424,6 @@ public final class NodeEnvironment implements Closeable {
|
|||
if (logger.isDebugEnabled()) {
|
||||
// Log one line per path.data:
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
sb.append('\n').append(" -> ").append(nodePath.path.toAbsolutePath());
|
||||
|
||||
FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath);
|
||||
|
@ -445,28 +438,14 @@ public final class NodeEnvironment implements Closeable {
|
|||
.append("], type [")
|
||||
.append(fsPath.getType())
|
||||
.append(']');
|
||||
}
|
||||
logger.debug("node data locations details:{}", sb);
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
FsInfo.Path totFSPath = new FsInfo.Path();
|
||||
Set<String> allTypes = new HashSet<>();
|
||||
Set<String> allMounts = new HashSet<>();
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
Path path = nodePath.path.toAbsolutePath();
|
||||
FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath);
|
||||
String mount = fsPath.getMount();
|
||||
if (allMounts.contains(mount) == false) {
|
||||
allMounts.add(mount);
|
||||
String type = fsPath.getType();
|
||||
if (type != null) {
|
||||
allTypes.add(type);
|
||||
}
|
||||
totFSPath.add(fsPath);
|
||||
}
|
||||
}
|
||||
|
||||
// Just log a 1-line summary:
|
||||
logger.info("using [{}] data paths, mounts [{}], net usable_space [{}], net total_space [{}], types [{}]",
|
||||
nodePaths.length, allMounts, totFSPath.getAvailable(), totFSPath.getTotal(), toString(allTypes));
|
||||
logger.info("using data path: mount [{}], usable_space [{}], total_space [{}], type [{}]",
|
||||
fsPath.getMount(), fsPath.getAvailable(), fsPath.getTotal(), fsPath.getType());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -481,29 +460,15 @@ public final class NodeEnvironment implements Closeable {
|
|||
* scans the node paths and loads existing metadata file. If not found a new meta data will be generated
|
||||
*/
|
||||
private static NodeMetadata loadNodeMetadata(Settings settings, Logger logger,
|
||||
NodePath... nodePaths) throws IOException {
|
||||
final Path[] paths = Arrays.stream(nodePaths).map(np -> np.path).toArray(Path[]::new);
|
||||
NodeMetadata metadata = PersistedClusterStateService.nodeMetadata(paths);
|
||||
NodePath nodePath) throws IOException {
|
||||
final Path path = nodePath.path;
|
||||
NodeMetadata metadata = PersistedClusterStateService.nodeMetadata(path);
|
||||
if (metadata == null) {
|
||||
// load legacy metadata
|
||||
final Set<String> nodeIds = new HashSet<>();
|
||||
for (final Path path : paths) {
|
||||
final NodeMetadata oldStyleMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
|
||||
if (oldStyleMetadata != null) {
|
||||
nodeIds.add(oldStyleMetadata.nodeId());
|
||||
}
|
||||
}
|
||||
if (nodeIds.size() > 1) {
|
||||
throw new IllegalStateException(
|
||||
"data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds);
|
||||
}
|
||||
// load legacy metadata
|
||||
final NodeMetadata legacyMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
|
||||
final NodeMetadata legacyMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
|
||||
if (legacyMetadata == null) {
|
||||
assert nodeIds.isEmpty() : nodeIds;
|
||||
metadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
|
||||
} else {
|
||||
assert nodeIds.equals(Collections.singleton(legacyMetadata.nodeId())) : nodeIds + " doesn't match " + legacyMetadata;
|
||||
metadata = legacyMetadata;
|
||||
}
|
||||
}
|
||||
|
@ -885,7 +850,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
|
||||
public boolean hasNodeFile() {
|
||||
return nodePaths != null && locks != null;
|
||||
return nodePath != null && lock != null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -894,7 +859,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public Path nodeDataPath() {
|
||||
assertEnvIsLocked();
|
||||
return nodePaths[0].path;
|
||||
return nodePath.path;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -920,10 +885,10 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public NodePath nodePath() {
|
||||
assertEnvIsLocked();
|
||||
if (nodePaths == null || locks == null) {
|
||||
if (nodePath == null || lock == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
return nodePaths[0];
|
||||
return nodePath;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -931,11 +896,9 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public Path indexPath(Index index) {
|
||||
assertEnvIsLocked();
|
||||
return nodePaths[0].resolve(index);
|
||||
return nodePath.resolve(index);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Returns all shard paths excluding custom shard path. Note: Shards are only allocated on one of the
|
||||
* returned paths. The returned array may contain paths to non-existing directories.
|
||||
|
@ -946,7 +909,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public Path availableShardPath(ShardId shardId) {
|
||||
assertEnvIsLocked();
|
||||
return nodePaths[0].resolve(shardId);
|
||||
return nodePath.resolve(shardId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -961,15 +924,11 @@ public final class NodeEnvironment implements Closeable {
|
|||
* @param excludeIndexPathIdsPredicate folder names to exclude
|
||||
*/
|
||||
public Set<String> availableIndexFolders(Predicate<String> excludeIndexPathIdsPredicate) throws IOException {
|
||||
if (nodePaths == null || locks == null) {
|
||||
if (nodePath == null || lock == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assertEnvIsLocked();
|
||||
Set<String> indexFolders = new HashSet<>();
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
indexFolders.addAll(availableIndexFoldersForPath(nodePath, excludeIndexPathIdsPredicate));
|
||||
}
|
||||
return indexFolders;
|
||||
return availableIndexFoldersForPath(nodePath, excludeIndexPathIdsPredicate);
|
||||
|
||||
}
|
||||
|
||||
|
@ -994,7 +953,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public Set<String> availableIndexFoldersForPath(final NodePath nodePath, Predicate<String> excludeIndexPathIdsPredicate)
|
||||
throws IOException {
|
||||
if (nodePaths == null || locks == null) {
|
||||
if (nodePath == null || lock == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assertEnvIsLocked();
|
||||
|
@ -1017,11 +976,11 @@ public final class NodeEnvironment implements Closeable {
|
|||
* Resolves all existing paths to <code>indexFolderName</code> in ${data.paths}/indices
|
||||
*/
|
||||
public Path resolveIndexFolder(String indexFolderName) {
|
||||
if (nodePaths == null || locks == null) {
|
||||
if (nodePath == null || lock == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assertEnvIsLocked();
|
||||
return nodePaths[0].indicesPath.resolve(indexFolderName);
|
||||
return nodePath.indicesPath.resolve(indexFolderName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1034,44 +993,12 @@ public final class NodeEnvironment implements Closeable {
|
|||
*/
|
||||
public Set<ShardId> findAllShardIds(final Index index) throws IOException {
|
||||
assert index != null;
|
||||
if (nodePaths == null || locks == null) {
|
||||
if (nodePath == null || lock == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assertEnvIsLocked();
|
||||
final Set<ShardId> shardIds = new HashSet<>();
|
||||
final String indexUniquePathId = index.getUUID();
|
||||
for (final NodePath nodePath : nodePaths) {
|
||||
shardIds.addAll(findAllShardsForIndex(nodePath.indicesPath.resolve(indexUniquePathId), index));
|
||||
}
|
||||
return shardIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all the shards for this index, returning a map of the {@code NodePath} to the number of shards on that path
|
||||
* @param index the index by which to filter shards
|
||||
* @return a map of NodePath to count of the shards for the index on that path
|
||||
* @throws IOException if an IOException occurs
|
||||
*/
|
||||
public Map<NodePath, Long> shardCountPerPath(final Index index) throws IOException {
|
||||
assert index != null;
|
||||
if (nodePaths == null || locks == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assertEnvIsLocked();
|
||||
final Map<NodePath, Long> shardCountPerPath = new HashMap<>();
|
||||
final String indexUniquePathId = index.getUUID();
|
||||
for (final NodePath nodePath : nodePaths) {
|
||||
Path indexLocation = nodePath.indicesPath.resolve(indexUniquePathId);
|
||||
if (Files.isDirectory(indexLocation)) {
|
||||
shardCountPerPath.put(nodePath, (long) findAllShardsForIndex(indexLocation, index).size());
|
||||
}
|
||||
}
|
||||
return shardCountPerPath;
|
||||
}
|
||||
|
||||
private static Set<ShardId> findAllShardsForIndex(Path indexPath, Index index) throws IOException {
|
||||
assert indexPath.getFileName().toString().equals(index.getUUID());
|
||||
Set<ShardId> shardIds = new HashSet<>();
|
||||
final Path indexPath = nodePath.indicesPath.resolve(index.getUUID());
|
||||
if (Files.isDirectory(indexPath)) {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
|
||||
for (Path shardPath : stream) {
|
||||
|
@ -1089,8 +1016,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true) && locks != null) {
|
||||
for (Lock lock : locks) {
|
||||
if (closed.compareAndSet(false, true) && lock != null) {
|
||||
try {
|
||||
logger.trace("releasing lock [{}]", lock);
|
||||
lock.close();
|
||||
|
@ -1099,12 +1025,10 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void assertEnvIsLocked() {
|
||||
if (closed.get() == false && locks != null) {
|
||||
for (Lock lock : locks) {
|
||||
if (closed.get() == false && lock != null) {
|
||||
try {
|
||||
lock.ensureValid();
|
||||
} catch (IOException e) {
|
||||
|
@ -1113,7 +1037,6 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method tries to write an empty file and moves it using an atomic move operation.
|
||||
|
@ -1121,8 +1044,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
* not supported by the filesystem. This test is executed on each of the data directories.
|
||||
* This method cleans up all files even in the case of an error.
|
||||
*/
|
||||
private static void ensureAtomicMoveSupported(final NodePath[] nodePaths) throws IOException {
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
private static void ensureAtomicMoveSupported(final NodePath nodePath) throws IOException {
|
||||
assert Files.isDirectory(nodePath.path) : nodePath.path + " is not a directory";
|
||||
final Path src = nodePath.path.resolve(TEMP_FILE_NAME + ".tmp");
|
||||
final Path target = nodePath.path.resolve(TEMP_FILE_NAME + ".final");
|
||||
|
@ -1142,10 +1064,9 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
|
||||
List<Path> shardDataPaths = collectShardDataPaths(nodePaths);
|
||||
private void ensureNoShardData(final NodePath nodePath) throws IOException {
|
||||
List<Path> shardDataPaths = collectShardDataPaths(nodePath);
|
||||
if (shardDataPaths.isEmpty() == false) {
|
||||
final String message = String.format(
|
||||
Locale.ROOT,
|
||||
|
@ -1157,8 +1078,8 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException {
|
||||
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePaths);
|
||||
private void ensureNoIndexMetadata(final NodePath nodePath) throws IOException {
|
||||
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePath);
|
||||
if (indexMetadataPaths.isEmpty() == false) {
|
||||
final String message = String.format(
|
||||
Locale.ROOT,
|
||||
|
@ -1174,8 +1095,8 @@ public final class NodeEnvironment implements Closeable {
|
|||
/**
|
||||
* Collect the paths containing shard data in the indicated node paths. The returned paths will point to the shard data folder.
|
||||
*/
|
||||
static List<Path> collectShardDataPaths(NodePath[] nodePaths) throws IOException {
|
||||
return collectIndexSubPaths(nodePaths, NodeEnvironment::isShardPath);
|
||||
static List<Path> collectShardDataPaths(NodePath nodePath) throws IOException {
|
||||
return collectIndexSubPaths(nodePath, NodeEnvironment::isShardPath);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1183,13 +1104,12 @@ public final class NodeEnvironment implements Closeable {
|
|||
* Collect the paths containing index meta data in the indicated node paths. The returned paths will point to the
|
||||
* {@link MetadataStateFormat#STATE_DIR_NAME} folder
|
||||
*/
|
||||
static List<Path> collectIndexMetadataPaths(NodePath[] nodePaths) throws IOException {
|
||||
return collectIndexSubPaths(nodePaths, NodeEnvironment::isIndexMetadataPath);
|
||||
static List<Path> collectIndexMetadataPaths(NodePath nodePath) throws IOException {
|
||||
return collectIndexSubPaths(nodePath, NodeEnvironment::isIndexMetadataPath);
|
||||
}
|
||||
|
||||
private static List<Path> collectIndexSubPaths(NodePath[] nodePaths, Predicate<Path> subPathPredicate) throws IOException {
|
||||
private static List<Path> collectIndexSubPaths(NodePath nodePath, Predicate<Path> subPathPredicate) throws IOException {
|
||||
List<Path> indexSubPaths = new ArrayList<>();
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
Path indicesPath = nodePath.indicesPath;
|
||||
if (Files.isDirectory(indicesPath)) {
|
||||
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(indicesPath)) {
|
||||
|
@ -1204,7 +1124,6 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return indexSubPaths;
|
||||
}
|
||||
|
|
|
@ -77,10 +77,10 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
|
|||
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
|
||||
|
||||
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
|
||||
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
|
||||
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths[0]);
|
||||
|
||||
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths");
|
||||
List<Path> indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths);
|
||||
List<Path> indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths[0]);
|
||||
|
||||
Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths);
|
||||
|
||||
|
@ -116,7 +116,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand {
|
|||
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
|
||||
|
||||
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
|
||||
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
|
||||
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths[0]);
|
||||
if (shardDataPaths.isEmpty()) {
|
||||
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
|
||||
return;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue