diff --git a/docs/changelog/115142.yaml b/docs/changelog/115142.yaml
new file mode 100644
index 000000000000..2af968ae156d
--- /dev/null
+++ b/docs/changelog/115142.yaml
@@ -0,0 +1,6 @@
+pr: 115142
+summary: Attempt to clean up index before remote transfer
+area: Recovery
+type: enhancement
+issues:
+ - 104473
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
index 039a596f53b3..38eef4f72062 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
@@ -19,14 +19,19 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
+import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
 import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportService;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -72,16 +78,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
         // we use 2 nodes a lucky and unlucky one
         // the lucky one holds the primary
         // the unlucky one gets the replica and the truncated leftovers
-        NodeStats primariesNode = dataNodeStats.get(0);
-        NodeStats unluckyNode = dataNodeStats.get(1);
+        String primariesNode = dataNodeStats.get(0).getNode().getName();
+        String unluckyNode = dataNodeStats.get(1).getNode().getName();
 
         // create the index and prevent allocation on any other nodes than the lucky one
         // we have no replicas so far and make sure that we allocate the primary on the lucky node
         assertAcked(
             prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text")
-                .setSettings(
-                    indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName())
-                )
+                .setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode))
         ); // only allocate on the lucky node
 
         // index some docs and check if they are coming back
@@ -102,20 +106,54 @@
         indicesAdmin().prepareFlush().setForce(true).get(); // double flush to create safe commit in case of async durability
         indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
 
+        // We write some garbage into the shard directory so that we can verify that it is cleaned up before we resend.
+        // Cleanup helps prevent recovery from failing due to lack of space from garbage left over from a previous
+        // recovery that crashed during file transmission. #104473
+        // We can't look for the presence of the recovery temp files themselves because they are automatically
+        // cleaned up on clean shutdown by MultiFileWriter.
+        final String GARBAGE_PREFIX = "recovery.garbage.";
+
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicBoolean truncate = new AtomicBoolean(true);
+
+        IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode);
+        Function<ShardId, Path> getUnluckyIndexPath = (shardId) -> unluckyIndices.indexService(shardId.getIndex())
+            .getShard(shardId.getId())
+            .shardPath()
+            .resolveIndex();
+
         for (NodeStats dataNode : dataNodeStats) {
             MockTransportService.getInstance(dataNode.getNode().getName())
                 .addSendBehavior(
-                    internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
+                    internalCluster().getInstance(TransportService.class, unluckyNode),
                     (connection, requestId, action, request, options) -> {
                         if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
                             RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                             logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
+                            // During the first recovery attempt (when truncate is set), write an extra garbage file once for each
+                            // file transmitted. We get multiple chunks per file but only one is the last.
+                            if (truncate.get() && req.lastChunk()) {
+                                final var shardPath = getUnluckyIndexPath.apply(req.shardId());
+                                final var garbagePath = Files.createTempFile(shardPath, GARBAGE_PREFIX, null);
+                                logger.info("writing garbage at: {}", garbagePath);
+                            }
                             if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
                                 latch.countDown();
                                 throw new RuntimeException("Caused some truncated files for fun and profit");
                             }
+                        } else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
+                            // verify there are no garbage files present at the FILES_INFO stage of recovery. This precedes FILE_CHUNK
+                            // and so will run before garbage has been introduced on the first attempt, and before post-transfer cleanup
+                            // has been performed on the second.
+                            final var shardPath = getUnluckyIndexPath.apply(((RecoveryFilesInfoRequest) request).shardId());
+                            try (var list = Files.list(shardPath).filter(path -> path.getFileName().toString().startsWith(GARBAGE_PREFIX))) {
+                                final var garbageFiles = list.toArray();
+                                assertArrayEquals(
+                                    "garbage files should have been cleaned before file transmission",
+                                    new Path[0],
+                                    garbageFiles
+                                );
+                            }
                         }
                         connection.sendRequest(requestId, action, request, options);
                     }
@@ -128,14 +166,14 @@
                 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                 .put(
                     "index.routing.allocation.include._name", // now allow allocation on all nodes
-                    primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()
+                    primariesNode + "," + unluckyNode
                 ),
             "test"
         );
 
         latch.await();
 
-        // at this point we got some truncated left overs on the replica on the unlucky node
+        // at this point we got some truncated leftovers on the replica on the unlucky node
         // now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files
         truncate.compareAndSet(true, false);
         ensureGreen("test");
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 308f1894b78d..c8d31d2060ca 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -397,6 +397,36 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                 }
                 indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
             })
+            // peer recovery can consume a lot of disk space, so it's worth cleaning up locally ahead of the attempt.
+            // This operation runs only if the previous operation succeeded, and returns the previous operation's result.
+            // Failures at this stage aren't fatal, we can attempt to recover and then clean up again at the end. #104473
+            .andThenApply(startingSeqNo -> {
+                Store.MetadataSnapshot snapshot;
+                try {
+                    snapshot = indexShard.snapshotStoreMetadata();
+                } catch (IOException e) {
+                    // We give up on the contents for any checked exception thrown by snapshotStoreMetadata. We don't want to
+                    // allow those to bubble up and interrupt recovery because the subsequent recovery attempt is expected
+                    // to fix up these problems for us if it completes successfully.
+                    if (e instanceof org.apache.lucene.index.IndexNotFoundException) {
+                        // this is the expected case on first recovery, so don't spam the logs with exceptions
+                        logger.debug(() -> format("no snapshot found for shard %s, treating as empty", indexShard.shardId()));
+                    } else {
+                        logger.warn(() -> format("unable to load snapshot for shard %s, treating as empty", indexShard.shardId()), e);
+                    }
+                    snapshot = Store.MetadataSnapshot.EMPTY;
+                }
+
+                Store store = indexShard.store();
+                store.incRef();
+                try {
+                    logger.debug(() -> format("cleaning up index directory for %s before recovery", indexShard.shardId()));
+                    store.cleanupAndVerify("cleanup before peer recovery", snapshot);
+                } finally {
+                    store.decRef();
+                }
+                return startingSeqNo;
+            })
             // now construct the start-recovery request
             .andThenApply(startingSeqNo -> {
                 assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
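
For readers following the test change: the check performed at the FILES_INFO stage amounts to listing the shard's index directory and filtering file names by prefix. Below is a minimal, self-contained JDK sketch of that check; the class and method names, and the temporary directory used in main, are illustrative stand-ins and not part of the Elasticsearch test.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class GarbageCheckSketch {

    // Mirrors the test's GARBAGE_PREFIX constant.
    private static final String GARBAGE_PREFIX = "recovery.garbage.";

    // Returns every file in the given directory whose name starts with the garbage prefix.
    static List<Path> findGarbageFiles(Path shardIndexPath) throws IOException {
        // Files.list holds an open directory handle, so close it with try-with-resources, as the test does.
        try (Stream<Path> files = Files.list(shardIndexPath)) {
            return files.filter(p -> p.getFileName().toString().startsWith(GARBAGE_PREFIX)).toList();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shard-index");
        Files.createTempFile(dir, GARBAGE_PREFIX, null); // simulate leftover garbage from a failed recovery
        Files.createFile(dir.resolve("_0.cfs"));         // simulate a real segment file that must survive
        System.out.println(findGarbageFiles(dir));       // prints only the recovery.garbage.* file
    }
}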
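The PeerRecoveryTargetService change follows a best-effort pattern: try to reclaim disk space before requesting files from the source node, but never let a failure of that preparatory step abort the recovery, because a successful recovery (and the cleanup that follows it) is expected to repair the directory anyway. A generic sketch of that shape, assuming hypothetical cleanUpLocalFiles and transferFiles stand-ins rather than the real Store and recovery APIs:

import java.io.IOException;
import java.util.logging.Logger;

public class BestEffortCleanupSketch {

    private static final Logger logger = Logger.getLogger("recovery");

    // Hypothetical stand-in for the pre-recovery cleanup; may fail, e.g. if the directory is missing or corrupt.
    static void cleanUpLocalFiles() throws IOException {
        // ... delete files that are not part of the last known-good snapshot ...
    }

    // Hypothetical stand-in for the actual peer-recovery file transfer.
    static void transferFiles() {
        // ... copy segment files from the source node ...
    }

    static void recover() {
        try {
            // Best effort: reclaim space left behind by a previously failed attempt.
            cleanUpLocalFiles();
        } catch (IOException e) {
            // Not fatal: the transfer below is expected to fix up the directory if it completes successfully.
            logger.warning("pre-recovery cleanup failed, continuing: " + e);
        }
        transferFiles();
    }

    public static void main(String[] args) {
        recover();
    }
}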