diff --git a/docs/changelog/122852.yaml b/docs/changelog/122852.yaml new file mode 100644 index 000000000000..579404bdc7f0 --- /dev/null +++ b/docs/changelog/122852.yaml @@ -0,0 +1,5 @@ +pr: 122852 +summary: Run `TransportGetDataStreamsAction` on local node +area: Data streams +type: enhancement +issues: [] diff --git a/modules/data-streams/build.gradle b/modules/data-streams/build.gradle index 62c3efdc5ecc..6a76dbd55623 100644 --- a/modules/data-streams/build.gradle +++ b/modules/data-streams/build.gradle @@ -22,6 +22,7 @@ dependencies { testImplementation project(path: ':test:test-clusters') testImplementation project(":modules:mapper-extras") internalClusterTestImplementation project(":modules:mapper-extras") + internalClusterTestImplementation project(':modules:rest-root') } tasks.withType(StandaloneRestIntegTestTask).configureEach { diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java new file mode 100644 index 000000000000..55835143fdca --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams; + +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.support.CancellableActionTestPlugin; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.root.MainRestPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.transport.netty4.Netty4Plugin; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener; +import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.oneOf; + +public class DataStreamRestActionCancellationIT extends ESIntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME) + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, DataStreamsPlugin.class); + } + + public void testGetDataStreamCancellation() { + runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream"), GetDataStreamAction.NAME); + runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME); + } + + private void runRestActionCancellationTest(Request request, String actionName) { + final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + + try ( + var restClient = createRestClient(node); + var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node) + ) { + final var responseFuture = new PlainActionFuture(); + final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture)); + + if (randomBoolean()) { + // cancel by aborting the REST request + capturingAction.captureAndCancel(restInvocation::cancel); + expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS)); + } else { + // cancel via the task management API + final var cancelFuture = new PlainActionFuture(); + capturingAction.captureAndCancel( + () -> SubscribableListener + + .newForked( + l -> restClient.performRequestAsync( + getListTasksRequest(node, actionName), + wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse)) + ) + ) + + .andThen((l, listTasksResponse) -> { + final var taskCount = listTasksResponse.evaluateArraySize("tasks"); + assertThat(taskCount, greaterThan(0)); + try (var listeners = new RefCountingListener(l)) { + for (int i = 0; i < taskCount; i++) { + final var taskPrefix = "tasks." + i + "."; + assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable")); + assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled")); + restClient.performRequestAsync( + getCancelTaskRequest( + listTasksResponse.evaluate(taskPrefix + "node"), + listTasksResponse.evaluate(taskPrefix + "id") + ), + wrapAsRestResponseListener(listeners.acquire(DataStreamRestActionCancellationIT::assertOK)) + ); + } + } + }) + + .addListener(cancelFuture) + ); + cancelFuture.get(10, TimeUnit.SECONDS); + expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS)); + } + + assertAllTasksHaveFinished(actionName); + } catch (Exception e) { + fail(e); + } + } + + private static Request getListTasksRequest(String taskNode, String actionName) { + final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks"); + listTasksRequest.addParameter("nodes", taskNode); + listTasksRequest.addParameter("actions", actionName); + listTasksRequest.addParameter("group_by", "none"); + return listTasksRequest; + } + + private static Request getCancelTaskRequest(String taskNode, int taskId) { + final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId)); + cancelTaskRequest.addParameter("wait_for_completion", null); + return cancelTaskRequest; + } + + public static void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201)); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index f37158ba3fc6..1ffc82a263e4 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -17,7 +17,8 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties; import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ProjectState; @@ -39,6 +40,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettingProvider; @@ -47,6 +49,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -63,7 +66,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING; -public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjectAction< +public class TransportGetDataStreamsAction extends TransportLocalProjectMetadataAction< GetDataStreamAction.Request, GetDataStreamAction.Response> { @@ -76,6 +79,12 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec private final IndexSettingProviders indexSettingProviders; private final Client client; + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject public TransportGetDataStreamsAction( TransportService transportService, @@ -92,14 +101,11 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec ) { super( GetDataStreamAction.NAME, - transportService, - clusterService, - threadPool, actionFilters, - GetDataStreamAction.Request::new, - projectResolver, - GetDataStreamAction.Response::new, - transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT) + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver ); this.indexNameExpressionResolver = indexNameExpressionResolver; this.systemIndices = systemIndices; @@ -108,21 +114,32 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; this.indexSettingProviders = indexSettingProviders; this.client = new OriginSettingClient(client, "stack"); + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetDataStreamAction.Request::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, GetDataStreamAction.Request request, ProjectState state, ActionListener listener ) throws Exception { + ((CancellableTask) task).ensureNotCancelled(); if (request.verbose()) { DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request(); req.indices(request.indices()); client.execute(DataStreamsStatsAction.INSTANCE, req, new ActionListener<>() { @Override public void onResponse(DataStreamsStatsAction.Response response) { + ((CancellableTask) task).ensureNotCancelled(); final Map maxTimestamps = Arrays.stream(response.getDataStreams()) .collect( Collectors.toMap( diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java index be157608b1c3..d1e04eb4072b 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import java.util.List; @@ -64,7 +65,11 @@ public class RestGetDataStreamsAction extends BaseRestHandler { getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false)); getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions())); getDataStreamsRequest.verbose(request.paramAsBoolean("verbose", false)); - return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetDataStreamAction.INSTANCE, + getDataStreamsRequest, + new RestToXContentListener<>(channel) + ); } @Override diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java deleted file mode 100644 index 3daf9121e099..000000000000 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.datastreams.action; - -import org.elasticsearch.action.datastreams.GetDataStreamAction.Request; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return Request::new; - } - - @Override - protected Request createTestInstance() { - var req = new Request(TEST_REQUEST_TIMEOUT, switch (randomIntBetween(1, 4)) { - case 1 -> generateRandomStringArray(3, 8, false, false); - case 2 -> { - String[] parameters = generateRandomStringArray(3, 8, false, false); - for (int k = 0; k < parameters.length; k++) { - parameters[k] = parameters[k] + "*"; - } - yield parameters; - } - case 3 -> new String[] { "*" }; - default -> null; - }); - req.verbose(randomBoolean()); - return req; - } - - @Override - protected Request mutateInstance(Request instance) { - var indices = instance.indices(); - var indicesOpts = instance.indicesOptions(); - var includeDefaults = instance.includeDefaults(); - var verbose = instance.verbose(); - switch (randomIntBetween(0, 3)) { - case 0 -> indices = randomValueOtherThan(indices, () -> generateRandomStringArray(3, 8, false, false)); - case 1 -> indicesOpts = randomValueOtherThan( - indicesOpts, - () -> IndicesOptions.fromOptions( - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean() - ) - ); - case 2 -> includeDefaults = includeDefaults == false; - case 3 -> verbose = verbose == false; - } - var newReq = new Request(instance.masterNodeTimeout(), indices); - newReq.includeDefaults(includeDefaults); - newReq.indicesOptions(indicesOpts); - newReq.verbose(verbose); - return newReq; - } - -} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java index d9efa4d458f4..09d43989bff1 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java @@ -14,14 +14,10 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.DataStreamOptions; -import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; -import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -29,9 +25,6 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.json.JsonXContent; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,30 +33,7 @@ import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureSto import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return Response::new; - } - - @Override - protected Response createTestInstance() { - int numDataStreams = randomIntBetween(0, 8); - List dataStreams = new ArrayList<>(); - for (int i = 0; i < numDataStreams; i++) { - dataStreams.add(generateRandomDataStreamInfo()); - } - return new Response(dataStreams); - } - - @Override - protected Response mutateInstance(Response instance) { - if (instance.getDataStreams().isEmpty()) { - return new Response(List.of(generateRandomDataStreamInfo())); - } - return new Response(instance.getDataStreams().stream().map(this::mutateInstance).toList()); - } +public class GetDataStreamsResponseTests extends ESTestCase { @SuppressWarnings("unchecked") public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Exception { @@ -283,97 +253,4 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase assertThat(ManagedBy.LIFECYCLE.displayValue, is("Data stream lifecycle")); assertThat(ManagedBy.UNMANAGED.displayValue, is("Unmanaged")); } - - private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) { - var dataStream = instance.getDataStream(); - var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled(); - var status = instance.getDataStreamStatus(); - var indexTemplate = instance.getIndexTemplate(); - var ilmPolicyName = instance.getIlmPolicy(); - var timeSeries = instance.getTimeSeries(); - var indexSettings = instance.getIndexSettingsValues(); - var templatePreferIlm = instance.templatePreferIlmValue(); - var maximumTimestamp = instance.getMaximumTimestamp(); - switch (randomIntBetween(0, 8)) { - case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance); - case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values())); - case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10); - case 3 -> ilmPolicyName = randomBoolean() && ilmPolicyName != null ? null : randomAlphaOfLengthBetween(2, 10); - case 4 -> timeSeries = randomBoolean() && timeSeries != null - ? null - : randomValueOtherThan(timeSeries, () -> new Response.TimeSeries(generateRandomTimeSeries())); - case 5 -> indexSettings = randomValueOtherThan( - indexSettings, - () -> randomBoolean() - ? Map.of() - : Map.of( - new Index(randomAlphaOfLengthBetween(50, 100), UUIDs.base64UUID()), - new Response.IndexProperties( - randomBoolean(), - randomAlphaOfLengthBetween(50, 100), - randomBoolean() ? ManagedBy.ILM : ManagedBy.LIFECYCLE, - null - ) - ) - ); - case 6 -> templatePreferIlm = templatePreferIlm ? false : true; - case 7 -> maximumTimestamp = (maximumTimestamp == null) - ? randomNonNegativeLong() - : (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null); - case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled ? false : true; - } - return new Response.DataStreamInfo( - dataStream, - failureStoreEffectivelyEnabled, - status, - indexTemplate, - ilmPolicyName, - timeSeries, - indexSettings, - templatePreferIlm, - maximumTimestamp, - null - ); - } - - private List> generateRandomTimeSeries() { - List> timeSeries = new ArrayList<>(); - int numTimeSeries = randomIntBetween(0, 3); - for (int j = 0; j < numTimeSeries; j++) { - timeSeries.add(new Tuple<>(Instant.now(), Instant.now())); - } - return timeSeries; - } - - private Map generateRandomIndexSettingsValues() { - Map values = new HashMap<>(); - for (int i = 0; i < randomIntBetween(0, 3); i++) { - values.put( - new Index(randomAlphaOfLengthBetween(50, 100), UUIDs.base64UUID()), - new Response.IndexProperties( - randomBoolean(), - randomAlphaOfLengthBetween(50, 100), - randomBoolean() ? ManagedBy.ILM : ManagedBy.LIFECYCLE, - randomBoolean() ? randomFrom(IndexMode.values()).getName() : null - ) - ); - } - return values; - } - - private Response.DataStreamInfo generateRandomDataStreamInfo() { - List> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null; - return new Response.DataStreamInfo( - DataStreamTestHelper.randomInstance(), - randomBoolean(), - ClusterHealthStatus.GREEN, - randomAlphaOfLengthBetween(2, 10), - randomAlphaOfLengthBetween(2, 10), - timeSeries != null ? new Response.TimeSeries(timeSeries) : null, - generateRandomIndexSettingsValues(), - randomBoolean(), - usually() ? randomNonNegativeLong() : null, - usually() ? randomFrom(IndexMode.values()).getName() : null - ); - } } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index c0fae1443413..0fdf30fa4ca9 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -15,7 +15,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.cluster.SimpleDiffable; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; @@ -28,8 +28,12 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -53,7 +57,7 @@ public class GetDataStreamAction extends ActionType implements IndicesRequest.Replaceable { + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { private String[] names; private IndicesOptions indicesOptions = IndicesOptions.builder() @@ -104,6 +108,16 @@ public class GetDataStreamAction extends ActionType headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public Request(StreamInput in) throws IOException { super(in); this.names = in.readOptionalStringArray(); @@ -120,19 +134,6 @@ public class GetDataStreamAction extends ActionType> temporalRanges) implements Writeable { - TimeSeries(StreamInput in) throws IOException { - this(in.readCollectionAsList(in1 -> new Tuple<>(in1.readInstant(), in1.readInstant()))); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(temporalRanges, (out1, value) -> { @@ -618,16 +602,6 @@ public class GetDataStreamAction extends ActionType getDataStreams() { return dataStreams; } @@ -642,6 +616,11 @@ public class GetDataStreamAction extends ActionType