diff --git a/docs/changelog/119542.yaml b/docs/changelog/119542.yaml new file mode 100644 index 000000000000..aaf26c7dc4b0 --- /dev/null +++ b/docs/changelog/119542.yaml @@ -0,0 +1,5 @@ +pr: 119542 +summary: Wait while index is blocked +area: Transform +type: enhancement +issues: [] diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index ab478dc16f22..84519166eddb 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformStats; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; @@ -152,7 +153,7 @@ public class TransformIT extends TransformRestTestCase { public void testContinuousTransformCrud() throws Exception { var transformId = "transform-continuous-crud"; var indexName = "continuous-crud-reviews"; - createContinuousTransform(indexName, transformId); + createContinuousTransform(indexName, transformId, "reviews-by-user-business-day"); var transformStats = getBasicTransformStats(transformId); assertThat(transformStats.get("state"), equalTo("started")); @@ -181,7 +182,7 @@ public class TransformIT extends TransformRestTestCase { deleteTransform(transformId); } - private void createContinuousTransform(String indexName, String transformId) throws Exception { + private void createContinuousTransform(String indexName, String transformId, String destinationIndex) throws Exception { createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); var groups = Map.of( @@ -197,8 +198,9 @@ public class TransformIT extends TransformRestTestCase { .addAggregator(AggregationBuilders.avg("review_score").field("stars")) .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); - var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName) - .setPivotConfig(createPivotConfig(groups, aggs)) + var config = createTransformConfigBuilder(transformId, destinationIndex, QueryConfig.matchAll(), indexName).setPivotConfig( + createPivotConfig(groups, aggs) + ) .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) .setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build()) .build(); @@ -216,7 +218,7 @@ public class TransformIT extends TransformRestTestCase { @SuppressWarnings("unchecked") public void testBasicContinuousTransformStats() throws Exception { var transformId = "transform-continuous-basic-stats"; - createContinuousTransform("continuous-basic-stats-reviews", transformId); + createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day"); var transformStats = getBasicTransformStats(transformId); assertEquals("started", XContentMapValues.extractValue("state", transformStats)); @@ -230,6 +232,40 @@ public class TransformIT extends TransformRestTestCase { deleteTransform(transformId); } + public void testDestinationIndexBlocked() throws Exception { + var transformId = "transform-continuous-blocked-destination"; + var sourceIndexName = "source-reviews"; + var destIndexName = "destination-reviews"; + + // create transform & indices, wait until 1st checkpoint is finished + createContinuousTransform(sourceIndexName, transformId, destIndexName); + + // block destination index + Request request = new Request("PUT", destIndexName + "/_block/write"); + assertAcknowledged(adminClient().performRequest(request)); + + // index more docs so the checkpoint tries to run, wait until transform stops + assertBusy(() -> { + indexDoc(42, sourceIndexName); + assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); + }, 30, TimeUnit.SECONDS); + + // unblock index + request = new Request("PUT", destIndexName + "/_settings"); + request.setJsonEntity(""" + { "blocks.write": false } + """); + assertAcknowledged(adminClient().performRequest(request)); + + assertBusy(() -> { + indexDoc(42, sourceIndexName); + assertEquals(TransformStats.State.STARTED.value(), getTransformState(transformId)); + }, 30, TimeUnit.SECONDS); + + stopTransform(transformId); + deleteTransform(transformId); + } + public void testTransformLifecycleInALoop() throws Exception { String transformId = "lifecycle-in-a-loop"; String indexName = transformId + "-src"; @@ -652,4 +688,17 @@ public class TransformIT extends TransformRestTestCase { bulkBuilder.append("\r\n"); doBulk(bulkBuilder.toString(), true); } + + private void indexDoc(long userId, String index) throws Exception { + StringBuilder bulkBuilder = new StringBuilder(); + bulkBuilder.append(format(""" + {"create":{"_index":"%s"}} + """, index)); + String source = format(""" + {"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s} + """, userId, 1, 2, 5, Instant.now().toEpochMilli()); + bulkBuilder.append(source); + bulkBuilder.append("\r\n"); + doBulk(bulkBuilder.toString(), true); + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index ae0eb000e70e..7de774224541 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -270,6 +270,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction listener ) { + if (transformTask.getContext().isWaitingForIndexToUnblock()) { + logger.debug("[{}] Destination index is blocked. User requested a retry.", transformTask.getTransformId()); + transformTask.getContext().setIsWaitingForIndexToUnblock(false); + } transformScheduler.scheduleNow(request.getId()); listener.onResponse(Response.TRUE); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 4a0c4c2921ba..c7fd6df4467d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -25,8 +25,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; @@ -443,9 +445,36 @@ class ClientTransformIndexer extends TransformIndexer { logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId()); return false; } + if (context.isWaitingForIndexToUnblock()) { + if (destinationIndexHasWriteBlock()) { + logger.debug("[{}] schedule was triggered but the destination index has a write block. Ignoring trigger.", getJobId()); + return false; + } + logger.debug("[{}] destination index is no longer blocked.", getJobId()); + context.setIsWaitingForIndexToUnblock(false); + } + return super.maybeTriggerAsyncJob(now); } + private boolean destinationIndexHasWriteBlock() { + var clusterState = clusterService.state(); + if (clusterState == null) { + // if we can't determine if the index is blocked, we assume it isn't, even though the bulk request may fail again + return false; + } + + var destinationIndexName = transformConfig.getDestination().getIndex(); + var destinationIndex = indexNameExpressionResolver.concreteWriteIndex( + clusterState, + IndicesOptions.lenientExpandOpen(), + destinationIndexName, + true, + false + ); + return destinationIndex != null && clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, destinationIndex.getName()); + } + @Override protected void onStop() { closePointInTime(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 53c3ff4091a9..73872fac097c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -49,6 +49,14 @@ public class TransformContext { private volatile AuthorizationState authState; private volatile int pageSize = 0; + /** + * If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it. + * {@link TransformFailureHandler} will silence the error so the Transform automatically retries. + * Every time the Transform runs, it will check if the index is unblocked and reset this to false. + * Users can override this via the `_schedule_now` API. + */ + private volatile boolean isWaitingForIndexToUnblock = false; + // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished private final AtomicLong currentCheckpoint; @@ -183,6 +191,14 @@ public class TransformContext { this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex; } + public boolean isWaitingForIndexToUnblock() { + return isWaitingForIndexToUnblock; + } + + public void setIsWaitingForIndexToUnblock(boolean isWaitingForIndexToUnblock) { + this.isWaitingForIndexToUnblock = isWaitingForIndexToUnblock; + } + public AuthorizationState getAuthState() { return authState; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java index 24586e5f3633..eeeabff9efd7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java @@ -170,6 +170,7 @@ class TransformFailureHandler { */ private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) { if (bulkIndexingException.getCause() instanceof ClusterBlockException) { + context.setIsWaitingForIndexToUnblock(true); retryWithoutIncrementingFailureCount( bulkIndexingException, bulkIndexingException.getDetailedMessage(), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java index a3f56efa9e13..52ea738b0331 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -343,6 +343,46 @@ public class TransportGetTransformStatsActionTests extends ESTestCase { ); } + public void testDeriveStatsWithIndexBlock() { + String transformId = "transform-with-stats"; + String reason = "transform is paused while destination index is blocked"; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState runningState = new TransformState( + TransformTaskState.STARTED, + IndexerState.STARTED, + null, + 0, + null, + null, + null, + false, + null + ); + + var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock()); + context.setIsWaitingForIndexToUnblock(true); + var task = mock(TransformTask.class); + when(task.getContext()).thenReturn(context); + when(task.getTransformId()).thenReturn(transformId); + when(task.getState()).thenReturn(runningState); + when(task.getStats()).thenReturn(stats); + + assertThat( + TransportGetTransformStatsAction.deriveStats(task, null), + equalTo( + new TransformStats( + transformId, + TransformStats.State.WAITING, + reason, + null, + stats, + TransformCheckpointingInfo.EMPTY, + TransformHealth.GREEN + ) + ) + ); + } + private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) { when(task.getTransformId()).thenReturn(transformId); when(task.getState()).thenReturn(state); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index c8677c2816fc..1fbe5c53cacc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -22,13 +22,18 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchHit; @@ -44,6 +49,7 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -53,6 +59,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformEffectiveSetti import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformNode; @@ -77,6 +84,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.IntStream; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -467,6 +478,53 @@ public class ClientTransformIndexerTests extends ESTestCase { } } + public void testIndexBlocked() { + var service = serviceWithBlockCheck(true); + var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock()); + + var indexer = createTestIndexer(mock(), service, resolver(), context); + context.setIsWaitingForIndexToUnblock(true); + + assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli())); + assertTrue(context.isWaitingForIndexToUnblock()); + } + + public void testIndexUnblocked() { + var service = serviceWithBlockCheck(false); + // set state to failed so that TransformIndexer returns false + var context = new TransformContext(TransformTaskState.FAILED, "", 0, mock()); + + var indexer = createTestIndexer(mock(), service, resolver(), context); + context.setIsWaitingForIndexToUnblock(true); + + assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli())); + // ClientTransformIndexer's maybeTriggerAsyncJob should reset isWaitingForIndexToUnblock to false + assertFalse(context.isWaitingForIndexToUnblock()); + } + + private ClusterService serviceWithBlockCheck(boolean checkResponse) { + var clusterBlocks = mock(ClusterBlocks.class); + when(clusterBlocks.indexBlocked(eq(ClusterBlockLevel.WRITE), anyString())).thenReturn(checkResponse); + var metadata = mock(Metadata.class); + when(metadata.custom(eq(TransformMetadata.TYPE))).thenReturn(TransformMetadata.EMPTY_METADATA); + var clusterState = mock(ClusterState.class); + when(clusterState.blocks()).thenReturn(clusterBlocks); + when(clusterState.getMetadata()).thenReturn(metadata); + var clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + return clusterService; + } + + private IndexNameExpressionResolver resolver() { + var resolver = mock(IndexNameExpressionResolver.class); + when(resolver.concreteWriteIndex(any(), any(), any(), anyBoolean(), anyBoolean())).thenAnswer(ans -> { + Index destIndex = mock(); + when(destIndex.getName()).thenReturn(ans.getArgument(2)); + return destIndex; + }); + return resolver; + } + private static class MockClientTransformIndexer extends ClientTransformIndexer { MockClientTransformIndexer( @@ -627,13 +685,22 @@ public class ClientTransformIndexerTests extends ESTestCase { } private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient client) { + return createTestIndexer(client, mock(), mock(), mock(TransformContext.class)); + } + + private ClientTransformIndexer createTestIndexer( + ParentTaskAssigningClient client, + ClusterService service, + IndexNameExpressionResolver resolver, + TransformContext context + ) { ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); return new ClientTransformIndexer( mock(ThreadPool.class), - mock(ClusterService.class), - mock(IndexNameExpressionResolver.class), + service, + resolver, mock(TransformExtension.class), new TransformServices( mock(IndexBasedTransformConfigManager.class), @@ -652,7 +719,7 @@ public class ClientTransformIndexerTests extends ESTestCase { new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()), new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), - mock(TransformContext.class), + context, false ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java index 3894ff3043cc..9f16e0d078b3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java @@ -93,7 +93,7 @@ public class TransformFailureHandlerTests extends ESTestCase { randomBoolean() ); - List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); }); + List.of(true, false).forEach((unattended) -> { assertClusterBlockHandled(bulkIndexingException, unattended); }); } public void testHandleIndexerFailure_IrrecoverableBulkIndexException() { @@ -197,7 +197,7 @@ public class TransformFailureHandlerTests extends ESTestCase { } } - private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) { + private void assertClusterBlockHandled(Exception e, boolean unattended) { String transformId = randomAlphaOfLength(10); SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build(); @@ -211,6 +211,7 @@ public class TransformFailureHandlerTests extends ESTestCase { assertNoFailure(handler, e, contextListener, settings, false); assertNoFailure(handler, e, contextListener, settings, false); assertNoFailure(handler, e, contextListener, settings, false); + assertTrue(context.isWaitingForIndexToUnblock()); } private void assertFailure(Exception e) {