diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 758379aae2f5..e3c90240b5e8 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -64,7 +64,6 @@ import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -144,9 +143,9 @@ public final class NodeEnvironment implements Closeable { } private final Logger logger = LogManager.getLogger(NodeEnvironment.class); - private final NodePath[] nodePaths; + private final NodePath nodePath; private final Path sharedDataPath; - private final Lock[] locks; + private final Lock lock; private final AtomicBoolean closed = new AtomicBoolean(false); private final Map shardLocks = new HashMap<>(); @@ -177,8 +176,8 @@ public final class NodeEnvironment implements Closeable { public static class NodeLock implements Releasable { - private final Lock[] locks; - private final NodePath[] nodePaths; + private final Lock lock; + private final NodePath nodePath; public NodeLock(final Logger logger, @@ -195,18 +194,18 @@ public final class NodeEnvironment implements Closeable { final Environment environment, final CheckedFunction pathFunction, final Function subPathMapping) throws IOException { - nodePaths = new NodePath[1]; - locks = new Lock[1]; try { Path dataDir = environment.dataFile(); Path dir = subPathMapping.apply(dataDir); if (pathFunction.apply(dir) == false) { + lock = null; + nodePath = null; return; } try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) { logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath()); - locks[0] = luceneDir.obtainLock(NODE_LOCK_FILENAME); - nodePaths[0] = new NodePath(dir); + lock = luceneDir.obtainLock(NODE_LOCK_FILENAME); + nodePath = new NodePath(dir); } catch (IOException e) { logger.trace(() -> new ParameterizedMessage( "failed to obtain node lock on {}", dir.toAbsolutePath()), e); @@ -221,17 +220,12 @@ public final class NodeEnvironment implements Closeable { } public NodePath getNodePath() { - return nodePaths[0]; + return nodePath; } @Override public void close() { - for (int i = 0; i < locks.length; i++) { - if (locks[i] != null) { - IOUtils.closeWhileHandlingException(locks[i]); - } - locks[i] = null; - } + IOUtils.closeWhileHandlingException(lock); } } @@ -258,10 +252,10 @@ public final class NodeEnvironment implements Closeable { throw new IllegalStateException(message, e); } - this.locks = nodeLock.locks; - this.nodePaths = nodeLock.nodePaths; + this.lock = nodeLock.lock; + this.nodePath = nodeLock.nodePath; - logger.debug("using node location {}", Arrays.toString(nodePaths)); + logger.debug("using node location {}", nodePath); maybeLogPathDetails(); maybeLogHeapDetails(); @@ -269,7 +263,7 @@ public final class NodeEnvironment implements Closeable { applySegmentInfosTrace(settings); assertCanWrite(); - ensureAtomicMoveSupported(nodePaths); + ensureAtomicMoveSupported(nodePath); if (upgradeLegacyNodeFolders(logger, settings, environment, nodeLock)) { assertCanWrite(); @@ -277,13 +271,13 @@ public final class NodeEnvironment implements Closeable { if (DiscoveryNode.canContainData(settings) == false) { if (DiscoveryNode.isMasterNode(settings) == false) { - ensureNoIndexMetadata(nodePaths); + ensureNoIndexMetadata(nodePath); } - ensureNoShardData(nodePaths); + ensureNoShardData(nodePath); } - this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths); + this.nodeMetadata = loadNodeMetadata(settings, logger, nodePath); success = true; } finally { @@ -430,43 +424,28 @@ public final class NodeEnvironment implements Closeable { if (logger.isDebugEnabled()) { // Log one line per path.data: StringBuilder sb = new StringBuilder(); - for (NodePath nodePath : nodePaths) { - sb.append('\n').append(" -> ").append(nodePath.path.toAbsolutePath()); + sb.append('\n').append(" -> ").append(nodePath.path.toAbsolutePath()); - FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath); - sb.append(", free_space [") - .append(fsPath.getFree()) - .append("], usable_space [") - .append(fsPath.getAvailable()) - .append("], total_space [") - .append(fsPath.getTotal()) - .append("], mount [") - .append(fsPath.getMount()) - .append("], type [") - .append(fsPath.getType()) - .append(']'); - } + FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath); + sb.append(", free_space [") + .append(fsPath.getFree()) + .append("], usable_space [") + .append(fsPath.getAvailable()) + .append("], total_space [") + .append(fsPath.getTotal()) + .append("], mount [") + .append(fsPath.getMount()) + .append("], type [") + .append(fsPath.getType()) + .append(']'); logger.debug("node data locations details:{}", sb); } else if (logger.isInfoEnabled()) { - FsInfo.Path totFSPath = new FsInfo.Path(); - Set allTypes = new HashSet<>(); - Set allMounts = new HashSet<>(); - for (NodePath nodePath : nodePaths) { - FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath); - String mount = fsPath.getMount(); - if (allMounts.contains(mount) == false) { - allMounts.add(mount); - String type = fsPath.getType(); - if (type != null) { - allTypes.add(type); - } - totFSPath.add(fsPath); - } - } + Path path = nodePath.path.toAbsolutePath(); + FsInfo.Path fsPath = FsProbe.getFSInfo(nodePath); // Just log a 1-line summary: - logger.info("using [{}] data paths, mounts [{}], net usable_space [{}], net total_space [{}], types [{}]", - nodePaths.length, allMounts, totFSPath.getAvailable(), totFSPath.getTotal(), toString(allTypes)); + logger.info("using data path: mount [{}], usable_space [{}], total_space [{}], type [{}]", + fsPath.getMount(), fsPath.getAvailable(), fsPath.getTotal(), fsPath.getType()); } } @@ -481,29 +460,15 @@ public final class NodeEnvironment implements Closeable { * scans the node paths and loads existing metadata file. If not found a new meta data will be generated */ private static NodeMetadata loadNodeMetadata(Settings settings, Logger logger, - NodePath... nodePaths) throws IOException { - final Path[] paths = Arrays.stream(nodePaths).map(np -> np.path).toArray(Path[]::new); - NodeMetadata metadata = PersistedClusterStateService.nodeMetadata(paths); + NodePath nodePath) throws IOException { + final Path path = nodePath.path; + NodeMetadata metadata = PersistedClusterStateService.nodeMetadata(path); if (metadata == null) { // load legacy metadata - final Set nodeIds = new HashSet<>(); - for (final Path path : paths) { - final NodeMetadata oldStyleMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path); - if (oldStyleMetadata != null) { - nodeIds.add(oldStyleMetadata.nodeId()); - } - } - if (nodeIds.size() > 1) { - throw new IllegalStateException( - "data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds); - } - // load legacy metadata - final NodeMetadata legacyMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths); + final NodeMetadata legacyMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path); if (legacyMetadata == null) { - assert nodeIds.isEmpty() : nodeIds; metadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT); } else { - assert nodeIds.equals(Collections.singleton(legacyMetadata.nodeId())) : nodeIds + " doesn't match " + legacyMetadata; metadata = legacyMetadata; } } @@ -885,7 +850,7 @@ public final class NodeEnvironment implements Closeable { } public boolean hasNodeFile() { - return nodePaths != null && locks != null; + return nodePath != null && lock != null; } /** @@ -894,7 +859,7 @@ public final class NodeEnvironment implements Closeable { */ public Path nodeDataPath() { assertEnvIsLocked(); - return nodePaths[0].path; + return nodePath.path; } /** @@ -920,10 +885,10 @@ public final class NodeEnvironment implements Closeable { */ public NodePath nodePath() { assertEnvIsLocked(); - if (nodePaths == null || locks == null) { + if (nodePath == null || lock == null) { throw new IllegalStateException("node is not configured to store local location"); } - return nodePaths[0]; + return nodePath; } /** @@ -931,11 +896,9 @@ public final class NodeEnvironment implements Closeable { */ public Path indexPath(Index index) { assertEnvIsLocked(); - return nodePaths[0].resolve(index); + return nodePath.resolve(index); } - - /** * Returns all shard paths excluding custom shard path. Note: Shards are only allocated on one of the * returned paths. The returned array may contain paths to non-existing directories. @@ -946,7 +909,7 @@ public final class NodeEnvironment implements Closeable { */ public Path availableShardPath(ShardId shardId) { assertEnvIsLocked(); - return nodePaths[0].resolve(shardId); + return nodePath.resolve(shardId); } /** @@ -961,15 +924,11 @@ public final class NodeEnvironment implements Closeable { * @param excludeIndexPathIdsPredicate folder names to exclude */ public Set availableIndexFolders(Predicate excludeIndexPathIdsPredicate) throws IOException { - if (nodePaths == null || locks == null) { + if (nodePath == null || lock == null) { throw new IllegalStateException("node is not configured to store local location"); } assertEnvIsLocked(); - Set indexFolders = new HashSet<>(); - for (NodePath nodePath : nodePaths) { - indexFolders.addAll(availableIndexFoldersForPath(nodePath, excludeIndexPathIdsPredicate)); - } - return indexFolders; + return availableIndexFoldersForPath(nodePath, excludeIndexPathIdsPredicate); } @@ -994,7 +953,7 @@ public final class NodeEnvironment implements Closeable { */ public Set availableIndexFoldersForPath(final NodePath nodePath, Predicate excludeIndexPathIdsPredicate) throws IOException { - if (nodePaths == null || locks == null) { + if (nodePath == null || lock == null) { throw new IllegalStateException("node is not configured to store local location"); } assertEnvIsLocked(); @@ -1017,11 +976,11 @@ public final class NodeEnvironment implements Closeable { * Resolves all existing paths to indexFolderName in ${data.paths}/indices */ public Path resolveIndexFolder(String indexFolderName) { - if (nodePaths == null || locks == null) { + if (nodePath == null || lock == null) { throw new IllegalStateException("node is not configured to store local location"); } assertEnvIsLocked(); - return nodePaths[0].indicesPath.resolve(indexFolderName); + return nodePath.indicesPath.resolve(indexFolderName); } /** @@ -1034,44 +993,12 @@ public final class NodeEnvironment implements Closeable { */ public Set findAllShardIds(final Index index) throws IOException { assert index != null; - if (nodePaths == null || locks == null) { + if (nodePath == null || lock == null) { throw new IllegalStateException("node is not configured to store local location"); } assertEnvIsLocked(); final Set shardIds = new HashSet<>(); - final String indexUniquePathId = index.getUUID(); - for (final NodePath nodePath : nodePaths) { - shardIds.addAll(findAllShardsForIndex(nodePath.indicesPath.resolve(indexUniquePathId), index)); - } - return shardIds; - } - - /** - * Find all the shards for this index, returning a map of the {@code NodePath} to the number of shards on that path - * @param index the index by which to filter shards - * @return a map of NodePath to count of the shards for the index on that path - * @throws IOException if an IOException occurs - */ - public Map shardCountPerPath(final Index index) throws IOException { - assert index != null; - if (nodePaths == null || locks == null) { - throw new IllegalStateException("node is not configured to store local location"); - } - assertEnvIsLocked(); - final Map shardCountPerPath = new HashMap<>(); - final String indexUniquePathId = index.getUUID(); - for (final NodePath nodePath : nodePaths) { - Path indexLocation = nodePath.indicesPath.resolve(indexUniquePathId); - if (Files.isDirectory(indexLocation)) { - shardCountPerPath.put(nodePath, (long) findAllShardsForIndex(indexLocation, index).size()); - } - } - return shardCountPerPath; - } - - private static Set findAllShardsForIndex(Path indexPath, Index index) throws IOException { - assert indexPath.getFileName().toString().equals(index.getUUID()); - Set shardIds = new HashSet<>(); + final Path indexPath = nodePath.indicesPath.resolve(index.getUUID()); if (Files.isDirectory(indexPath)) { try (DirectoryStream stream = Files.newDirectoryStream(indexPath)) { for (Path shardPath : stream) { @@ -1089,28 +1016,24 @@ public final class NodeEnvironment implements Closeable { @Override public void close() { - if (closed.compareAndSet(false, true) && locks != null) { - for (Lock lock : locks) { - try { - logger.trace("releasing lock [{}]", lock); - lock.close(); - } catch (IOException e) { - logger.trace(() -> new ParameterizedMessage("failed to release lock [{}]", lock), e); - } + if (closed.compareAndSet(false, true) && lock != null) { + try { + logger.trace("releasing lock [{}]", lock); + lock.close(); + } catch (IOException e) { + logger.trace(() -> new ParameterizedMessage("failed to release lock [{}]", lock), e); } } } private void assertEnvIsLocked() { - if (closed.get() == false && locks != null) { - for (Lock lock : locks) { - try { - lock.ensureValid(); - } catch (IOException e) { - logger.warn("lock assertion failed", e); - throw new IllegalStateException("environment is not locked", e); - } + if (closed.get() == false && lock != null) { + try { + lock.ensureValid(); + } catch (IOException e) { + logger.warn("lock assertion failed", e); + throw new IllegalStateException("environment is not locked", e); } } } @@ -1121,31 +1044,29 @@ public final class NodeEnvironment implements Closeable { * not supported by the filesystem. This test is executed on each of the data directories. * This method cleans up all files even in the case of an error. */ - private static void ensureAtomicMoveSupported(final NodePath[] nodePaths) throws IOException { - for (NodePath nodePath : nodePaths) { - assert Files.isDirectory(nodePath.path) : nodePath.path + " is not a directory"; - final Path src = nodePath.path.resolve(TEMP_FILE_NAME + ".tmp"); - final Path target = nodePath.path.resolve(TEMP_FILE_NAME + ".final"); + private static void ensureAtomicMoveSupported(final NodePath nodePath) throws IOException { + assert Files.isDirectory(nodePath.path) : nodePath.path + " is not a directory"; + final Path src = nodePath.path.resolve(TEMP_FILE_NAME + ".tmp"); + final Path target = nodePath.path.resolve(TEMP_FILE_NAME + ".final"); + try { + Files.deleteIfExists(src); + Files.createFile(src); + Files.move(src, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (AtomicMoveNotSupportedException ex) { + throw new IllegalStateException("atomic_move is not supported by the filesystem on path [" + + nodePath.path + + "] atomic_move is required for elasticsearch to work correctly.", ex); + } finally { try { Files.deleteIfExists(src); - Files.createFile(src); - Files.move(src, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - } catch (AtomicMoveNotSupportedException ex) { - throw new IllegalStateException("atomic_move is not supported by the filesystem on path [" - + nodePath.path - + "] atomic_move is required for elasticsearch to work correctly.", ex); } finally { - try { - Files.deleteIfExists(src); - } finally { - Files.deleteIfExists(target); - } + Files.deleteIfExists(target); } } } - private void ensureNoShardData(final NodePath[] nodePaths) throws IOException { - List shardDataPaths = collectShardDataPaths(nodePaths); + private void ensureNoShardData(final NodePath nodePath) throws IOException { + List shardDataPaths = collectShardDataPaths(nodePath); if (shardDataPaths.isEmpty() == false) { final String message = String.format( Locale.ROOT, @@ -1157,8 +1078,8 @@ public final class NodeEnvironment implements Closeable { } } - private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException { - List indexMetadataPaths = collectIndexMetadataPaths(nodePaths); + private void ensureNoIndexMetadata(final NodePath nodePath) throws IOException { + List indexMetadataPaths = collectIndexMetadataPaths(nodePath); if (indexMetadataPaths.isEmpty() == false) { final String message = String.format( Locale.ROOT, @@ -1174,8 +1095,8 @@ public final class NodeEnvironment implements Closeable { /** * Collect the paths containing shard data in the indicated node paths. The returned paths will point to the shard data folder. */ - static List collectShardDataPaths(NodePath[] nodePaths) throws IOException { - return collectIndexSubPaths(nodePaths, NodeEnvironment::isShardPath); + static List collectShardDataPaths(NodePath nodePath) throws IOException { + return collectIndexSubPaths(nodePath, NodeEnvironment::isShardPath); } @@ -1183,23 +1104,21 @@ public final class NodeEnvironment implements Closeable { * Collect the paths containing index meta data in the indicated node paths. The returned paths will point to the * {@link MetadataStateFormat#STATE_DIR_NAME} folder */ - static List collectIndexMetadataPaths(NodePath[] nodePaths) throws IOException { - return collectIndexSubPaths(nodePaths, NodeEnvironment::isIndexMetadataPath); + static List collectIndexMetadataPaths(NodePath nodePath) throws IOException { + return collectIndexSubPaths(nodePath, NodeEnvironment::isIndexMetadataPath); } - private static List collectIndexSubPaths(NodePath[] nodePaths, Predicate subPathPredicate) throws IOException { + private static List collectIndexSubPaths(NodePath nodePath, Predicate subPathPredicate) throws IOException { List indexSubPaths = new ArrayList<>(); - for (NodePath nodePath : nodePaths) { - Path indicesPath = nodePath.indicesPath; - if (Files.isDirectory(indicesPath)) { - try (DirectoryStream indexStream = Files.newDirectoryStream(indicesPath)) { - for (Path indexPath : indexStream) { - if (Files.isDirectory(indexPath)) { - try (Stream shardStream = Files.list(indexPath)) { - shardStream.filter(subPathPredicate) - .map(Path::toAbsolutePath) - .forEach(indexSubPaths::add); - } + Path indicesPath = nodePath.indicesPath; + if (Files.isDirectory(indicesPath)) { + try (DirectoryStream indexStream = Files.newDirectoryStream(indicesPath)) { + for (Path indexPath : indexStream) { + if (Files.isDirectory(indexPath)) { + try (Stream shardStream = Files.list(indexPath)) { + shardStream.filter(subPathPredicate) + .map(Path::toAbsolutePath) + .forEach(indexSubPaths::add); } } } diff --git a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java index bfd3b2667b2b..9b92638a440f 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java +++ b/server/src/main/java/org/elasticsearch/env/NodeRepurposeCommand.java @@ -77,10 +77,10 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand { NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths); terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths"); - List shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths); + List shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths[0]); terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths"); - List indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths); + List indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths[0]); Set indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths); @@ -116,7 +116,7 @@ public class NodeRepurposeCommand extends ElasticsearchNodeCommand { NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths); terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths"); - List shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths); + List shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths[0]); if (shardDataPaths.isEmpty()) { terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND); return;