Simplify Snapshot ITs Further (#63655)

* Removing some more duplication and redundant logic.
* Aligning all timeouts to 30s (60s or even 10 minute timeouts should be unnecessary; if they aren't, we should figure out why)
* Removing some usage of `actionGet()` in tests (it suppresses the stack trace, which makes failures harder to diagnose)
This commit is contained in:
Armin Braun 2020-10-14 18:05:10 +02:00 committed by GitHub
parent b44a03d837
commit a7a1c24456
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 201 additions and 301 deletions

View file

@ -75,13 +75,10 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.GroupPrincipal;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
@ -106,6 +103,7 @@ import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
@ -228,14 +226,10 @@ public class InstallPluginCommandTests extends ESTestCase {
static Path writeZip(Path structure, String prefix) throws IOException {
Path zip = createTempDir().resolve(structure.getFileName() + ".zip");
try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream(zip))) {
Files.walkFileTree(structure, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
stream.putNextEntry(new ZipEntry(target));
Files.copy(file, stream);
return FileVisitResult.CONTINUE;
}
forEachFileRecursively(structure, (file, attrs) -> {
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
stream.putNextEntry(new ZipEntry(target));
Files.copy(file, stream);
});
}
return zip;

View file

@ -171,7 +171,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
ActionFuture<CreateSnapshotResponse> future = client(masterNode).admin().cluster()
.prepareCreateSnapshot(repoName, snapshot).setWaitForCompletion(true).execute();
waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));
waitForBlockOnAnyDataNode(repoName);
NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
@ -197,7 +197,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture =
client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).execute();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L));
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
assertFutureThrows(snapshotFuture, SnapshotException.class);
@ -228,7 +228,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<CreateSnapshotResponse> snapshotResponse = internalCluster().masterClient().admin().cluster()
.prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute();
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);

View file

@ -73,11 +73,8 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -92,6 +89,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@ -441,14 +439,9 @@ public class RelocationIT extends ESIntegTestCase {
if (Files.exists(shardLoc)) {
assertBusy(() -> {
try {
Files.walkFileTree(shardLoc, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery.")));
return FileVisitResult.CONTINUE;
}
});
forEachFileRecursively(shardLoc,
(file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery."))));
} catch (IOException e) {
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
}

View file

@ -23,8 +23,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
@ -82,8 +80,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
.setWaitForCompletion(true).get();
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
logger.info("--> creating a garbage data blob");
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
@ -91,13 +88,13 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
garbageFuture.get();
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
logger.info("--> starting repository cleanup");
client().admin().cluster().prepareCleanupRepository(repoName).execute();
logger.info("--> waiting for block to kick in on " + masterNode);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName);
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
return masterNode;
@ -116,9 +113,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
}
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
logger.info("--> write two outdated index-N blobs");
for (int i = 0; i < 2; ++i) {
final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();

View file

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
@ -79,8 +78,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String sourceSnapshot = "source-snapshot";
final SnapshotInfo sourceSnapshotInfo = createFullSnapshot(repoName, sourceSnapshot);
final BlobStoreRepository repository =
(BlobStoreRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
final RepositoryData repositoryData = getRepositoryData(repoName);
final IndexId indexId = repositoryData.resolveIndexId(indexName);
final int shardId = 0;
@ -167,7 +165,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String targetSnapshot = "target-snapshot";
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
assertFalse(cloneFuture.isDone());
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
@ -201,7 +199,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String targetSnapshot = "target-snapshot";
final ActionFuture<CreateSnapshotResponse> snapshot2Future =
startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
awaitNumberOfSnapshotsInProgress(2);
unblockNode(repoName, dataNode);
@ -224,7 +222,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String targetSnapshot = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final String indexFast = "index-fast";
createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
@ -255,7 +253,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
.prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
final String targetSnapshot = "target-snapshot";
assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
@ -282,7 +280,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String targetSnapshot = "target-snapshot";
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
assertFalse(deleteFuture.isDone());
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
@ -310,7 +308,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final String targetSnapshot1 = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
assertThat(cloneFuture1.isDone(), is(false));
final int extraClones = randomIntBetween(1, 5);
@ -366,7 +364,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
startCloneFromDataNode(repoName, sourceSnapshot, cloneName, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
internalCluster().restartNode(masterNode);
boolean cloneSucceeded = false;
try {
@ -377,7 +375,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
// snapshot on disconnect slowly enough for it to work out
}
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();
// Check if the clone operation worked out by chance as a result of the clone request being retried because of the master failover
cloneSucceeded = cloneSucceeded ||
@ -418,10 +416,10 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
internalCluster().restartNode(masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
}
@ -443,10 +441,10 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}
@ -465,7 +463,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
.prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
internalCluster().restartNode(dataNode);
assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
@ -490,7 +488,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
blockMasterOnWriteIndexFile(repoName);
final String cloneName = "clone-blocked";
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
blockNodeOnAnyFiles(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
@ -520,7 +518,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
blockMasterOnWriteIndexFile(repoName);
final String cloneName = "clone-blocked";
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
final String otherCloneName = "other-clone";
final ActionFuture<AcknowledgedResponse> otherClone = startClone(repoName, sourceSnapshot, otherCloneName, indexName);
@ -549,7 +547,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
blockMasterOnWriteIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
final String cloneName = "clone";
final ActionFuture<AcknowledgedResponse> clone = startClone(repoName, sourceSnapshot, cloneName, indexName);
@ -589,13 +587,11 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
}
private void blockMasterOnReadIndexMeta(String repoName) {
((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnReadIndexMeta();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnReadIndexMeta();
}
private void blockMasterOnShardClone(String repoName) {
((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnWriteShardLevelMeta();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
}
/**

View file

@ -36,7 +36,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.discovery.AbstractDisruptionTestCase;
import org.elasticsearch.plugins.Plugin;
@ -283,7 +282,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, firstSnapshot);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "second-snapshot");
@ -416,7 +415,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String firstSnapshot = "snapshot-one";
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> firstSnapshotResponse = startFullSnapshotFromNonMasterClient(repoName, firstSnapshot);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
final String dataNode2 = internalCluster().startDataOnlyNode();
ensureStableCluster(5);
@ -437,7 +436,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockNodeOnAnyFiles(repoName, dataNode2);
final ActionFuture<CreateSnapshotResponse> snapshotThreeFuture = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
waitForBlock(dataNode2, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode2, repoName);
assertThat(firstSnapshotResponse.isDone(), is(false));
assertThat(secondSnapshotResponse.isDone(), is(false));
@ -508,7 +507,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> firstSnapshotResponse = startFullSnapshotFromMasterClient(repoName, "snapshot-one");
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
internalCluster().startDataOnlyNode();
ensureStableCluster(3);
@ -533,7 +532,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDeleteSnapshot(repoName, "*");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-queued");
awaitNumberOfSnapshotsInProgress(1);
@ -616,7 +615,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockNodeOnAnyFiles(repoName, masterNode);
ActionFuture<AcknowledgedResponse> firstDeleteFuture = client(masterNode).admin().cluster()
.prepareDeleteSnapshot(repoName, "*").execute();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final ActionFuture<CreateSnapshotResponse> createThirdSnapshot = client(masterNode).admin().cluster()
.prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(true).execute();
@ -654,7 +653,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> firstFailedSnapshotFuture =
startFullSnapshotFromMasterClient(repoName, "failing-snapshot-1");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final ActionFuture<CreateSnapshotResponse> secondFailedSnapshotFuture =
startFullSnapshotFromMasterClient(repoName, "failing-snapshot-2");
awaitNumberOfSnapshotsInProgress(2);
@ -720,7 +719,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String masterNode = internalCluster().getMasterName();
blockNodeOnAnyFiles(repoName, masterNode);
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
corruptIndexN(repoPath, generation);
@ -748,7 +747,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String masterNode = internalCluster().getMasterName();
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
corruptIndexN(repoPath, generation);
@ -784,7 +783,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
createNSnapshots(blockedRepoName, randomIntBetween(1, 5));
blockNodeOnAnyFiles(blockedRepoName, masterNode);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteFromNonMasterClient(blockedRepoName, "*");
waitForBlock(masterNode, blockedRepoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, blockedRepoName);
awaitNDeletionsInProgress(1);
final ActionFuture<CreateSnapshotResponse> createBlockedSnapshot =
startFullSnapshotFromNonMasterClient(blockedRepoName, "queued-snapshot");
@ -793,7 +792,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final long generation = getRepositoryData(repoName).getGenId();
blockNodeOnAnyFiles(repoName, masterNode);
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
awaitNumberOfSnapshotsInProgress(2);
corruptIndexN(repoPath, generation);
@ -893,9 +892,9 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
snapOneResponse.get();
snapTwoResponse.get();
logger.info("--> wait for snapshot to complete");
awaitNoMoreRunningOperations();
for (String snapshot : Arrays.asList(snapshotOne, snapshotTwo)) {
SnapshotInfo snapshotInfo = waitForCompletion(repoName, snapshot, TimeValue.timeValueSeconds(600));
SnapshotInfo snapshotInfo = getSnapshot(repoName, snapshot);
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.shardFailures().size(), equalTo(0));
}
@ -958,7 +957,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String snapshotName = "snap-name";
blockMasterFromDeletingIndexNFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, snapshotName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, snapshotName);
awaitNDeletionsInProgress(1);
unblockNode(repoName, masterName);
@ -980,7 +979,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
for (int i = 0; i < deletes; ++i) {
deleteResponses.add(client().admin().cluster().prepareDeleteSnapshot(repoName, "*").execute());
}
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNDeletionsInProgress(1);
for (ActionFuture<AcknowledgedResponse> deleteResponse : deleteResponses) {
assertFalse(deleteResponse.isDone());
@ -1005,7 +1004,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String masterName = internalCluster().getMasterName();
blockMasterFromDeletingIndexNFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshotFromMasterClient(repoName, "snap-other");
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
final String snapshotOne = snapshotNames.get(0);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne);
@ -1114,8 +1113,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
awaitNumberOfSnapshotsInProgress(4);
final String initialMaster = internalCluster().getMasterName();
waitForBlock(initialMaster, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(initialMaster, otherRepoName, TimeValue.timeValueSeconds(30L));
waitForBlock(initialMaster, repoName);
waitForBlock(initialMaster, otherRepoName);
internalCluster().stopCurrentMasterNode();
ensureStableCluster(3, dataNode);
@ -1155,7 +1154,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
if (blockedDelete) {
awaitNDeletionsInProgress(1);
}
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
final String expectedFailureMessage = "Cannot start another operation, already running [" + limitToTest +
"] operations and the current limit for concurrent snapshot operations is set to [" + limitToTest + "]";
@ -1215,7 +1214,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final String snapshotName = "snap-1";
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, snapshotName);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, snapshotName);
awaitNDeletionsInProgress(1);
unblockNode(repoName, masterNode);
@ -1259,7 +1258,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterOnWriteIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
blockNodeOnAnyFiles(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
@ -1298,10 +1297,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
return snapshotNames;
}
private void awaitNoMoreRunningOperations() throws Exception {
awaitNoMoreRunningOperations(internalCluster().getMasterName());
}
private ActionFuture<AcknowledgedResponse> startDeleteFromNonMasterClient(String repoName, String snapshotName) {
logger.info("--> deleting snapshot [{}] from repo [{}] from non master client", snapshotName, repoName);
return internalCluster().nonMasterClient().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
@ -1357,19 +1352,19 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
}
private ActionFuture<AcknowledgedResponse> startAndBlockOnDeleteSnapshot(String repoName, String snapshotName)
throws InterruptedException {
throws Exception {
final String masterName = internalCluster().getMasterName();
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> fut = startDeleteSnapshot(repoName, snapshotName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
return fut;
}
private ActionFuture<CreateSnapshotResponse> startAndBlockFailingFullSnapshot(String blockedRepoName, String snapshotName)
throws InterruptedException {
throws Exception {
blockMasterFromFinalizingSnapshotOnIndexFile(blockedRepoName);
final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(blockedRepoName, snapshotName);
waitForBlock(internalCluster().getMasterName(), blockedRepoName, TimeValue.timeValueSeconds(30L));
waitForBlock(internalCluster().getMasterName(), blockedRepoName);
return fut;
}
}

View file

@ -36,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
@ -148,7 +147,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final Repository repository = getRepositoryOnMaster(repoName);
logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repoName);
@ -262,7 +261,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> corrupt index-N blob");
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final Repository repository = getRepositoryOnMaster(repoName);
final RepositoryData repositoryData = getRepositoryData(repoName);
Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100)));
@ -272,7 +271,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
final String otherRepoName = "other-repo";
createRepository(otherRepoName, "fs", Settings.builder()
.put("location", repo).put("compress", false));
final Repository otherRepo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(otherRepoName);
final Repository otherRepo = getRepositoryOnMaster(otherRepoName);
logger.info("--> verify loading repository data from newly mounted repository throws RepositoryException");
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));

View file

@ -52,7 +52,6 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -88,11 +87,8 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -351,7 +347,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
.get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
logger.info("--> execution was blocked on node [{}], shutting it down", blockedNode);
unblockNode("test-repo", blockedNode);
@ -359,9 +355,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> stopping node [{}]", blockedNode);
stopNode(blockedNode);
logger.info("--> waiting for completion");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("--> done");
awaitNoMoreRunningOperations();
logger.info("Number of failed shards [{}]", getSnapshot("test-repo", "test-snap").shardFailures().size());
}
public void testSnapshotWithStuckNode() throws Exception {
@ -391,7 +386,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
.get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode);
@ -739,15 +734,13 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final int numberOfShards = getNumShards("test-idx").numPrimaries;
logger.info("number of shards: {}", numberOfShards);
final String masterNode = blockMasterFromFinalizingSnapshotOnSnapFile("test-repo");
blockMasterFromFinalizingSnapshotOnSnapFile("test-repo");
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false)
.setIndices("test-idx").get();
logger.info("--> stopping data node {}", dataNode);
stopNode(dataNode);
logger.info("--> stopping master node {} ", masterNode);
internalCluster().stopCurrentMasterNode();
logger.info("--> wait until the snapshot is done");
@ -1133,7 +1126,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
blockAllDataNodes(repoName);
final String snapshotName = "test-snap";
final ActionFuture<CreateSnapshotResponse> snapshotResponse = startFullSnapshot(repoName, snapshotName);
waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNodeName, repoName);
final AtomicBoolean blocked = new AtomicBoolean(true);
@ -1211,8 +1204,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertAcked(client().admin().indices().prepareDelete(indexName));
logger.info("--> wait for snapshot to complete");
SnapshotInfo snapshotInfo = waitForCompletion(repoName, "test-snap", TimeValue.timeValueSeconds(600));
awaitNoMoreRunningOperations();
SnapshotInfo snapshotInfo = getSnapshot(repoName, "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
logger.info("--> done");
@ -1230,32 +1223,22 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
private static List<Path> findRepoMetaBlobs(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
final String fileName = file.getFileName().toString();
if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
files.add(file);
}
return super.visitFile(file, attrs);
}
forEachFileRecursively(repoPath.resolve("indices"), ((file, basicFileAttributes) -> {
final String fileName = file.getFileName().toString();
if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
files.add(file);
}
);
}));
return files;
}
private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.getFileName().toString().startsWith("__")){
files.add(file);
}
return super.visitFile(file, attrs);
}
forEachFileRecursively(repoPath.resolve("indices"), ((file, basicFileAttributes) -> {
if (file.getFileName().toString().startsWith("__")){
files.add(file);
}
);
}));
return files;
}
@ -1288,7 +1271,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
return fromXContent(SnapshottableMetadata::new, parser);
}
@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.API_AND_SNAPSHOT;

View file

@ -51,6 +51,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
@ -132,9 +133,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
if (!indicesToFlush.isEmpty()) {
String[] indices = indicesToFlush.toArray(new String[indicesToFlush.size()]);
logger.info("--> starting asynchronous flush for indices {}", Arrays.toString(indices));
flushResponseFuture = client().admin().indices().prepareFlush(indices).execute();
logger.info("--> starting asynchronous flush for indices {}", indicesToFlush);
flushResponseFuture = client().admin().indices().prepareFlush(indicesToFlush.toArray(Strings.EMPTY_ARRAY)).execute();
}
}
@ -792,18 +792,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode);
Settings.Builder excludeSettings = Settings.builder().put("index.routing.allocation.exclude._name", blockedNode);
client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get();
logger.info("--> unblocking blocked node");
unblockNode("test-repo", blockedNode);
logger.info("--> waiting for completion");
logger.info("Number of failed shards [{}]",
waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
logger.info("--> done");
awaitNoMoreRunningOperations();
final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
@ -842,7 +838,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
logger.info("--> execution was blocked on node [{}], trying to delete repository", blockedNode);
@ -871,10 +867,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> unblocking blocked node");
unblockNode("test-repo", blockedNode);
logger.info("--> waiting for completion");
logger.info("Number of failed shards [{}]",
waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
logger.info("--> done");
awaitNoMoreRunningOperations();
final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
@ -1003,7 +996,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.setWaitForCompletion(false).setIncludeGlobalState(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
logger.info("--> execution was blocked on node [{}], checking snapshot status with specified repository and snapshot",
blockedNode);
@ -1046,10 +1039,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> unblocking blocked node");
unblockNode("test-repo", blockedNode);
snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
awaitNoMoreRunningOperations();
snapshotInfo = getSnapshot("test-repo", "test-snap");
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("--> done");
logger.info("--> checking snapshot status again after snapshot is done");
response = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").execute().actionGet();
@ -1115,8 +1107,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
logger.info("--> wait for snapshot to complete");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
awaitNoMoreRunningOperations();
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.shardFailures().size(), equalTo(0));
logger.info("--> done");
@ -1188,7 +1180,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
ActionFuture<CreateSnapshotResponse> future = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute();
logger.info("--> wait for block to kick in");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
waitForBlockOnAnyDataNode("test-repo");
try {
// non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed
@ -1246,7 +1238,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.execute();
logger.info("--> waiting for block to kick in");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
waitForBlockOnAnyDataNode("test-repo");
logger.info("--> close index while restore is running");
try {
@ -1302,7 +1294,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.execute();
logger.info("--> waiting for block to kick in");
waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueMinutes(1));
waitForBlockOnAnyDataNode(repoName);
logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)");
ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
@ -1601,7 +1593,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
waitForBlock(blockedNode, repo);
logger.info("--> removing primary shard that is being snapshotted");
ClusterState clusterState = internalCluster().clusterService(internalCluster().getMasterName()).state();
@ -1616,7 +1608,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
unblockNode(repo, blockedNode);
logger.info("--> ensuring snapshot is aborted and the aborted shard was marked as failed");
SnapshotInfo snapshotInfo = waitForCompletion(repo, snapshot, TimeValue.timeValueSeconds(60));
awaitNoMoreRunningOperations();
SnapshotInfo snapshotInfo = getSnapshot(repo, snapshot);
assertEquals(1, snapshotInfo.shardFailures().size());
assertEquals(0, snapshotInfo.shardFailures().get(0).shardId());
assertThat(snapshotInfo.shardFailures().get(0).reason(), is("aborted"));
@ -1749,7 +1742,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
public void testSnapshottingWithMissingSequenceNumbers() {
public void testSnapshottingWithMissingSequenceNumbers() throws Exception {
final String repositoryName = "test-repo";
final String snapshotName = "test-snap";
final String indexName = "test-idx";
@ -1783,7 +1776,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
.setWaitForCompletion(true).execute().get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get();
@ -1899,7 +1892,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(getSnapshotsResponse.getSnapshots("test-repo"), empty());
}
public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException {
public void testHiddenIndicesIncludedInSnapshot() throws Exception {
Client client = client();
final String normalIndex = "normal-index";
final String hiddenIndex = "hidden-index";
@ -1943,7 +1936,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("*")
.execute().actionGet();
.execute().get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(),
equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()));
@ -1961,7 +1954,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("*", "-.*")
.execute().actionGet();
.execute().get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(),
equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()));
@ -1979,7 +1972,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("hid*")
.execute().actionGet();
.execute().get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(),
equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()));
@ -2013,8 +2006,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "mock", repoPath);
final MockRepository repository =
(MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final MockRepository repository = getRepositoryOnMaster(repoName);
repository.setFailOnIndexLatest(true);
createFullSnapshot(repoName, "snapshot-1");
repository.setFailOnIndexLatest(false);

View file

@ -19,7 +19,6 @@
package org.elasticsearch.snapshots;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
@ -63,7 +62,7 @@ public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
.setWaitForCompletion(false)
.setIndices("test-index")
.get();
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
waitForBlock(blockedNode, "test-repo");
final SnapshotId snapshotId = getSnapshot("test-repo", "test-snap").snapshotId();

View file

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
@ -67,7 +66,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
.build();
}
public void testStatusApiConsistency() {
public void testStatusApiConsistency() throws Exception {
createRepository("test-repo", "fs");
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@ -90,7 +89,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
final List<SnapshotStatus> snapshotStatus = clusterAdmin().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).get().getSnapshots();
assertThat(snapshotStatus.size(), equalTo(1));
final SnapshotStatus snStatus = snapshotStatus.get(0);
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
@ -113,7 +112,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot("test-repo", "test-snap");
logger.info("--> wait for data nodes to get blocked");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
waitForBlockOnAnyDataNode("test-repo");
awaitNumberOfSnapshotsInProgress(1);
assertEquals(SnapshotsInProgress.State.STARTED, client().admin().cluster().prepareSnapshotStatus("test-repo")
.setSnapshots("test-snap").get().getSnapshots().get(0).getState());
@ -121,8 +120,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
logger.info("--> wait for snapshot to finish");
createSnapshotResponseActionFuture.actionGet();
assertSuccessful(createSnapshotResponseActionFuture);
}
public void testExceptionOnMissingSnapBlob() throws IOException {
@ -247,7 +245,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<CreateSnapshotResponse> responseSnapshotTwo =
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotTwo).setWaitForCompletion(true).execute();
waitForBlock(dataNodeTwo, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNodeTwo, repoName);
assertBusy(() -> {
final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne);
@ -419,7 +417,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
.setWaitForCompletion(false)
.setIndices(indexName)
.get();
waitForBlock(initialBlockedNode, repositoryName, TimeValue.timeValueSeconds(60)); // wait for block to kick in
waitForBlock(initialBlockedNode, repositoryName); // wait for block to kick in
getSnapshotsResponse = client.admin().cluster()
.prepareGetSnapshots("test-repo")
.setSnapshots(randomFrom("_all", "_current", "snap-on-*", "*-on-empty-repo", "snap-on-empty-repo"))
@ -458,7 +456,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
.setWaitForCompletion(false)
.setIndices(indexName)
.get();
waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60)); // wait for block to kick in
waitForBlock(blockedNode, repositoryName); // wait for block to kick in
logger.info("--> get all snapshots with a current in-progress");
// with ignore unavailable set to true, should not throw an exception
@ -513,7 +511,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
.collect(Collectors.toList()), equalTo(sortedNames));
unblockNode(repositoryName, blockedNode); // unblock node
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));
awaitNoMoreRunningOperations();
}
private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {

View file

@ -21,7 +21,6 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.PlainActionFuture;
@ -36,6 +35,7 @@ import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
@ -150,7 +150,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
protected RepositoryData getRepositoryData(String repository) {
return getRepositoryData(internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repository));
return getRepositoryData((Repository) getRepositoryOnMaster(repository));
}
protected RepositoryData getRepositoryData(Repository repository) {
@ -169,103 +169,56 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
public static void assertFileCount(Path dir, int expectedCount) throws IOException {
final List<Path> found = new ArrayList<>();
Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
found.add(file);
return FileVisitResult.CONTINUE;
}
});
forEachFileRecursively(dir, ((path, basicFileAttributes) -> found.add(path)));
assertEquals("Unexpected file count, found: [" + found + "].", expectedCount, found.size());
}
public static int numberOfFiles(Path dir) throws IOException {
final AtomicInteger count = new AtomicInteger();
Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
count.incrementAndGet();
return FileVisitResult.CONTINUE;
}
});
forEachFileRecursively(dir, ((path, basicFileAttributes) -> count.incrementAndGet()));
return count.get();
}
public static void stopNode(final String node) throws IOException {
protected void stopNode(final String node) throws IOException {
logger.info("--> stopping node {}", node);
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(node));
}
public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
public void waitForBlock(String node, String repository) throws Exception {
logger.info("--> waiting for [{}] to be blocked on node [{}]", repository, node);
long start = System.currentTimeMillis();
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
while (System.currentTimeMillis() - start < timeout.millis()) {
if (mockRepository.blocked()) {
return;
}
Thread.sleep(100);
}
fail("Timeout waiting for node [" + node + "] to be blocked");
MockRepository mockRepository = getRepositoryOnNode(repository, node);
assertBusy(() -> assertTrue(mockRepository.blocked()), 30L, TimeUnit.SECONDS);
}
public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout.millis()) {
final SnapshotInfo snapshotInfo = getSnapshot(repository, snapshotName);
if (snapshotInfo.state().completed()) {
// Make sure that snapshot clean up operations are finished
ClusterStateResponse stateResponse = clusterAdmin().prepareState().get();
boolean found = false;
for (SnapshotsInProgress.Entry entry :
stateResponse.getState().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
final Snapshot curr = entry.snapshot();
if (curr.getRepository().equals(repository) && curr.getSnapshotId().getName().equals(snapshotName)) {
found = true;
break;
}
}
if (found == false) {
return snapshotInfo;
}
}
Thread.sleep(100);
}
fail("Timeout!!!");
return null;
public static void blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteIndexFile();
}
public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockAndFailOnWriteIndexFile();
return masterName;
}
public static String blockMasterOnWriteIndexFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getMasterNodeInstance(RepositoriesService.class)
.repository(repositoryName)).setBlockOnWriteIndexFile();
return masterName;
public static void blockMasterOnWriteIndexFile(final String repositoryName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockOnWriteIndexFile();
}
public static void blockMasterFromDeletingIndexNFile(String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockOnDeleteIndexFile();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockOnDeleteIndexFile();
}
public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true);
return masterName;
public static void blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteSnapFiles();
}
@SuppressWarnings("unchecked")
protected static <T extends Repository> T getRepositoryOnMaster(String repositoryName) {
return ((T) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repositoryName));
}
@SuppressWarnings("unchecked")
protected static <T extends Repository> T getRepositoryOnNode(String repositoryName, String nodeName) {
return ((T) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repositoryName));
}
public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
for(String node : internalCluster().nodesInclude(indexName)) {
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
.blockOnDataFiles(true);
for (String node : internalCluster().nodesInclude(indexName)) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repositoryName, node).blockOnDataFiles();
return node;
}
fail("No nodes for the index " + indexName + " found");
@ -273,24 +226,22 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
public static void blockNodeOnAnyFiles(String repository, String nodeName) {
((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository)).setBlockOnAnyFiles(true);
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName).setBlockOnAnyFiles();
}
public static void blockDataNode(String repository, String nodeName) {
((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository)).blockOnDataFiles(true);
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName).blockOnDataFiles();
}
public static void blockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository) repositoriesService.repository(repository)).blockOnDataFiles();
}
}
public static void unblockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).unblock();
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository) repositoriesService.repository(repository)).unblock();
}
}
@ -301,7 +252,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
}
public static void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException {
public static void waitForBlockOnAnyDataNode(String repository) throws InterruptedException {
final boolean blocked = waitUntil(() -> {
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
@ -310,14 +261,14 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
}
return false;
}, timeout.millis(), TimeUnit.MILLISECONDS);
}, 30L, TimeUnit.SECONDS);
assertTrue("No repository is blocked waiting on a data node", blocked);
}
public void unblockNode(final String repository, final String node) {
logger.info("--> unblocking [{}] on node [{}]", repository, node);
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repository)).unblock();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, node).unblock();
}
protected void createRepository(String repoName, String type, Settings.Builder settings) {
@ -454,7 +405,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
assertNotNull(initialRepoMetadata);
assertThat("We can only manually insert a snapshot into a repository that does not have a generation tracked in the CS",
initialRepoMetadata.generation(), is(RepositoryData.UNKNOWN_REPO_GEN));
final Repository repo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final Repository repo = getRepositoryOnMaster(repoName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID(random()));
logger.info("--> adding old version FAILED snapshot [{}] to repository [{}]", snapshotId, repoName);
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
@ -467,6 +418,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f));
}
// Convenience overload: observes the cluster state via the current master node and
// waits until no snapshot or snapshot-delete operations remain in flight.
protected void awaitNoMoreRunningOperations() throws Exception {
awaitNoMoreRunningOperations(internalCluster().getMasterName());
}
protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
logger.info("--> verify no more operations in the cluster state");
awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
@ -504,10 +459,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
// Blocks the given data node's repository, starts a full snapshot of repoName, waits
// until the data node actually hits the block, and returns the in-flight snapshot future.
// NOTE(review): both the old (throws InterruptedException) and new (throws Exception)
// signature tails appear below — unmerged diff rendering; only one should remain.
protected ActionFuture<CreateSnapshotResponse> startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName,
String dataNode) throws InterruptedException {
String dataNode) throws Exception {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(repoName, snapshotName);
// Old call passed an explicit 30s timeout; the new helper uses a fixed default.
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
return fut;
}
@ -527,7 +482,8 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
}
protected static SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
protected SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
logger.info("--> wait for snapshot to finish");
final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
return snapshotInfo;
@ -582,7 +538,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
// Waits until the master node's SNAPSHOT thread pool is idle, i.e. all repository
// operations driven by the master have completed.
protected void awaitMasterFinishRepoOperations() throws Exception {
logger.info("--> waiting for master to finish all repo operations on its SNAPSHOT pool");
// NOTE(review): old getMasterNodeInstance and new getCurrentMasterNodeInstance lines
// both present — unmerged diff rendering; only one should remain. Confirm.
final ThreadPool masterThreadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
final ThreadPool masterThreadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
assertBusy(() -> {
for (ThreadPoolStats.Stats stat : masterThreadPool.stats()) {
if (ThreadPool.Names.SNAPSHOT.equals(stat.getName())) {
@ -592,4 +548,15 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
});
}
/**
 * Recursively visits every regular file below {@code path} and hands each one,
 * together with its attributes, to the given callback.
 *
 * @param path    root directory to walk
 * @param forEach callback invoked once per file; may throw {@link IOException}
 * @throws IOException if walking the tree fails or the callback throws
 */
public static void forEachFileRecursively(Path path,
                                          CheckedBiConsumer<Path, BasicFileAttributes, IOException> forEach) throws IOException {
    // Visitor that delegates each regular file to the callback and keeps walking.
    final SimpleFileVisitor<Path> fileVisitor = new SimpleFileVisitor<>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            forEach.accept(file, attrs);
            return FileVisitResult.CONTINUE;
        }
    };
    Files.walkFileTree(path, fileVisitor);
}
}

View file

@ -208,16 +208,16 @@ public class MockRepository extends FsRepository {
this.notifyAll();
}
// Setters that arm the mock repository's simulated-block flags.
// NOTE(review): each setter appears twice — the old boolean-parameter variant and the new
// no-argument variant that unconditionally sets the flag to true. This looks like an
// unmerged diff rendering; only the no-argument variants should remain. Confirm.
public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
// Arms a block on the next write of a data (segment) blob.
public void blockOnDataFiles() {
blockOnDataFiles = true;
}
public void setBlockOnAnyFiles(boolean blocked) {
blockOnAnyFiles = blocked;
// Arms a block on the next access to any blob.
public void setBlockOnAnyFiles() {
blockOnAnyFiles = true;
}
public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
blockAndFailOnWriteSnapFile = blocked;
// Arms a block-then-fail on the next write of a snap-* metadata blob.
public void setBlockAndFailOnWriteSnapFiles() {
blockAndFailOnWriteSnapFile = true;
}
public void setBlockAndFailOnWriteIndexFile() {

View file

@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
@ -447,7 +446,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.setPartial(false)
.execute();
logger.info("--> wait for block to kick in");
waitForBlockOnAnyDataNode(repositoryName, TimeValue.timeValueMinutes(1));
waitForBlockOnAnyDataNode(repositoryName);
// non-partial snapshots do not allow delete operations on data streams where snapshot has not been completed
try {

View file

@ -88,9 +88,9 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase
// Test teardown: waits for all snapshot operations to drain, then deletes the SLM
// history data stream so the next test starts from a clean cluster.
@After
public void cleanUp() throws Exception {
// NOTE(review): old and new call forms both present below — unmerged diff rendering;
// keep only one of each pair. The .get() form preserves the full stack trace on
// failure, unlike actionGet().
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(new String[]{SLM_HISTORY_DATA_STREAM});
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, req).actionGet());
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, req).get());
}
@Override
@ -192,7 +192,7 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase
// Check that the executed snapshot shows up in the SLM output as in_progress
logger.info("--> Waiting for at least one data node to hit the block");
waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L));
waitForBlockOnAnyDataNode(REPO);
assertBusy(() -> {
logger.info("--> at least one data node has hit the block");
GetSnapshotLifecycleAction.Response getResp =
@ -286,14 +286,13 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase
assertBusy(() -> assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
30, TimeUnit.SECONDS);
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
logger.info("--> start snapshot");
ActionFuture<ExecuteSnapshotLifecycleAction.Response> snapshotFuture = client()
.execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId));
logger.info("--> waiting for block to kick in on " + masterNode);
waitForBlock(masterNode, REPO, TimeValue.timeValueSeconds(60));
waitForBlock(internalCluster().getMasterName(), REPO);
logger.info("--> stopping master node");
internalCluster().stopCurrentMasterNode();
@ -349,7 +348,7 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase
logger.info("--> verify that snapshot [{}] succeeded", successfulSnapshotName.get());
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
.prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).execute().actionGet();
.prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).execute().get();
final SnapshotInfo snapshotInfo;
try {
snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0);

View file

@ -31,6 +31,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.ClientHelper;
@ -43,11 +44,8 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsSta
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -324,24 +322,20 @@ public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableS
*/
// Walks the repository's "indices" directory and collects every per-shard snapshot
// metadata blob belonging to the given snapshot, keyed by "snapshotId/indexDir/shardDir".
// NOTE(review): the old anonymous-SimpleFileVisitor implementation and the new
// forEachFileRecursively-based one both appear below — unmerged diff rendering. They look
// equivalent except the new one derives the blob name via BlobStoreRepository.SNAPSHOT_FORMAT
// instead of hard-coding "snap-<id>.dat" — confirm the two produce the same name.
private Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException {
final Map<String, BlobStoreIndexShardSnapshot> blobsPerShard = new HashMap<>();
Files.walkFileTree(repositoryLocation.resolve("indices"), new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
final String fileName = file.getFileName().toString();
if (fileName.equals("snap-" + snapshotId + ".dat")) {
blobsPerShard.put(
String.join(
"/",
snapshotId,
file.getParent().getParent().getFileName().toString(),
file.getParent().getFileName().toString()
),
INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file)))
);
}
return FileVisitResult.CONTINUE;
forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> {
final String fileName = file.getFileName().toString();
if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) {
blobsPerShard.put(
String.join(
"/",
snapshotId,
file.getParent().getParent().getFileName().toString(),
file.getParent().getFileName().toString()
),
INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file)))
);
}
});
}));
// Immutable snapshot of the accumulated per-shard metadata.
return Map.copyOf(blobsPerShard);
}

View file

@ -37,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
@ -466,9 +465,8 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
for (String node : internalCluster().getNodeNames()) {
final IndicesService service = internalCluster().getInstance(IndicesService.class, node);
if (service != null && service.hasIndex(restoredIndex)) {
final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
assertThat(
repositoriesService.repository(repositoryName).getRestoreThrottleTimeInNanos(),
getRepositoryOnNode(repositoryName, node).getRestoreThrottleTimeInNanos(),
useRateLimits ? greaterThan(0L) : equalTo(0L)
);
}