diff --git a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/InstallPluginCommandTests.java b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/InstallPluginCommandTests.java index f62b7ac4e1b9..351633f831cd 100644 --- a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/InstallPluginCommandTests.java +++ b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/InstallPluginCommandTests.java @@ -75,13 +75,10 @@ import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileSystem; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; -import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.GroupPrincipal; import java.nio.file.attribute.PosixFileAttributeView; import java.nio.file.attribute.PosixFileAttributes; @@ -106,6 +103,7 @@ import java.util.stream.Stream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -228,14 +226,10 @@ public class InstallPluginCommandTests extends ESTestCase { static Path writeZip(Path structure, String prefix) throws IOException { Path zip = createTempDir().resolve(structure.getFileName() + ".zip"); try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream(zip))) { - Files.walkFileTree(structure, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString(); - stream.putNextEntry(new ZipEntry(target)); - Files.copy(file, stream); - return FileVisitResult.CONTINUE; - } + forEachFileRecursively(structure, (file, attrs) -> { + String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString(); + stream.putNextEntry(new ZipEntry(target)); + Files.copy(file, stream); }); } return zip; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 5aa1d3d3d517..267bc9831c9a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -171,7 +171,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { ActionFuture future = client(masterNode).admin().cluster() .prepareCreateSnapshot(repoName, snapshot).setWaitForCompletion(true).execute(); - waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L)); + waitForBlockOnAnyDataNode(repoName); NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT); internalCluster().setDisruptionScheme(networkDisruption); @@ -197,7 +197,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture snapshotFuture = client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).execute(); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L)); + waitForBlock(masterNode, repoName); unblockNode(repoName, masterNode); assertFutureThrows(snapshotFuture, SnapshotException.class); @@ -228,7 +228,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { final ActionFuture snapshotResponse = internalCluster().masterClient().admin().cluster() .prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute(); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT); internalCluster().setDisruptionScheme(networkDisruption); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 9333d1a7ba6a..26b57c4829e8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -73,11 +73,8 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -92,6 +89,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -441,14 +439,9 @@ public class RelocationIT extends ESIntegTestCase { if (Files.exists(shardLoc)) { assertBusy(() -> { try { - Files.walkFileTree(shardLoc, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), - not(startsWith("recovery."))); - return FileVisitResult.CONTINUE; - } - }); + forEachFileRecursively(shardLoc, + (file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), + not(startsWith("recovery.")))); } catch (IOException e) { throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index ad8f50227361..dd629f29313f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -23,8 +23,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; @@ -82,8 +80,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap") .setWaitForCompletion(true).get(); - final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); - final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName); + final BlobStoreRepository repository = getRepositoryOnMaster(repoName); logger.info("--> creating a garbage data blob"); final PlainActionFuture garbageFuture = PlainActionFuture.newFuture(); @@ -91,13 +88,13 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true))); garbageFuture.get(); - final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName); + blockMasterFromFinalizingSnapshotOnIndexFile(repoName); logger.info("--> starting repository cleanup"); client().admin().cluster().prepareCleanupRepository(repoName).execute(); - logger.info("--> waiting for block to kick in on " + masterNode); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60)); + final String masterNode = internalCluster().getMasterName(); + waitForBlock(masterNode, repoName); awaitClusterState(state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()); return masterNode; @@ -116,9 +113,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS)); } - final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); - final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName); - + final BlobStoreRepository repository = getRepositoryOnMaster(repoName); logger.info("--> write two outdated index-N blobs"); for (int i = 0; i < 2; ++i) { final PlainActionFuture createOldIndexNFuture = PlainActionFuture.newFuture(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index a034e283b2c8..667ce11b05d4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; @@ -79,8 +78,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String sourceSnapshot = "source-snapshot"; final SnapshotInfo sourceSnapshotInfo = createFullSnapshot(repoName, sourceSnapshot); - final BlobStoreRepository repository = - (BlobStoreRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + final BlobStoreRepository repository = getRepositoryOnMaster(repoName); final RepositoryData repositoryData = getRepositoryData(repoName); final IndexId indexId = repositoryData.resolveIndexId(indexName); final int shardId = 0; @@ -167,7 +165,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String targetSnapshot = "target-snapshot"; blockNodeOnAnyFiles(repoName, masterName); final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); assertFalse(cloneFuture.isDone()); ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, @@ -201,7 +199,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String targetSnapshot = "target-snapshot"; final ActionFuture snapshot2Future = startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName); awaitNumberOfSnapshotsInProgress(2); unblockNode(repoName, dataNode); @@ -224,7 +222,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String targetSnapshot = "target-snapshot"; blockMasterOnShardClone(repoName); final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final String indexFast = "index-fast"; createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100)); @@ -255,7 +253,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { blockDataNode(repoName, dataNode); final ActionFuture snapshotFuture = clusterAdmin() .prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute(); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); final String targetSnapshot = "target-snapshot"; assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get()); @@ -282,7 +280,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String targetSnapshot = "target-snapshot"; blockNodeOnAnyFiles(repoName, masterName); final ActionFuture deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); assertFalse(deleteFuture.isDone()); ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () -> @@ -310,7 +308,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final String targetSnapshot1 = "target-snapshot"; blockMasterOnShardClone(repoName); final ActionFuture cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); assertThat(cloneFuture1.isDone(), is(false)); final int extraClones = randomIntBetween(1, 5); @@ -366,7 +364,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { startCloneFromDataNode(repoName, sourceSnapshot, cloneName, testIndex); awaitNumberOfSnapshotsInProgress(1); final String masterNode = internalCluster().getMasterName(); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); internalCluster().restartNode(masterNode); boolean cloneSucceeded = false; try { @@ -377,7 +375,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { // snapshot on disconnect slowly enough for it to work out } - awaitNoMoreRunningOperations(internalCluster().getMasterName()); + awaitNoMoreRunningOperations(); // Check if the clone operation worked out by chance as a result of the clone request being retried because of the master failover cloneSucceeded = cloneSucceeded || @@ -418,10 +416,10 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final ActionFuture cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex); awaitNumberOfSnapshotsInProgress(1); final String masterNode = internalCluster().getMasterName(); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); internalCluster().restartNode(masterNode); expectThrows(SnapshotException.class, cloneFuture::actionGet); - awaitNoMoreRunningOperations(internalCluster().getMasterName()); + awaitNoMoreRunningOperations(); assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2); } @@ -443,10 +441,10 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final ActionFuture cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex); awaitNumberOfSnapshotsInProgress(1); final String masterNode = internalCluster().getMasterName(); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); unblockNode(repoName, masterNode); expectThrows(SnapshotException.class, cloneFuture::actionGet); - awaitNoMoreRunningOperations(internalCluster().getMasterName()); + awaitNoMoreRunningOperations(); assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1); assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); } @@ -465,7 +463,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { final ActionFuture sourceSnapshotFuture = masterClient.admin().cluster() .prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute(); awaitNumberOfSnapshotsInProgress(1); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); internalCluster().restartNode(dataNode); assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL)); @@ -490,7 +488,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { blockMasterOnWriteIndexFile(repoName); final String cloneName = "clone-blocked"; final ActionFuture blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); awaitNumberOfSnapshotsInProgress(1); blockNodeOnAnyFiles(repoName, dataNode); final ActionFuture otherSnapshot = startFullSnapshot(repoName, "other-snapshot"); @@ -520,7 +518,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { blockMasterOnWriteIndexFile(repoName); final String cloneName = "clone-blocked"; final ActionFuture blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); awaitNumberOfSnapshotsInProgress(1); final String otherCloneName = "other-clone"; final ActionFuture otherClone = startClone(repoName, sourceSnapshot, otherCloneName, indexName); @@ -549,7 +547,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { blockMasterOnWriteIndexFile(repoName); final ActionFuture blockedSnapshot = startFullSnapshot(repoName, "snap-blocked"); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); awaitNumberOfSnapshotsInProgress(1); final String cloneName = "clone"; final ActionFuture clone = startClone(repoName, sourceSnapshot, cloneName, indexName); @@ -589,13 +587,11 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase { } private void blockMasterOnReadIndexMeta(String repoName) { - ((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)) - .setBlockOnReadIndexMeta(); + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockOnReadIndexMeta(); } private void blockMasterOnShardClone(String repoName) { - ((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)) - .setBlockOnWriteShardLevelMeta(); + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta(); } /** diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 6e206a4a146f..1ec29efe77e4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.discovery.AbstractDisruptionTestCase; import org.elasticsearch.plugins.Plugin; @@ -283,7 +282,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture deleteFuture = startDeleteSnapshot(repoName, firstSnapshot); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final ActionFuture snapshotFuture = startFullSnapshot(repoName, "second-snapshot"); @@ -416,7 +415,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final String firstSnapshot = "snapshot-one"; blockDataNode(repoName, dataNode); final ActionFuture firstSnapshotResponse = startFullSnapshotFromNonMasterClient(repoName, firstSnapshot); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); final String dataNode2 = internalCluster().startDataOnlyNode(); ensureStableCluster(5); @@ -437,7 +436,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockNodeOnAnyFiles(repoName, dataNode2); final ActionFuture snapshotThreeFuture = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three"); - waitForBlock(dataNode2, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode2, repoName); assertThat(firstSnapshotResponse.isDone(), is(false)); assertThat(secondSnapshotResponse.isDone(), is(false)); @@ -508,7 +507,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockDataNode(repoName, dataNode); final ActionFuture firstSnapshotResponse = startFullSnapshotFromMasterClient(repoName, "snapshot-one"); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); internalCluster().startDataOnlyNode(); ensureStableCluster(3); @@ -533,7 +532,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture firstDeleteFuture = startDeleteSnapshot(repoName, "*"); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); awaitNumberOfSnapshotsInProgress(1); @@ -616,7 +615,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockNodeOnAnyFiles(repoName, masterNode); ActionFuture firstDeleteFuture = client(masterNode).admin().cluster() .prepareDeleteSnapshot(repoName, "*").execute(); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final ActionFuture createThirdSnapshot = client(masterNode).admin().cluster() .prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(true).execute(); @@ -654,7 +653,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture firstFailedSnapshotFuture = startFullSnapshotFromMasterClient(repoName, "failing-snapshot-1"); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final ActionFuture secondFailedSnapshotFuture = startFullSnapshotFromMasterClient(repoName, "failing-snapshot-2"); awaitNumberOfSnapshotsInProgress(2); @@ -720,7 +719,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final String masterNode = internalCluster().getMasterName(); blockNodeOnAnyFiles(repoName, masterNode); final ActionFuture snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three"); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); corruptIndexN(repoPath, generation); @@ -748,7 +747,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final String masterNode = internalCluster().getMasterName(); blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final ActionFuture snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three"); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); corruptIndexN(repoPath, generation); @@ -784,7 +783,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { createNSnapshots(blockedRepoName, randomIntBetween(1, 5)); blockNodeOnAnyFiles(blockedRepoName, masterNode); final ActionFuture deleteFuture = startDeleteFromNonMasterClient(blockedRepoName, "*"); - waitForBlock(masterNode, blockedRepoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, blockedRepoName); awaitNDeletionsInProgress(1); final ActionFuture createBlockedSnapshot = startFullSnapshotFromNonMasterClient(blockedRepoName, "queued-snapshot"); @@ -793,7 +792,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final long generation = getRepositoryData(repoName).getGenId(); blockNodeOnAnyFiles(repoName, masterNode); final ActionFuture snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three"); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); awaitNumberOfSnapshotsInProgress(2); corruptIndexN(repoPath, generation); @@ -893,9 +892,9 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { snapOneResponse.get(); snapTwoResponse.get(); - logger.info("--> wait for snapshot to complete"); + awaitNoMoreRunningOperations(); for (String snapshot : Arrays.asList(snapshotOne, snapshotTwo)) { - SnapshotInfo snapshotInfo = waitForCompletion(repoName, snapshot, TimeValue.timeValueSeconds(600)); + SnapshotInfo snapshotInfo = getSnapshot(repoName, snapshot); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo.shardFailures().size(), equalTo(0)); } @@ -958,7 +957,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final String snapshotName = "snap-name"; blockMasterFromDeletingIndexNFile(repoName); final ActionFuture snapshotFuture = startFullSnapshot(repoName, snapshotName); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); final ActionFuture deleteFuture = startDeleteSnapshot(repoName, snapshotName); awaitNDeletionsInProgress(1); unblockNode(repoName, masterName); @@ -980,7 +979,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { for (int i = 0; i < deletes; ++i) { deleteResponses.add(client().admin().cluster().prepareDeleteSnapshot(repoName, "*").execute()); } - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); awaitNDeletionsInProgress(1); for (ActionFuture deleteResponse : deleteResponses) { assertFalse(deleteResponse.isDone()); @@ -1005,7 +1004,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { final String masterName = internalCluster().getMasterName(); blockMasterFromDeletingIndexNFile(repoName); final ActionFuture snapshotThree = startFullSnapshotFromMasterClient(repoName, "snap-other"); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); final String snapshotOne = snapshotNames.get(0); final ActionFuture deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne); @@ -1114,8 +1113,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { awaitNumberOfSnapshotsInProgress(4); final String initialMaster = internalCluster().getMasterName(); - waitForBlock(initialMaster, repoName, TimeValue.timeValueSeconds(30L)); - waitForBlock(initialMaster, otherRepoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(initialMaster, repoName); + waitForBlock(initialMaster, otherRepoName); internalCluster().stopCurrentMasterNode(); ensureStableCluster(3, dataNode); @@ -1155,7 +1154,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { if (blockedDelete) { awaitNDeletionsInProgress(1); } - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); final String expectedFailureMessage = "Cannot start another operation, already running [" + limitToTest + "] operations and the current limit for concurrent snapshot operations is set to [" + limitToTest + "]"; @@ -1215,7 +1214,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockMasterFromFinalizingSnapshotOnIndexFile(repoName); final String snapshotName = "snap-1"; final ActionFuture snapshotFuture = startFullSnapshot(repoName, snapshotName); - waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterNode, repoName); final ActionFuture deleteFuture = startDeleteSnapshot(repoName, snapshotName); awaitNDeletionsInProgress(1); unblockNode(repoName, masterNode); @@ -1259,7 +1258,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { blockMasterOnWriteIndexFile(repoName); final ActionFuture blockedSnapshot = startFullSnapshot(repoName, "snap-blocked"); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); awaitNumberOfSnapshotsInProgress(1); blockNodeOnAnyFiles(repoName, dataNode); final ActionFuture otherSnapshot = startFullSnapshot(repoName, "other-snapshot"); @@ -1298,10 +1297,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { return snapshotNames; } - private void awaitNoMoreRunningOperations() throws Exception { - awaitNoMoreRunningOperations(internalCluster().getMasterName()); - } - private ActionFuture startDeleteFromNonMasterClient(String repoName, String snapshotName) { logger.info("--> deleting snapshot [{}] from repo [{}] from non master client", snapshotName, repoName); return internalCluster().nonMasterClient().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(); @@ -1357,19 +1352,19 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { } private ActionFuture startAndBlockOnDeleteSnapshot(String repoName, String snapshotName) - throws InterruptedException { + throws Exception { final String masterName = internalCluster().getMasterName(); blockNodeOnAnyFiles(repoName, masterName); final ActionFuture fut = startDeleteSnapshot(repoName, snapshotName); - waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(masterName, repoName); return fut; } private ActionFuture startAndBlockFailingFullSnapshot(String blockedRepoName, String snapshotName) - throws InterruptedException { + throws Exception { blockMasterFromFinalizingSnapshotOnIndexFile(blockedRepoName); final ActionFuture fut = startFullSnapshot(blockedRepoName, snapshotName); - waitForBlock(internalCluster().getMasterName(), blockedRepoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(internalCluster().getMasterName(), blockedRepoName); return fut; } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index 76d12a6d387c..a3b8108d67f2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; @@ -148,7 +147,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + final Repository repository = getRepositoryOnMaster(repoName); logger.info("--> move index-N blob to next generation"); final RepositoryData repositoryData = getRepositoryData(repoName); @@ -262,7 +261,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); logger.info("--> corrupt index-N blob"); - final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + final Repository repository = getRepositoryOnMaster(repoName); final RepositoryData repositoryData = getRepositoryData(repoName); Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100))); @@ -272,7 +271,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas final String otherRepoName = "other-repo"; createRepository(otherRepoName, "fs", Settings.builder() .put("location", repo).put("compress", false)); - final Repository otherRepo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(otherRepoName); + final Repository otherRepo = getRepositoryOnMaster(otherRepoName); logger.info("--> verify loading repository data from newly mounted repository throws RepositoryException"); expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo)); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 8dda62c6631a..d3fc4e289a5f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -88,11 +87,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -351,7 +347,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest .get(); logger.info("--> waiting for block to kick in"); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode); unblockNode("test-repo", blockedNode); @@ -359,9 +355,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> stopping node [{}]", blockedNode); stopNode(blockedNode); logger.info("--> waiting for completion"); - SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60)); - logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); - logger.info("--> done"); + awaitNoMoreRunningOperations(); + logger.info("Number of failed shards [{}]", getSnapshot("test-repo", "test-snap").shardFailures().size()); } public void testSnapshotWithStuckNode() throws Exception { @@ -391,7 +386,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest .get(); logger.info("--> waiting for block to kick in"); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode); @@ -739,15 +734,13 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest final int numberOfShards = getNumShards("test-idx").numPrimaries; logger.info("number of shards: {}", numberOfShards); - final String masterNode = blockMasterFromFinalizingSnapshotOnSnapFile("test-repo"); + blockMasterFromFinalizingSnapshotOnSnapFile("test-repo"); final String dataNode = blockNodeWithIndex("test-repo", "test-idx"); dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false) .setIndices("test-idx").get(); - logger.info("--> stopping data node {}", dataNode); stopNode(dataNode); - logger.info("--> stopping master node {} ", masterNode); internalCluster().stopCurrentMasterNode(); logger.info("--> wait until the snapshot is done"); @@ -1133,7 +1126,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest blockAllDataNodes(repoName); final String snapshotName = "test-snap"; final ActionFuture snapshotResponse = startFullSnapshot(repoName, snapshotName); - waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNodeName, repoName); final AtomicBoolean blocked = new AtomicBoolean(true); @@ -1211,8 +1204,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest assertAcked(client().admin().indices().prepareDelete(indexName)); - logger.info("--> wait for snapshot to complete"); - SnapshotInfo snapshotInfo = waitForCompletion(repoName, "test-snap", TimeValue.timeValueSeconds(600)); + awaitNoMoreRunningOperations(); + SnapshotInfo snapshotInfo = getSnapshot(repoName, "test-snap"); assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL)); assertThat(snapshotInfo.shardFailures().size(), greaterThan(0)); logger.info("--> done"); @@ -1230,32 +1223,22 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest private static List findRepoMetaBlobs(Path repoPath) throws IOException { List files = new ArrayList<>(); - Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - final String fileName = file.getFileName().toString(); - if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) { - files.add(file); - } - return super.visitFile(file, attrs); - } + forEachFileRecursively(repoPath.resolve("indices"), ((file, basicFileAttributes) -> { + final String fileName = file.getFileName().toString(); + if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) { + files.add(file); } - ); + })); return files; } private List scanSnapshotFolder(Path repoPath) throws IOException { List files = new ArrayList<>(); - Files.walkFileTree(repoPath, new SimpleFileVisitor(){ - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (file.getFileName().toString().startsWith("__")){ - files.add(file); - } - return super.visitFile(file, attrs); - } + forEachFileRecursively(repoPath.resolve("indices"), ((file, basicFileAttributes) -> { + if (file.getFileName().toString().startsWith("__")){ + files.add(file); } - ); + })); return files; } @@ -1288,7 +1271,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest return fromXContent(SnapshottableMetadata::new, parser); } - @Override public EnumSet context() { return Metadata.API_AND_SNAPSHOT; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index aca33b4e4e98..f0e9165fe7f1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -51,6 +51,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; @@ -132,9 +133,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } } if (!indicesToFlush.isEmpty()) { - String[] indices = indicesToFlush.toArray(new String[indicesToFlush.size()]); - logger.info("--> starting asynchronous flush for indices {}", Arrays.toString(indices)); - flushResponseFuture = client().admin().indices().prepareFlush(indices).execute(); + logger.info("--> starting asynchronous flush for indices {}", indicesToFlush); + flushResponseFuture = client().admin().indices().prepareFlush(indicesToFlush.toArray(Strings.EMPTY_ARRAY)).execute(); } } @@ -792,18 +792,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); logger.info("--> waiting for block to kick in"); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode); Settings.Builder excludeSettings = Settings.builder().put("index.routing.allocation.exclude._name", blockedNode); client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get(); - logger.info("--> unblocking blocked node"); unblockNode("test-repo", blockedNode); - logger.info("--> waiting for completion"); - logger.info("Number of failed shards [{}]", - waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size()); - logger.info("--> done"); + awaitNoMoreRunningOperations(); final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); @@ -842,7 +838,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); logger.info("--> waiting for block to kick in"); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); logger.info("--> execution was blocked on node [{}], trying to delete repository", blockedNode); @@ -871,10 +867,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> unblocking blocked node"); unblockNode("test-repo", blockedNode); - logger.info("--> waiting for completion"); - logger.info("Number of failed shards [{}]", - waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size()); - logger.info("--> done"); + awaitNoMoreRunningOperations(); final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); @@ -1003,7 +996,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .setWaitForCompletion(false).setIncludeGlobalState(false).setIndices("test-idx").get(); logger.info("--> waiting for block to kick in"); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); logger.info("--> execution was blocked on node [{}], checking snapshot status with specified repository and snapshot", blockedNode); @@ -1046,10 +1039,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> unblocking blocked node"); unblockNode("test-repo", blockedNode); - snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); + awaitNoMoreRunningOperations(); + snapshotInfo = getSnapshot("test-repo", "test-snap"); logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); - logger.info("--> done"); - logger.info("--> checking snapshot status again after snapshot is done"); response = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").execute().actionGet(); @@ -1115,8 +1107,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> snapshot"); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); - logger.info("--> wait for snapshot to complete"); - SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); + awaitNoMoreRunningOperations(); + SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo.shardFailures().size(), equalTo(0)); logger.info("--> done"); @@ -1188,7 +1180,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas ActionFuture future = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap") .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute(); logger.info("--> wait for block to kick in"); - waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + waitForBlockOnAnyDataNode("test-repo"); try { // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed @@ -1246,7 +1238,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .execute(); logger.info("--> waiting for block to kick in"); - waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + waitForBlockOnAnyDataNode("test-repo"); logger.info("--> close index while restore is running"); try { @@ -1302,7 +1294,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .execute(); logger.info("--> waiting for block to kick in"); - waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueMinutes(1)); + waitForBlockOnAnyDataNode(repoName); logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)"); ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () -> @@ -1601,7 +1593,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); - waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); + waitForBlock(blockedNode, repo); logger.info("--> removing primary shard that is being snapshotted"); ClusterState clusterState = internalCluster().clusterService(internalCluster().getMasterName()).state(); @@ -1616,7 +1608,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas unblockNode(repo, blockedNode); logger.info("--> ensuring snapshot is aborted and the aborted shard was marked as failed"); - SnapshotInfo snapshotInfo = waitForCompletion(repo, snapshot, TimeValue.timeValueSeconds(60)); + awaitNoMoreRunningOperations(); + SnapshotInfo snapshotInfo = getSnapshot(repo, snapshot); assertEquals(1, snapshotInfo.shardFailures().size()); assertEquals(0, snapshotInfo.shardFailures().get(0).shardId()); assertThat(snapshotInfo.shardFailures().get(0).reason(), is("aborted")); @@ -1749,7 +1742,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } } - public void testSnapshottingWithMissingSequenceNumbers() { + public void testSnapshottingWithMissingSequenceNumbers() throws Exception { final String repositoryName = "test-repo"; final String snapshotName = "test-snap"; final String indexName = "test-idx"; @@ -1783,7 +1776,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> restore all indices from the snapshot"); RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") - .setWaitForCompletion(true).execute().actionGet(); + .setWaitForCompletion(true).execute().get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get(); @@ -1899,7 +1892,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(getSnapshotsResponse.getSnapshots("test-repo"), empty()); } - public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException { + public void testHiddenIndicesIncludedInSnapshot() throws Exception { Client client = client(); final String normalIndex = "normal-index"; final String hiddenIndex = "hidden-index"; @@ -1943,7 +1936,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .prepareRestoreSnapshot(repoName, snapName) .setWaitForCompletion(true) .setIndices("*") - .execute().actionGet(); + .execute().get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards())); @@ -1961,7 +1954,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .prepareRestoreSnapshot(repoName, snapName) .setWaitForCompletion(true) .setIndices("*", "-.*") - .execute().actionGet(); + .execute().get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards())); @@ -1979,7 +1972,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .prepareRestoreSnapshot(repoName, snapName) .setWaitForCompletion(true) .setIndices("hid*") - .execute().actionGet(); + .execute().get(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards())); @@ -2013,8 +2006,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas final String repoName = "test-repo"; final Path repoPath = randomRepoPath(); createRepository(repoName, "mock", repoPath); - final MockRepository repository = - (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + final MockRepository repository = getRepositoryOnMaster(repoName); repository.setFailOnIndexLatest(true); createFullSnapshot(repoName, "snapshot-1"); repository.setFailOnIndexLatest(false); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java index 495b6e6d427d..b49f35c88e1b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.snapshots; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -63,7 +62,7 @@ public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase { .setWaitForCompletion(false) .setIndices("test-index") .get(); - waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + waitForBlock(blockedNode, "test-repo"); final SnapshotId snapshotId = getSnapshot("test-repo", "test-snap").snapshotId(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index eb0f17068152..e09c9bfe001c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -34,7 +34,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -67,7 +66,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { .build(); } - public void testStatusApiConsistency() { + public void testStatusApiConsistency() throws Exception { createRepository("test-repo", "fs"); createIndex("test-idx-1", "test-idx-2", "test-idx-3"); @@ -90,7 +89,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { assertThat(snapshotInfo.version(), equalTo(Version.CURRENT)); final List snapshotStatus = clusterAdmin().snapshotsStatus( - new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots(); + new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).get().getSnapshots(); assertThat(snapshotStatus.size(), equalTo(1)); final SnapshotStatus snStatus = snapshotStatus.get(0); assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime()); @@ -113,7 +112,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { ActionFuture createSnapshotResponseActionFuture = startFullSnapshot("test-repo", "test-snap"); logger.info("--> wait for data nodes to get blocked"); - waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + waitForBlockOnAnyDataNode("test-repo"); awaitNumberOfSnapshotsInProgress(1); assertEquals(SnapshotsInProgress.State.STARTED, client().admin().cluster().prepareSnapshotStatus("test-repo") .setSnapshots("test-snap").get().getSnapshots().get(0).getState()); @@ -121,8 +120,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { logger.info("--> unblock all data nodes"); unblockAllDataNodes("test-repo"); - logger.info("--> wait for snapshot to finish"); - createSnapshotResponseActionFuture.actionGet(); + assertSuccessful(createSnapshotResponseActionFuture); } public void testExceptionOnMissingSnapBlob() throws IOException { @@ -247,7 +245,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { final ActionFuture responseSnapshotTwo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotTwo).setWaitForCompletion(true).execute(); - waitForBlock(dataNodeTwo, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNodeTwo, repoName); assertBusy(() -> { final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne); @@ -419,7 +417,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { .setWaitForCompletion(false) .setIndices(indexName) .get(); - waitForBlock(initialBlockedNode, repositoryName, TimeValue.timeValueSeconds(60)); // wait for block to kick in + waitForBlock(initialBlockedNode, repositoryName); // wait for block to kick in getSnapshotsResponse = client.admin().cluster() .prepareGetSnapshots("test-repo") .setSnapshots(randomFrom("_all", "_current", "snap-on-*", "*-on-empty-repo", "snap-on-empty-repo")) @@ -458,7 +456,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { .setWaitForCompletion(false) .setIndices(indexName) .get(); - waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60)); // wait for block to kick in + waitForBlock(blockedNode, repositoryName); // wait for block to kick in logger.info("--> get all snapshots with a current in-progress"); // with ignore unavailable set to true, should not throw an exception @@ -513,7 +511,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { .collect(Collectors.toList()), equalTo(sortedNames)); unblockNode(repositoryName, blockedNode); // unblock node - waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); + awaitNoMoreRunningOperations(); } private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index a5f413330c37..16a19c452629 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -21,7 +21,6 @@ package org.elasticsearch.snapshots; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.PlainActionFuture; @@ -36,6 +35,7 @@ import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -150,7 +150,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } protected RepositoryData getRepositoryData(String repository) { - return getRepositoryData(internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repository)); + return getRepositoryData((Repository) getRepositoryOnMaster(repository)); } protected RepositoryData getRepositoryData(Repository repository) { @@ -169,103 +169,56 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { public static void assertFileCount(Path dir, int expectedCount) throws IOException { final List found = new ArrayList<>(); - Files.walkFileTree(dir, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - found.add(file); - return FileVisitResult.CONTINUE; - } - }); + forEachFileRecursively(dir, ((path, basicFileAttributes) -> found.add(path))); assertEquals("Unexpected file count, found: [" + found + "].", expectedCount, found.size()); } public static int numberOfFiles(Path dir) throws IOException { final AtomicInteger count = new AtomicInteger(); - Files.walkFileTree(dir, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - count.incrementAndGet(); - return FileVisitResult.CONTINUE; - } - }); + forEachFileRecursively(dir, ((path, basicFileAttributes) -> count.incrementAndGet())); return count.get(); } - public static void stopNode(final String node) throws IOException { + protected void stopNode(final String node) throws IOException { + logger.info("--> stopping node {}", node); internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(node)); } - public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException { + public void waitForBlock(String node, String repository) throws Exception { logger.info("--> waiting for [{}] to be blocked on node [{}]", repository, node); - long start = System.currentTimeMillis(); - RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node); - MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); - while (System.currentTimeMillis() - start < timeout.millis()) { - if (mockRepository.blocked()) { - return; - } - Thread.sleep(100); - } - fail("Timeout waiting for node [" + node + "] to be blocked"); + MockRepository mockRepository = getRepositoryOnNode(repository, node); + assertBusy(() -> assertTrue(mockRepository.blocked()), 30L, TimeUnit.SECONDS); } - public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout.millis()) { - final SnapshotInfo snapshotInfo = getSnapshot(repository, snapshotName); - if (snapshotInfo.state().completed()) { - // Make sure that snapshot clean up operations are finished - ClusterStateResponse stateResponse = clusterAdmin().prepareState().get(); - boolean found = false; - for (SnapshotsInProgress.Entry entry : - stateResponse.getState().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { - final Snapshot curr = entry.snapshot(); - if (curr.getRepository().equals(repository) && curr.getSnapshotId().getName().equals(snapshotName)) { - found = true; - break; - } - } - if (found == false) { - return snapshotInfo; - } - } - Thread.sleep(100); - } - fail("Timeout!!!"); - return null; + public static void blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) { + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteIndexFile(); } - public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) - .repository(repositoryName)).setBlockAndFailOnWriteIndexFile(); - return masterName; - } - - public static String blockMasterOnWriteIndexFile(final String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getMasterNodeInstance(RepositoriesService.class) - .repository(repositoryName)).setBlockOnWriteIndexFile(); - return masterName; + public static void blockMasterOnWriteIndexFile(final String repositoryName) { + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repositoryName).setBlockOnWriteIndexFile(); } public static void blockMasterFromDeletingIndexNFile(String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) - .repository(repositoryName)).setBlockOnDeleteIndexFile(); + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repositoryName).setBlockOnDeleteIndexFile(); } - public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) - .repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true); - return masterName; + public static void blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) { + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteSnapFiles(); + } + + @SuppressWarnings("unchecked") + protected static T getRepositoryOnMaster(String repositoryName) { + return ((T) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repositoryName)); + } + + @SuppressWarnings("unchecked") + protected static T getRepositoryOnNode(String repositoryName, String nodeName) { + return ((T) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repositoryName)); } public static String blockNodeWithIndex(final String repositoryName, final String indexName) { - for(String node : internalCluster().nodesInclude(indexName)) { - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName)) - .blockOnDataFiles(true); + for (String node : internalCluster().nodesInclude(indexName)) { + AbstractSnapshotIntegTestCase.getRepositoryOnNode(repositoryName, node).blockOnDataFiles(); return node; } fail("No nodes for the index " + indexName + " found"); @@ -273,24 +226,22 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } public static void blockNodeOnAnyFiles(String repository, String nodeName) { - ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName) - .repository(repository)).setBlockOnAnyFiles(true); + AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, nodeName).setBlockOnAnyFiles(); } public static void blockDataNode(String repository, String nodeName) { - ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName) - .repository(repository)).blockOnDataFiles(true); + AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, nodeName).blockOnDataFiles(); } public static void blockAllDataNodes(String repository) { - for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { - ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true); + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository) repositoriesService.repository(repository)).blockOnDataFiles(); } } public static void unblockAllDataNodes(String repository) { - for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { - ((MockRepository)repositoriesService.repository(repository)).unblock(); + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + ((MockRepository) repositoriesService.repository(repository)).unblock(); } } @@ -301,7 +252,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } } - public static void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException { + public static void waitForBlockOnAnyDataNode(String repository) throws InterruptedException { final boolean blocked = waitUntil(() -> { for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); @@ -310,14 +261,14 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } } return false; - }, timeout.millis(), TimeUnit.MILLISECONDS); + }, 30L, TimeUnit.SECONDS); assertTrue("No repository is blocked waiting on a data node", blocked); } public void unblockNode(final String repository, final String node) { logger.info("--> unblocking [{}] on node [{}]", repository, node); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repository)).unblock(); + AbstractSnapshotIntegTestCase.getRepositoryOnNode(repository, node).unblock(); } protected void createRepository(String repoName, String type, Settings.Builder settings) { @@ -454,7 +405,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { assertNotNull(initialRepoMetadata); assertThat("We can only manually insert a snapshot into a repository that does not have a generation tracked in the CS", initialRepoMetadata.generation(), is(RepositoryData.UNKNOWN_REPO_GEN)); - final Repository repo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + final Repository repo = getRepositoryOnMaster(repoName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID(random())); logger.info("--> adding old version FAILED snapshot [{}] to repository [{}]", snapshotId, repoName); final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, @@ -467,6 +418,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f)); } + protected void awaitNoMoreRunningOperations() throws Exception { + awaitNoMoreRunningOperations(internalCluster().getMasterName()); + } + protected void awaitNoMoreRunningOperations(String viaNode) throws Exception { logger.info("--> verify no more operations in the cluster state"); awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() && @@ -504,10 +459,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } protected ActionFuture startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName, - String dataNode) throws InterruptedException { + String dataNode) throws Exception { blockDataNode(repoName, dataNode); final ActionFuture fut = startFullSnapshot(repoName, snapshotName); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + waitForBlock(dataNode, repoName); return fut; } @@ -527,7 +482,8 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count); } - protected static SnapshotInfo assertSuccessful(ActionFuture future) throws Exception { + protected SnapshotInfo assertSuccessful(ActionFuture future) throws Exception { + logger.info("--> wait for snapshot to finish"); final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo(); assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); return snapshotInfo; @@ -582,7 +538,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { protected void awaitMasterFinishRepoOperations() throws Exception { logger.info("--> waiting for master to finish all repo operations on its SNAPSHOT pool"); - final ThreadPool masterThreadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); + final ThreadPool masterThreadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class); assertBusy(() -> { for (ThreadPoolStats.Stats stat : masterThreadPool.stats()) { if (ThreadPool.Names.SNAPSHOT.equals(stat.getName())) { @@ -592,4 +548,15 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } }); } + + public static void forEachFileRecursively(Path path, + CheckedBiConsumer forEach) throws IOException { + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + forEach.accept(file, attrs); + return FileVisitResult.CONTINUE; + } + }); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 6ed291f0da18..c9516ea59fe7 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -208,16 +208,16 @@ public class MockRepository extends FsRepository { this.notifyAll(); } - public void blockOnDataFiles(boolean blocked) { - blockOnDataFiles = blocked; + public void blockOnDataFiles() { + blockOnDataFiles = true; } - public void setBlockOnAnyFiles(boolean blocked) { - blockOnAnyFiles = blocked; + public void setBlockOnAnyFiles() { + blockOnAnyFiles = true; } - public void setBlockAndFailOnWriteSnapFiles(boolean blocked) { - blockAndFailOnWriteSnapFile = blocked; + public void setBlockAndFailOnWriteSnapFiles() { + blockAndFailOnWriteSnapFile = true; } public void setBlockAndFailOnWriteIndexFile() { diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index 3bf8956fe33c..f8e20e67d074 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -447,7 +446,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase { .setPartial(false) .execute(); logger.info("--> wait for block to kick in"); - waitForBlockOnAnyDataNode(repositoryName, TimeValue.timeValueMinutes(1)); + waitForBlockOnAnyDataNode(repositoryName); // non-partial snapshots do not allow delete operations on data streams where snapshot has not been completed try { diff --git a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index 4021baa3cce9..211569b2933a 100644 --- a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -88,9 +88,9 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase @After public void cleanUp() throws Exception { - awaitNoMoreRunningOperations(internalCluster().getMasterName()); + awaitNoMoreRunningOperations(); DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{SLM_HISTORY_DATA_STREAM}); - assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, req).actionGet()); + assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, req).get()); } @Override @@ -192,7 +192,7 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase // Check that the executed snapshot shows up in the SLM output as in_progress logger.info("--> Waiting for at least one data node to hit the block"); - waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L)); + waitForBlockOnAnyDataNode(REPO); assertBusy(() -> { logger.info("--> at least one data node has hit the block"); GetSnapshotLifecycleAction.Response getResp = @@ -286,14 +286,13 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase assertBusy(() -> assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()), 30, TimeUnit.SECONDS); - final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(REPO); + blockMasterFromFinalizingSnapshotOnIndexFile(REPO); logger.info("--> start snapshot"); ActionFuture snapshotFuture = client() .execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId)); - logger.info("--> waiting for block to kick in on " + masterNode); - waitForBlock(masterNode, REPO, TimeValue.timeValueSeconds(60)); + waitForBlock(internalCluster().getMasterName(), REPO); logger.info("--> stopping master node"); internalCluster().stopCurrentMasterNode(); @@ -349,7 +348,7 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase logger.info("--> verify that snapshot [{}] succeeded", successfulSnapshotName.get()); assertBusy(() -> { GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() - .prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).execute().actionGet(); + .prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).execute().get(); final SnapshotInfo snapshotInfo; try { snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index f7ed970a9815..52870b7713f4 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.ClientHelper; @@ -43,11 +44,8 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsSta import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -324,24 +322,20 @@ public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableS */ private Map blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { final Map blobsPerShard = new HashMap<>(); - Files.walkFileTree(repositoryLocation.resolve("indices"), new SimpleFileVisitor<>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - final String fileName = file.getFileName().toString(); - if (fileName.equals("snap-" + snapshotId + ".dat")) { - blobsPerShard.put( - String.join( - "/", - snapshotId, - file.getParent().getParent().getFileName().toString(), - file.getParent().getFileName().toString() - ), - INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) - ); - } - return FileVisitResult.CONTINUE; + forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> { + final String fileName = file.getFileName().toString(); + if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) { + blobsPerShard.put( + String.join( + "/", + snapshotId, + file.getParent().getParent().getFileName().toString(), + file.getParent().getFileName().toString() + ), + INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) + ); } - }); + })); return Map.copyOf(blobsPerShard); } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 595a22557069..65155e8a72ee 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; @@ -466,9 +465,8 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT for (String node : internalCluster().getNodeNames()) { final IndicesService service = internalCluster().getInstance(IndicesService.class, node); if (service != null && service.hasIndex(restoredIndex)) { - final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node); assertThat( - repositoriesService.repository(repositoryName).getRestoreThrottleTimeInNanos(), + getRepositoryOnNode(repositoryName, node).getRestoreThrottleTimeInNanos(), useRateLimits ? greaterThan(0L) : equalTo(0L) ); }