diff --git a/docs/changelog/125214.yaml b/docs/changelog/125214.yaml new file mode 100644 index 000000000000..7a72e09e7342 --- /dev/null +++ b/docs/changelog/125214.yaml @@ -0,0 +1,5 @@ +pr: 125214 +summary: Run `TransportGetDataStreamLifecycleAction` on local node +area: Data streams +type: enhancement +issues: [] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java index 55835143fdca..4062b8915892 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java @@ -12,6 +12,7 @@ package org.elasticsearch.datastreams; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.support.CancellableActionTestPlugin; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; @@ -64,6 +65,13 @@ public class DataStreamRestActionCancellationIT extends ESIntegTestCase { runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME); } + public void testGetDataStreamLifecycleCancellation() { + runRestActionCancellationTest( + new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_lifecycle"), + GetDataStreamLifecycleAction.INSTANCE.name() + ); + } + private void runRestActionCancellationTest(Request request, String actionName) { final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java index 9398bc414f2b..a7f05852ed87 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java @@ -12,7 +12,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.DataStreamsActionUtil; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -24,9 +25,10 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Comparator; @@ -38,18 +40,23 @@ import java.util.Objects; * Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns * a list of the data stream name and respective lifecycle configuration. */ -public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeReadProjectAction< +public class TransportGetDataStreamLifecycleAction extends TransportLocalProjectMetadataAction< GetDataStreamLifecycleAction.Request, GetDataStreamLifecycleAction.Response> { private final ClusterSettings clusterSettings; private final IndexNameExpressionResolver indexNameExpressionResolver; private final DataStreamGlobalRetentionSettings globalRetentionSettings; + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject public TransportGetDataStreamLifecycleAction( TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, ProjectResolver projectResolver, IndexNameExpressionResolver indexNameExpressionResolver, @@ -57,22 +64,28 @@ public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeRe ) { super( GetDataStreamLifecycleAction.INSTANCE.name(), - transportService, - clusterService, - threadPool, actionFilters, - GetDataStreamLifecycleAction.Request::new, - projectResolver, - GetDataStreamLifecycleAction.Response::new, - EsExecutors.DIRECT_EXECUTOR_SERVICE + transportService.getTaskManager(), + clusterService, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + projectResolver ); clusterSettings = clusterService.getClusterSettings(); this.indexNameExpressionResolver = indexNameExpressionResolver; this.globalRetentionSettings = globalRetentionSettings; + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetDataStreamLifecycleAction.Request::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, GetDataStreamLifecycleAction.Request request, ProjectState state, @@ -86,6 +99,7 @@ public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeRe ); Map dataStreams = state.metadata().dataStreams(); + ((CancellableTask) task).ensureNotCancelled(); listener.onResponse( new GetDataStreamLifecycleAction.Response( results.stream() diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java index 4a1050d9723a..35a77ba88ea9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; import java.util.List; @@ -46,7 +47,7 @@ public class RestGetDataStreamLifecycleAction extends BaseRestHandler { ); getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false)); getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions())); - return channel -> client.execute( + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( GetDataStreamLifecycleAction.INSTANCE, getDataLifecycleRequest, new RestRefCountedChunkedToXContentListener<>(channel) diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java index 401bd7a27c6f..5dac1d632e3d 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java @@ -15,7 +15,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -24,6 +24,10 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ChunkedToXContentObject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; @@ -33,6 +37,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -44,7 +49,7 @@ public class GetDataStreamLifecycleAction { private GetDataStreamLifecycleAction() {/* no instances */} - public static class Request extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { private String[] names; private IndicesOptions indicesOptions = IndicesOptions.builder() @@ -89,6 +94,16 @@ public class GetDataStreamLifecycleAction { return null; } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public Request(StreamInput in) throws IOException { super(in); this.names = in.readOptionalStringArray(); @@ -96,14 +111,6 @@ public class GetDataStreamLifecycleAction { this.includeDefaults = in.readBoolean(); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalStringArray(names); - indicesOptions.writeIndicesOptions(out); - out.writeBoolean(includeDefaults); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -169,14 +176,11 @@ public class GetDataStreamLifecycleAction { public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle"); - DataStreamLifecycle(StreamInput in) throws IOException { - this( - in.readString(), - in.readOptionalWriteable(org.elasticsearch.cluster.metadata.DataStreamLifecycle::new), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) && in.readBoolean() - ); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(dataStreamName); @@ -238,16 +242,6 @@ public class GetDataStreamLifecycleAction { this.globalRetention = globalRetention; } - public Response(StreamInput in) throws IOException { - this( - in.readCollectionAsList(DataStreamLifecycle::new), - in.readOptionalWriteable(RolloverConfiguration::new), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) - ? in.readOptionalWriteable(DataStreamGlobalRetention::read) - : null - ); - } - public List getDataStreamLifecycles() { return dataStreamLifecycles; } @@ -261,6 +255,11 @@ public class GetDataStreamLifecycleAction { return globalRetention; } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(dataStreamLifecycles);