[Transform] Wait while index is blocked (#119542)

When the destination index is blocked, the bulk request is ignored.
Each subsequent trigger will check the cluster state if the index is
unblocked or else drop the trigger.

Users can call the ScheduleNow API to skip the index block check and let
the trigger start the next checkpoint - if the index is blocked then the
bulk request will fail again.

While the Transform is skipping triggers, the state will display as
`waiting` from the Get Stats API and on the Transform UI in Kibana.
This commit is contained in:
Pat Whelan 2025-01-08 16:42:00 -05:00 committed by GitHub
parent 82da6e970e
commit b367f70f61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 225 additions and 10 deletions

View file

@ -0,0 +1,5 @@
pr: 119542
summary: Wait while index is blocked
area: Transform
type: enhancement
issues: []

View file

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
@ -152,7 +153,7 @@ public class TransformIT extends TransformRestTestCase {
public void testContinuousTransformCrud() throws Exception { public void testContinuousTransformCrud() throws Exception {
var transformId = "transform-continuous-crud"; var transformId = "transform-continuous-crud";
var indexName = "continuous-crud-reviews"; var indexName = "continuous-crud-reviews";
createContinuousTransform(indexName, transformId); createContinuousTransform(indexName, transformId, "reviews-by-user-business-day");
var transformStats = getBasicTransformStats(transformId); var transformStats = getBasicTransformStats(transformId);
assertThat(transformStats.get("state"), equalTo("started")); assertThat(transformStats.get("state"), equalTo("started"));
@ -181,7 +182,7 @@ public class TransformIT extends TransformRestTestCase {
deleteTransform(transformId); deleteTransform(transformId);
} }
private void createContinuousTransform(String indexName, String transformId) throws Exception { private void createContinuousTransform(String indexName, String transformId, String destinationIndex) throws Exception {
createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
var groups = Map.of( var groups = Map.of(
@ -197,8 +198,9 @@ public class TransformIT extends TransformRestTestCase {
.addAggregator(AggregationBuilders.avg("review_score").field("stars")) .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName) var config = createTransformConfigBuilder(transformId, destinationIndex, QueryConfig.matchAll(), indexName).setPivotConfig(
.setPivotConfig(createPivotConfig(groups, aggs)) createPivotConfig(groups, aggs)
)
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build()) .setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build())
.build(); .build();
@ -216,7 +218,7 @@ public class TransformIT extends TransformRestTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testBasicContinuousTransformStats() throws Exception { public void testBasicContinuousTransformStats() throws Exception {
var transformId = "transform-continuous-basic-stats"; var transformId = "transform-continuous-basic-stats";
createContinuousTransform("continuous-basic-stats-reviews", transformId); createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
var transformStats = getBasicTransformStats(transformId); var transformStats = getBasicTransformStats(transformId);
assertEquals("started", XContentMapValues.extractValue("state", transformStats)); assertEquals("started", XContentMapValues.extractValue("state", transformStats));
@ -230,6 +232,40 @@ public class TransformIT extends TransformRestTestCase {
deleteTransform(transformId); deleteTransform(transformId);
} }
public void testDestinationIndexBlocked() throws Exception {
var transformId = "transform-continuous-blocked-destination";
var sourceIndexName = "source-reviews";
var destIndexName = "destination-reviews";
// create transform & indices, wait until 1st checkpoint is finished
createContinuousTransform(sourceIndexName, transformId, destIndexName);
// block destination index
Request request = new Request("PUT", destIndexName + "/_block/write");
assertAcknowledged(adminClient().performRequest(request));
// index more docs so the checkpoint tries to run, wait until transform stops
assertBusy(() -> {
indexDoc(42, sourceIndexName);
assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
}, 30, TimeUnit.SECONDS);
// unblock index
request = new Request("PUT", destIndexName + "/_settings");
request.setJsonEntity("""
{ "blocks.write": false }
""");
assertAcknowledged(adminClient().performRequest(request));
assertBusy(() -> {
indexDoc(42, sourceIndexName);
assertEquals(TransformStats.State.STARTED.value(), getTransformState(transformId));
}, 30, TimeUnit.SECONDS);
stopTransform(transformId);
deleteTransform(transformId);
}
public void testTransformLifecycleInALoop() throws Exception { public void testTransformLifecycleInALoop() throws Exception {
String transformId = "lifecycle-in-a-loop"; String transformId = "lifecycle-in-a-loop";
String indexName = transformId + "-src"; String indexName = transformId + "-src";
@ -652,4 +688,17 @@ public class TransformIT extends TransformRestTestCase {
bulkBuilder.append("\r\n"); bulkBuilder.append("\r\n");
doBulk(bulkBuilder.toString(), true); doBulk(bulkBuilder.toString(), true);
} }
private void indexDoc(long userId, String index) throws Exception {
StringBuilder bulkBuilder = new StringBuilder();
bulkBuilder.append(format("""
{"create":{"_index":"%s"}}
""", index));
String source = format("""
{"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s}
""", userId, 1, 2, 5, Instant.now().toEpochMilli());
bulkBuilder.append(source);
bulkBuilder.append("\r\n");
doBulk(bulkBuilder.toString(), true);
}
} }

View file

@ -270,6 +270,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
&& derivedState.equals(TransformStats.State.FAILED) == false) { && derivedState.equals(TransformStats.State.FAILED) == false) {
derivedState = TransformStats.State.STOPPING; derivedState = TransformStats.State.STOPPING;
reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason; reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason;
} else if (derivedState.equals(TransformStats.State.STARTED) && transformTask.getContext().isWaitingForIndexToUnblock()) {
derivedState = TransformStats.State.WAITING;
reason = Strings.isNullOrEmpty(reason) ? "transform is paused while destination index is blocked" : reason;
} }
return new TransformStats( return new TransformStats(
transformTask.getTransformId(), transformTask.getTransformId(),

View file

@ -128,6 +128,10 @@ public class TransportScheduleNowTransformAction extends TransportTasksAction<Tr
TransformTask transformTask, TransformTask transformTask,
ActionListener<Response> listener ActionListener<Response> listener
) { ) {
if (transformTask.getContext().isWaitingForIndexToUnblock()) {
logger.debug("[{}] Destination index is blocked. User requested a retry.", transformTask.getTransformId());
transformTask.getContext().setIsWaitingForIndexToUnblock(false);
}
transformScheduler.scheduleNow(request.getId()); transformScheduler.scheduleNow(request.getId());
listener.onResponse(Response.TRUE); listener.onResponse(Response.TRUE);
} }

View file

@ -25,8 +25,10 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -443,9 +445,36 @@ class ClientTransformIndexer extends TransformIndexer {
logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId()); logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId());
return false; return false;
} }
if (context.isWaitingForIndexToUnblock()) {
if (destinationIndexHasWriteBlock()) {
logger.debug("[{}] schedule was triggered but the destination index has a write block. Ignoring trigger.", getJobId());
return false;
}
logger.debug("[{}] destination index is no longer blocked.", getJobId());
context.setIsWaitingForIndexToUnblock(false);
}
return super.maybeTriggerAsyncJob(now); return super.maybeTriggerAsyncJob(now);
} }
private boolean destinationIndexHasWriteBlock() {
var clusterState = clusterService.state();
if (clusterState == null) {
// if we can't determine if the index is blocked, we assume it isn't, even though the bulk request may fail again
return false;
}
var destinationIndexName = transformConfig.getDestination().getIndex();
var destinationIndex = indexNameExpressionResolver.concreteWriteIndex(
clusterState,
IndicesOptions.lenientExpandOpen(),
destinationIndexName,
true,
false
);
return destinationIndex != null && clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, destinationIndex.getName());
}
@Override @Override
protected void onStop() { protected void onStop() {
closePointInTime(); closePointInTime();

View file

@ -49,6 +49,14 @@ public class TransformContext {
private volatile AuthorizationState authState; private volatile AuthorizationState authState;
private volatile int pageSize = 0; private volatile int pageSize = 0;
/**
* If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
* {@link TransformFailureHandler} will silence the error so the Transform automatically retries.
* Every time the Transform runs, it will check if the index is unblocked and reset this to false.
* Users can override this via the `_schedule_now` API.
*/
private volatile boolean isWaitingForIndexToUnblock = false;
// the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
private final AtomicLong currentCheckpoint; private final AtomicLong currentCheckpoint;
@ -183,6 +191,14 @@ public class TransformContext {
this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex; this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex;
} }
public boolean isWaitingForIndexToUnblock() {
return isWaitingForIndexToUnblock;
}
public void setIsWaitingForIndexToUnblock(boolean isWaitingForIndexToUnblock) {
this.isWaitingForIndexToUnblock = isWaitingForIndexToUnblock;
}
public AuthorizationState getAuthState() { public AuthorizationState getAuthState() {
return authState; return authState;
} }

View file

@ -170,6 +170,7 @@ class TransformFailureHandler {
*/ */
private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) { private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
if (bulkIndexingException.getCause() instanceof ClusterBlockException) { if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
context.setIsWaitingForIndexToUnblock(true);
retryWithoutIncrementingFailureCount( retryWithoutIncrementingFailureCount(
bulkIndexingException, bulkIndexingException,
bulkIndexingException.getDetailedMessage(), bulkIndexingException.getDetailedMessage(),

View file

@ -343,6 +343,46 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
); );
} }
public void testDeriveStatsWithIndexBlock() {
String transformId = "transform-with-stats";
String reason = "transform is paused while destination index is blocked";
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState runningState = new TransformState(
TransformTaskState.STARTED,
IndexerState.STARTED,
null,
0,
null,
null,
null,
false,
null
);
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());
context.setIsWaitingForIndexToUnblock(true);
var task = mock(TransformTask.class);
when(task.getContext()).thenReturn(context);
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(runningState);
when(task.getStats()).thenReturn(stats);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(
new TransformStats(
transformId,
TransformStats.State.WAITING,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
}
private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) { private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
when(task.getTransformId()).thenReturn(transformId); when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(state); when(task.getState()).thenReturn(state);

View file

@ -22,13 +22,18 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple; import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -44,6 +49,7 @@ import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformMetadata;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@ -53,6 +59,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformEffectiveSetti
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformExtension;
import org.elasticsearch.xpack.transform.TransformNode; import org.elasticsearch.xpack.transform.TransformNode;
@ -77,6 +84,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -467,6 +478,53 @@ public class ClientTransformIndexerTests extends ESTestCase {
} }
} }
public void testIndexBlocked() {
var service = serviceWithBlockCheck(true);
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());
var indexer = createTestIndexer(mock(), service, resolver(), context);
context.setIsWaitingForIndexToUnblock(true);
assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
assertTrue(context.isWaitingForIndexToUnblock());
}
public void testIndexUnblocked() {
var service = serviceWithBlockCheck(false);
// set state to failed so that TransformIndexer returns false
var context = new TransformContext(TransformTaskState.FAILED, "", 0, mock());
var indexer = createTestIndexer(mock(), service, resolver(), context);
context.setIsWaitingForIndexToUnblock(true);
assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
// ClientTransformIndexer's maybeTriggerAsyncJob should reset isWaitingForIndexToUnblock to false
assertFalse(context.isWaitingForIndexToUnblock());
}
private ClusterService serviceWithBlockCheck(boolean checkResponse) {
var clusterBlocks = mock(ClusterBlocks.class);
when(clusterBlocks.indexBlocked(eq(ClusterBlockLevel.WRITE), anyString())).thenReturn(checkResponse);
var metadata = mock(Metadata.class);
when(metadata.custom(eq(TransformMetadata.TYPE))).thenReturn(TransformMetadata.EMPTY_METADATA);
var clusterState = mock(ClusterState.class);
when(clusterState.blocks()).thenReturn(clusterBlocks);
when(clusterState.getMetadata()).thenReturn(metadata);
var clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(clusterState);
return clusterService;
}
private IndexNameExpressionResolver resolver() {
var resolver = mock(IndexNameExpressionResolver.class);
when(resolver.concreteWriteIndex(any(), any(), any(), anyBoolean(), anyBoolean())).thenAnswer(ans -> {
Index destIndex = mock();
when(destIndex.getName()).thenReturn(ans.getArgument(2));
return destIndex;
});
return resolver;
}
private static class MockClientTransformIndexer extends ClientTransformIndexer { private static class MockClientTransformIndexer extends ClientTransformIndexer {
MockClientTransformIndexer( MockClientTransformIndexer(
@ -627,13 +685,22 @@ public class ClientTransformIndexerTests extends ESTestCase {
} }
private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient client) { private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient client) {
return createTestIndexer(client, mock(), mock(), mock(TransformContext.class));
}
private ClientTransformIndexer createTestIndexer(
ParentTaskAssigningClient client,
ClusterService service,
IndexNameExpressionResolver resolver,
TransformContext context
) {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
return new ClientTransformIndexer( return new ClientTransformIndexer(
mock(ThreadPool.class), mock(ThreadPool.class),
mock(ClusterService.class), service,
mock(IndexNameExpressionResolver.class), resolver,
mock(TransformExtension.class), mock(TransformExtension.class),
new TransformServices( new TransformServices(
mock(IndexBasedTransformConfigManager.class), mock(IndexBasedTransformConfigManager.class),
@ -652,7 +719,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()), new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class), context,
false false
); );
} }

View file

@ -93,7 +93,7 @@ public class TransformFailureHandlerTests extends ESTestCase {
randomBoolean() randomBoolean()
); );
List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); }); List.of(true, false).forEach((unattended) -> { assertClusterBlockHandled(bulkIndexingException, unattended); });
} }
public void testHandleIndexerFailure_IrrecoverableBulkIndexException() { public void testHandleIndexerFailure_IrrecoverableBulkIndexException() {
@ -197,7 +197,7 @@ public class TransformFailureHandlerTests extends ESTestCase {
} }
} }
private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) { private void assertClusterBlockHandled(Exception e, boolean unattended) {
String transformId = randomAlphaOfLength(10); String transformId = randomAlphaOfLength(10);
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build(); SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
@ -211,6 +211,7 @@ public class TransformFailureHandlerTests extends ESTestCase {
assertNoFailure(handler, e, contextListener, settings, false); assertNoFailure(handler, e, contextListener, settings, false);
assertNoFailure(handler, e, contextListener, settings, false); assertNoFailure(handler, e, contextListener, settings, false);
assertNoFailure(handler, e, contextListener, settings, false); assertNoFailure(handler, e, contextListener, settings, false);
assertTrue(context.isWaitingForIndexToUnblock());
} }
private void assertFailure(Exception e) { private void assertFailure(Exception e) {