Attempt to clean up index before remote transfer (#115142)

If a node crashes during recovery, it may leave temporary files behind that consume
disk space which may be needed to complete the recovery. So we now attempt to clean up
the index before transferring files from a recovery source. We load the latest
metadata snapshot of the target directory and supply it to the store's `cleanupAndVerify`
method, which removes any files not referenced by the snapshot. A failure to load the
latest snapshot is treated as equivalent to an empty snapshot, which causes
`cleanupAndVerify` to purge the entire target directory and pull everything from scratch.
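
In short, the new step loads the shard's on-disk metadata snapshot, falls back to an empty
snapshot if the load fails, and passes the result to `Store#cleanupAndVerify` while holding a
store reference. A minimal sketch of that pattern follows (logging omitted, and wrapped in a
hypothetical helper method purely for illustration; the real change inlines this logic in
`PeerRecoveryTargetService`, see the last diff below):

import java.io.IOException;

import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;

// Hypothetical helper illustrating the cleanup-before-recovery step.
static void cleanupBeforePeerRecovery(IndexShard indexShard) throws IOException {
    Store.MetadataSnapshot snapshot;
    try {
        // latest on-disk metadata snapshot of the target shard directory
        snapshot = indexShard.snapshotStoreMetadata();
    } catch (IOException e) {
        // an unreadable or missing index is treated as empty: cleanupAndVerify
        // then purges the whole directory and recovery pulls everything again
        snapshot = Store.MetadataSnapshot.EMPTY;
    }
    Store store = indexShard.store();
    store.incRef();
    try {
        // delete any file in the shard directory that the snapshot does not reference
        store.cleanupAndVerify("cleanup before peer recovery", snapshot);
    } finally {
        store.decRef();
    }
}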

Closes #104473
Brendan Cully 2024-11-14 14:02:47 -08:00 committed by GitHub
parent a40c444c72
commit fc67f7cb41
3 changed files with 82 additions and 8 deletions


@@ -0,0 +1,6 @@
pr: 115142
summary: Attempt to clean up index before remote transfer
area: Recovery
type: enhancement
issues:
- 104473


@@ -19,14 +19,19 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -72,16 +78,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
// we use 2 nodes a lucky and unlucky one
// the lucky one holds the primary
// the unlucky one gets the replica and the truncated leftovers
- NodeStats primariesNode = dataNodeStats.get(0);
- NodeStats unluckyNode = dataNodeStats.get(1);
+ String primariesNode = dataNodeStats.get(0).getNode().getName();
+ String unluckyNode = dataNodeStats.get(1).getNode().getName();
// create the index and prevent allocation on any other nodes than the lucky one
// we have no replicas so far and make sure that we allocate the primary on the lucky node
assertAcked(
prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text")
- .setSettings(
- indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName())
- )
+ .setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode))
); // only allocate on the lucky node
// index some docs and check if they are coming back
@@ -102,20 +106,54 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
indicesAdmin().prepareFlush().setForce(true).get(); // double flush to create safe commit in case of async durability
indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
// We write some garbage into the shard directory so that we can verify that it is cleaned up before we resend.
// Cleanup helps prevent recovery from failing due to lack of space from garbage left over from a previous
// recovery that crashed during file transmission. #104473
// We can't look for the presence of the recovery temp files themselves because they are automatically
// cleaned up on clean shutdown by MultiFileWriter.
final String GARBAGE_PREFIX = "recovery.garbage.";
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean truncate = new AtomicBoolean(true);
IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode);
Function<ShardId, Path> getUnluckyIndexPath = (shardId) -> unluckyIndices.indexService(shardId.getIndex())
.getShard(shardId.getId())
.shardPath()
.resolveIndex();
for (NodeStats dataNode : dataNodeStats) {
MockTransportService.getInstance(dataNode.getNode().getName())
.addSendBehavior(
- internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
+ internalCluster().getInstance(TransportService.class, unluckyNode),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
// During the first recovery attempt (when truncate is set), write an extra garbage file once for each
// file transmitted. We get multiple chunks per file but only one is the last.
if (truncate.get() && req.lastChunk()) {
final var shardPath = getUnluckyIndexPath.apply(req.shardId());
final var garbagePath = Files.createTempFile(shardPath, GARBAGE_PREFIX, null);
logger.info("writing garbage at: {}", garbagePath);
}
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
latch.countDown();
throw new RuntimeException("Caused some truncated files for fun and profit");
}
} else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
// verify there are no garbage files present at the FILES_INFO stage of recovery. This precedes FILES_CHUNKS
// and so will run before garbage has been introduced on the first attempt, and before post-transfer cleanup
// has been performed on the second.
final var shardPath = getUnluckyIndexPath.apply(((RecoveryFilesInfoRequest) request).shardId());
try (var list = Files.list(shardPath).filter(path -> path.getFileName().toString().startsWith(GARBAGE_PREFIX))) {
final var garbageFiles = list.toArray();
assertArrayEquals(
"garbage files should have been cleaned before file transmission",
new Path[0],
garbageFiles
);
}
}
connection.sendRequest(requestId, action, request, options);
}
@@ -128,14 +166,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(
"index.routing.allocation.include._name", // now allow allocation on all nodes
- primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()
+ primariesNode + "," + unluckyNode
),
"test"
);
latch.await();
- // at this point we got some truncated left overs on the replica on the unlucky node
+ // at this point we got some truncated leftovers on the replica on the unlucky node
// now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files
truncate.compareAndSet(true, false);
ensureGreen("test");


@@ -397,6 +397,36 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
})
// peer recovery can consume a lot of disk space, so it's worth cleaning up locally ahead of the attempt
// operation runs only if the previous operation succeeded, and returns the previous operation's result.
// Failures at this stage aren't fatal, we can attempt to recover and then clean up again at the end. #104473
.andThenApply(startingSeqNo -> {
Store.MetadataSnapshot snapshot;
try {
snapshot = indexShard.snapshotStoreMetadata();
} catch (IOException e) {
// We give up on the contents for any checked exception thrown by snapshotStoreMetadata. We don't want to
// allow those to bubble up and interrupt recovery because the subsequent recovery attempt is expected
// to fix up these problems for us if it completes successfully.
if (e instanceof org.apache.lucene.index.IndexNotFoundException) {
// this is the expected case on first recovery, so don't spam the logs with exceptions
logger.debug(() -> format("no snapshot found for shard %s, treating as empty", indexShard.shardId()));
} else {
logger.warn(() -> format("unable to load snapshot for shard %s, treating as empty", indexShard.shardId()), e);
}
snapshot = Store.MetadataSnapshot.EMPTY;
}
Store store = indexShard.store();
store.incRef();
try {
logger.debug(() -> format("cleaning up index directory for %s before recovery", indexShard.shardId()));
store.cleanupAndVerify("cleanup before peer recovery", snapshot);
} finally {
store.decRef();
}
return startingSeqNo;
})
// now construct the start-recovery request
.andThenApply(startingSeqNo -> {
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG