Run TransportGetDataStreamLifecycleAction on local node (#125214)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.

Relates #101805
This commit is contained in:
Niels Bauman 2025-03-22 12:00:47 +01:00 committed by GitHub
parent fdd453734d
commit bbc47d9cad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 68 additions and 41 deletions

View file

@ -0,0 +1,5 @@
pr: 125214
summary: Run `TransportGetDataStreamLifecycleAction` on local node
area: Data streams
type: enhancement
issues: []

View file

@ -12,6 +12,7 @@ package org.elasticsearch.datastreams;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.support.CancellableActionTestPlugin; import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingListener;
@ -64,6 +65,13 @@ public class DataStreamRestActionCancellationIT extends ESIntegTestCase {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME); runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
} }
public void testGetDataStreamLifecycleCancellation() {
runRestActionCancellationTest(
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_lifecycle"),
GetDataStreamLifecycleAction.INSTANCE.name()
);
}
private void runRestActionCancellationTest(Request request, String actionName) { private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

View file

@ -12,7 +12,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil; import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -24,9 +25,10 @@ import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.Comparator; import java.util.Comparator;
@ -38,18 +40,23 @@ import java.util.Objects;
* Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns * Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns
* a list of the data stream name and respective lifecycle configuration. * a list of the data stream name and respective lifecycle configuration.
*/ */
public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeReadProjectAction< public class TransportGetDataStreamLifecycleAction extends TransportLocalProjectMetadataAction<
GetDataStreamLifecycleAction.Request, GetDataStreamLifecycleAction.Request,
GetDataStreamLifecycleAction.Response> { GetDataStreamLifecycleAction.Response> {
private final ClusterSettings clusterSettings; private final ClusterSettings clusterSettings;
private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DataStreamGlobalRetentionSettings globalRetentionSettings; private final DataStreamGlobalRetentionSettings globalRetentionSettings;
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject @Inject
public TransportGetDataStreamLifecycleAction( public TransportGetDataStreamLifecycleAction(
TransportService transportService, TransportService transportService,
ClusterService clusterService, ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters, ActionFilters actionFilters,
ProjectResolver projectResolver, ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
@ -57,22 +64,28 @@ public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeRe
) { ) {
super( super(
GetDataStreamLifecycleAction.INSTANCE.name(), GetDataStreamLifecycleAction.INSTANCE.name(),
transportService,
clusterService,
threadPool,
actionFilters, actionFilters,
GetDataStreamLifecycleAction.Request::new, transportService.getTaskManager(),
projectResolver, clusterService,
GetDataStreamLifecycleAction.Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE,
EsExecutors.DIRECT_EXECUTOR_SERVICE projectResolver
); );
clusterSettings = clusterService.getClusterSettings(); clusterSettings = clusterService.getClusterSettings();
this.indexNameExpressionResolver = indexNameExpressionResolver; this.indexNameExpressionResolver = indexNameExpressionResolver;
this.globalRetentionSettings = globalRetentionSettings; this.globalRetentionSettings = globalRetentionSettings;
transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetDataStreamLifecycleAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
} }
@Override @Override
protected void masterOperation( protected void localClusterStateOperation(
Task task, Task task,
GetDataStreamLifecycleAction.Request request, GetDataStreamLifecycleAction.Request request,
ProjectState state, ProjectState state,
@ -86,6 +99,7 @@ public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeRe
); );
Map<String, DataStream> dataStreams = state.metadata().dataStreams(); Map<String, DataStream> dataStreams = state.metadata().dataStreams();
((CancellableTask) task).ensureNotCancelled();
listener.onResponse( listener.onResponse(
new GetDataStreamLifecycleAction.Response( new GetDataStreamLifecycleAction.Response(
results.stream() results.stream()

View file

@ -18,6 +18,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import java.util.List; import java.util.List;
@ -46,7 +47,7 @@ public class RestGetDataStreamLifecycleAction extends BaseRestHandler {
); );
getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false)); getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions())); getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions()));
return channel -> client.execute( return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetDataStreamLifecycleAction.INSTANCE, GetDataStreamLifecycleAction.INSTANCE,
getDataLifecycleRequest, getDataLifecycleRequest,
new RestRefCountedChunkedToXContentListener<>(channel) new RestRefCountedChunkedToXContentListener<>(channel)

View file

@ -15,7 +15,7 @@ import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -24,6 +24,10 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject; import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.ToXContentObject;
@ -33,6 +37,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
/** /**
@ -44,7 +49,7 @@ public class GetDataStreamLifecycleAction {
private GetDataStreamLifecycleAction() {/* no instances */} private GetDataStreamLifecycleAction() {/* no instances */}
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable { public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
private String[] names; private String[] names;
private IndicesOptions indicesOptions = IndicesOptions.builder() private IndicesOptions indicesOptions = IndicesOptions.builder()
@ -89,6 +94,16 @@ public class GetDataStreamLifecycleAction {
return null; return null;
} }
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException { public Request(StreamInput in) throws IOException {
super(in); super(in);
this.names = in.readOptionalStringArray(); this.names = in.readOptionalStringArray();
@ -96,14 +111,6 @@ public class GetDataStreamLifecycleAction {
this.includeDefaults = in.readBoolean(); this.includeDefaults = in.readBoolean();
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(names);
indicesOptions.writeIndicesOptions(out);
out.writeBoolean(includeDefaults);
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
@ -169,14 +176,11 @@ public class GetDataStreamLifecycleAction {
public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle"); public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle");
DataStreamLifecycle(StreamInput in) throws IOException { /**
this( * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
in.readString(), * we no longer need to support calling this action remotely.
in.readOptionalWriteable(org.elasticsearch.cluster.metadata.DataStreamLifecycle::new), */
in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) && in.readBoolean() @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
);
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(dataStreamName); out.writeString(dataStreamName);
@ -238,16 +242,6 @@ public class GetDataStreamLifecycleAction {
this.globalRetention = globalRetention; this.globalRetention = globalRetention;
} }
public Response(StreamInput in) throws IOException {
this(
in.readCollectionAsList(DataStreamLifecycle::new),
in.readOptionalWriteable(RolloverConfiguration::new),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
? in.readOptionalWriteable(DataStreamGlobalRetention::read)
: null
);
}
public List<DataStreamLifecycle> getDataStreamLifecycles() { public List<DataStreamLifecycle> getDataStreamLifecycles() {
return dataStreamLifecycles; return dataStreamLifecycles;
} }
@ -261,6 +255,11 @@ public class GetDataStreamLifecycleAction {
return globalRetention; return globalRetention;
} }
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(dataStreamLifecycles); out.writeCollection(dataStreamLifecycles);