diff --git a/docs/reference/migration/apis/data-stream-reindex.asciidoc b/docs/reference/migration/apis/data-stream-reindex.asciidoc index 4641e0fe0911..ccaed9797972 100644 --- a/docs/reference/migration/apis/data-stream-reindex.asciidoc +++ b/docs/reference/migration/apis/data-stream-reindex.asciidoc @@ -21,9 +21,10 @@ from the original backing indices are copied to the resulting backing indices. This api runs in the background because reindexing all indices in a large data stream is expected to take a large amount of time and resources. The endpoint will return immediately and a persistent task will be created to run in the background. The current status of the task can be checked with -the <>. This status will be available for 24 hours after the task completes, whether -it finished successfully or failed. If the status is still available for a task, the task must be cancelled before it can be re-run. -A running or recently completed data stream reindex task can be cancelled using the <>. +the <>. This status will be available for 24 hours after the task +completes, whether it finished successfully or failed. However, only the last status is retained so re-running a reindex +will overwrite the previous status for that data stream. A running or recently completed data stream reindex task can be +cancelled using the <>. /////////////////////////////////////////////////////////// [source,console] diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index a6d9adc6b4e3..15d21b79fee9 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -13,11 +13,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -38,13 +40,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + + if (persistentTask == null) { + startTask(listener, persistentTaskId, params); + } else { + GetMigrationReindexStatusAction.Request statusRequest = new GetMigrationReindexStatusAction.Request(sourceDataStreamName); + statusRequest.setParentTask(task.getParentTaskId()); + client.execute( + GetMigrationReindexStatusAction.INSTANCE, + statusRequest, + listener.delegateFailureAndWrap((getListener, getResponse) -> { + if (getResponse.getEnrichedStatus().complete() == false) { + throw new ResourceAlreadyExistsException("Reindex task for data stream [{}] already exists", sourceDataStreamName); + } + CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(sourceDataStreamName); + cancelRequest.setParentTask(task.getParentTaskId()); + client.execute( + CancelReindexDataStreamAction.INSTANCE, + cancelRequest, + getListener.delegateFailureAndWrap( + (cancelListener, cancelResponse) -> startTask(cancelListener, persistentTaskId, params) + ) + ); + }) + ); + } + + } + + private void startTask(ActionListener listener, String persistentTaskId, ReindexDataStreamTaskParams params) { persistentTasksService.sendStartRequest( persistentTaskId, ReindexDataStreamTask.TASK_NAME, diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index a156e571b6ce..1d320f97a41a 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -623,6 +623,11 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase { assertThat(statusResponseString, ((List) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount)); } }, 60, TimeUnit.SECONDS); + + // Verify it's possible to reindex again after a successful reindex + reindexResponse = upgradeUserClient.performRequest(reindexRequest); + assertOK(reindexResponse); + Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel"); Response cancelResponse = upgradeUserClient.performRequest(cancelRequest); assertOK(cancelResponse);