Run GetPipelineTransportAction on local node (#120445)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.

Relates #101805
This commit is contained in:
Niels Bauman 2025-01-22 08:16:31 +10:00 committed by GitHub
parent f404da0f62
commit 5efe216958
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 67 additions and 109 deletions

View file

@ -0,0 +1,5 @@
pr: 120445
summary: Run `GetPipelineTransportAction` on local node
area: Ingest Node
type: enhancement
issues: []

View file

@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTem
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
import org.elasticsearch.action.admin.indices.template.post.SimulateIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.post.SimulateIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.post.SimulateTemplateAction; import org.elasticsearch.action.admin.indices.template.post.SimulateTemplateAction;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.support.CancellableActionTestPlugin; import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingListener;
@ -103,6 +104,10 @@ public class RestActionCancellationIT extends HttpSmokeTestCase {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_cluster/settings"), ClusterGetSettingsAction.NAME); runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_cluster/settings"), ClusterGetSettingsAction.NAME);
} }
public void testGetPipelineCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME);
}
private void runRestActionCancellationTest(Request request, String actionName) { private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

View file

@ -10,14 +10,18 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest> { public class GetPipelineRequest extends LocalClusterStateRequest {
private final String[] ids; private final String[] ids;
private final boolean summary; private final boolean summary;
@ -35,19 +39,17 @@ public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest
this(masterNodeTimeout, false, ids); this(masterNodeTimeout, false, ids);
} }
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public GetPipelineRequest(StreamInput in) throws IOException { public GetPipelineRequest(StreamInput in) throws IOException {
super(in); super(in);
ids = in.readStringArray(); ids = in.readStringArray();
summary = in.readBoolean(); summary = in.readBoolean();
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(ids);
out.writeBoolean(summary);
}
public String[] getIds() { public String[] getIds() {
return ids; return ids;
} }
@ -60,4 +62,9 @@ public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
return null; return null;
} }
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
} }

View file

@ -11,15 +11,14 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -30,16 +29,6 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj
private final List<PipelineConfiguration> pipelines; private final List<PipelineConfiguration> pipelines;
private final boolean summary; private final boolean summary;
public GetPipelineResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
pipelines = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
pipelines.add(PipelineConfiguration.readFrom(in));
}
summary = in.readBoolean();
}
public GetPipelineResponse(List<PipelineConfiguration> pipelines, boolean summary) { public GetPipelineResponse(List<PipelineConfiguration> pipelines, boolean summary) {
this.pipelines = pipelines; this.pipelines = pipelines;
this.summary = summary; this.summary = summary;
@ -58,6 +47,11 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj
return Collections.unmodifiableList(pipelines); return Collections.unmodifiableList(pipelines);
} }
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(pipelines); out.writeCollection(pipelines);

View file

@ -11,45 +11,56 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> { public class GetPipelineTransportAction extends TransportLocalClusterStateAction<GetPipelineRequest, GetPipelineResponse> {
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject @Inject
public GetPipelineTransportAction( public GetPipelineTransportAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super( super(
GetPipelineAction.NAME, GetPipelineAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters, actionFilters,
GetPipelineRequest::new, transportService.getTaskManager(),
indexNameExpressionResolver, clusterService,
GetPipelineResponse::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE EsExecutors.DIRECT_EXECUTOR_SERVICE
); );
transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetPipelineRequest::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
} }
@Override @Override
protected void masterOperation(Task task, GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) protected void localClusterStateOperation(
throws Exception { Task task,
GetPipelineRequest request,
ClusterState state,
ActionListener<GetPipelineResponse> listener
) throws Exception {
((CancellableTask) task).ensureNotCancelled();
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()), request.isSummary())); listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()), request.isSummary()));
} }

View file

@ -18,6 +18,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException; import java.io.IOException;
@ -46,7 +47,7 @@ public class RestGetPipelineAction extends BaseRestHandler {
restRequest.paramAsBoolean("summary", false), restRequest.paramAsBoolean("summary", false),
Strings.splitStringByCommaToArray(restRequest.param("id")) Strings.splitStringByCommaToArray(restRequest.param("id"))
); );
return channel -> client.execute( return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
GetPipelineAction.INSTANCE, GetPipelineAction.INSTANCE,
request, request,
new RestToXContentListener<>(channel, GetPipelineResponse::status) new RestToXContentListener<>(channel, GetPipelineResponse::status)

View file

@ -10,18 +10,15 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -29,7 +26,7 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class GetPipelineResponseTests extends AbstractXContentSerializingTestCase<GetPipelineResponse> { public class GetPipelineResponseTests extends ESTestCase {
private XContentBuilder getRandomXContentBuilder() throws IOException { private XContentBuilder getRandomXContentBuilder() throws IOException {
XContentType xContentType = randomFrom(XContentType.values()); XContentType xContentType = randomFrom(XContentType.values());
@ -83,7 +80,6 @@ public class GetPipelineResponseTests extends AbstractXContentSerializingTestCas
} }
} }
@Override
protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException { protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
List<PipelineConfiguration> pipelines = new ArrayList<>(); List<PipelineConfiguration> pipelines = new ArrayList<>();
@ -104,24 +100,4 @@ public class GetPipelineResponseTests extends AbstractXContentSerializingTestCas
return new GetPipelineResponse(pipelines); return new GetPipelineResponse(pipelines);
} }
@Override
protected GetPipelineResponse createTestInstance() {
try {
return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
protected Writeable.Reader<GetPipelineResponse> instanceReader() {
return GetPipelineResponse::new;
}
@Override
protected GetPipelineResponse mutateInstance(GetPipelineResponse response) throws IOException {
return new GetPipelineResponse(
CollectionUtils.appendToCopy(response.pipelines(), createRandomPipeline("pipeline_" + response.pipelines().size() + 1))
);
}
} }

View file

@ -1,41 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.logstash.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class GetPipelineRequestTests extends AbstractWireSerializingTestCase<GetPipelineRequest> {
@Override
protected Writeable.Reader<GetPipelineRequest> instanceReader() {
return GetPipelineRequest::new;
}
@Override
protected GetPipelineRequest createTestInstance() {
return new GetPipelineRequest(randomList(0, 50, () -> randomAlphaOfLengthBetween(2, 10)));
}
@Override
protected GetPipelineRequest mutateInstance(GetPipelineRequest instance) {
List<String> ids = new ArrayList<>(instance.ids());
if (randomBoolean() || ids.size() == 0) {
// append another ID
ids.add(randomAlphaOfLengthBetween(2, 10));
} else {
// change the strings in the request
ids = ids.stream().map(id -> id + randomAlphaOfLength(1)).collect(Collectors.toList());
}
return new GetPipelineRequest(ids);
}
}