From fc67f7cb41d88ae9d5b64de1885685aca6edb37f Mon Sep 17 00:00:00 2001 From: Brendan Cully Date: Thu, 14 Nov 2024 14:02:47 -0800 Subject: [PATCH] Attempt to clean up index before remote transfer (#115142) If a node crashes during recovery, it may leave temporary files behind that can consume disk space, which may be needed to complete recovery. So we attempt to clean up the index before transferring files from a recovery source. We attempt to load the latest snapshot of the target directory, which we supply to store's `cleanupAndVerify` method to remove any files not referenced by it. We treat a failure to load the latest snapshot as equivalent to an empty snapshot, which will cause `cleanupAndVerify` to purge the entire target directory and pull from scratch. Closes #104473 --- docs/changelog/115142.yaml | 6 +++ .../recovery/TruncatedRecoveryIT.java | 54 ++++++++++++++++--- .../recovery/PeerRecoveryTargetService.java | 30 +++++++++++ 3 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 docs/changelog/115142.yaml diff --git a/docs/changelog/115142.yaml b/docs/changelog/115142.yaml new file mode 100644 index 000000000000..2af968ae156d --- /dev/null +++ b/docs/changelog/115142.yaml @@ -0,0 +1,6 @@ +pr: 115142 +summary: Attempt to clean up index before remote transfer +area: Recovery +type: enhancement +issues: + - 104473 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index 039a596f53b3..38eef4f72062 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -19,14 +19,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; +import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest; import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -34,6 +39,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -72,16 +78,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { // we use 2 nodes a lucky and unlucky one // the lucky one holds the primary // the unlucky one gets the replica and the truncated leftovers - NodeStats primariesNode = dataNodeStats.get(0); - NodeStats unluckyNode = dataNodeStats.get(1); + String primariesNode = dataNodeStats.get(0).getNode().getName(); + String unluckyNode = dataNodeStats.get(1).getNode().getName(); // create the index and prevent allocation on any other nodes than the lucky one // we have no replicas so far and make sure that we allocate the primary on the lucky node assertAcked( prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text") - .setSettings( - indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName()) - ) + .setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode)) ); // only allocate on the lucky node // index some docs and check if they are coming back @@ -102,20 +106,54 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { indicesAdmin().prepareFlush().setForce(true).get(); // double flush to create safe commit in case of async durability indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get(); + // We write some garbage into the shard directory so that we can verify that it is cleaned up before we resend. + // Cleanup helps prevent recovery from failing due to lack of space from garbage left over from a previous + // recovery that crashed during file transmission. #104473 + // We can't look for the presence of the recovery temp files themselves because they are automatically + // cleaned up on clean shutdown by MultiFileWriter. + final String GARBAGE_PREFIX = "recovery.garbage."; + final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean truncate = new AtomicBoolean(true); + + IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode); + Function getUnluckyIndexPath = (shardId) -> unluckyIndices.indexService(shardId.getIndex()) + .getShard(shardId.getId()) + .shardPath() + .resolveIndex(); + for (NodeStats dataNode : dataNodeStats) { MockTransportService.getInstance(dataNode.getNode().getName()) .addSendBehavior( - internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), + internalCluster().getInstance(TransportService.class, unluckyNode), (connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk()); + // During the first recovery attempt (when truncate is set), write an extra garbage file once for each + // file transmitted. We get multiple chunks per file but only one is the last. + if (truncate.get() && req.lastChunk()) { + final var shardPath = getUnluckyIndexPath.apply(req.shardId()); + final var garbagePath = Files.createTempFile(shardPath, GARBAGE_PREFIX, null); + logger.info("writing garbage at: {}", garbagePath); + } if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) { latch.countDown(); throw new RuntimeException("Caused some truncated files for fun and profit"); } + } else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { + // verify there are no garbage files present at the FILES_INFO stage of recovery. This precedes FILES_CHUNKS + // and so will run before garbage has been introduced on the first attempt, and before post-transfer cleanup + // has been performed on the second. + final var shardPath = getUnluckyIndexPath.apply(((RecoveryFilesInfoRequest) request).shardId()); + try (var list = Files.list(shardPath).filter(path -> path.getFileName().startsWith(GARBAGE_PREFIX))) { + final var garbageFiles = list.toArray(); + assertArrayEquals( + "garbage files should have been cleaned before file transmission", + new Path[0], + garbageFiles + ); + } } connection.sendRequest(requestId, action, request, options); } @@ -128,14 +166,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put( "index.routing.allocation.include._name", // now allow allocation on all nodes - primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName() + primariesNode + "," + unluckyNode ), "test" ); latch.await(); - // at this point we got some truncated left overs on the replica on the unlucky node + // at this point we got some truncated leftovers on the replica on the unlucky node // now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files truncate.compareAndSet(true, false); ensureGreen("test"); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 308f1894b78d..c8d31d2060ca 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -397,6 +397,36 @@ public class PeerRecoveryTargetService implements IndexEventListener { } indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l)); }) + // peer recovery can consume a lot of disk space, so it's worth cleaning up locally ahead of the attempt + // operation runs only if the previous operation succeeded, and returns the previous operation's result. + // Failures at this stage aren't fatal, we can attempt to recover and then clean up again at the end. #104473 + .andThenApply(startingSeqNo -> { + Store.MetadataSnapshot snapshot; + try { + snapshot = indexShard.snapshotStoreMetadata(); + } catch (IOException e) { + // We give up on the contents for any checked exception thrown by snapshotStoreMetadata. We don't want to + // allow those to bubble up and interrupt recovery because the subsequent recovery attempt is expected + // to fix up these problems for us if it completes successfully. + if (e instanceof org.apache.lucene.index.IndexNotFoundException) { + // this is the expected case on first recovery, so don't spam the logs with exceptions + logger.debug(() -> format("no snapshot found for shard %s, treating as empty", indexShard.shardId())); + } else { + logger.warn(() -> format("unable to load snapshot for shard %s, treating as empty", indexShard.shardId()), e); + } + snapshot = Store.MetadataSnapshot.EMPTY; + } + + Store store = indexShard.store(); + store.incRef(); + try { + logger.debug(() -> format("cleaning up index directory for %s before recovery", indexShard.shardId())); + store.cleanupAndVerify("cleanup before peer recovery", snapshot); + } finally { + store.decRef(); + } + return startingSeqNo; + }) // now construct the start-recovery request .andThenApply(startingSeqNo -> { assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG