diff --git a/docs/changelog/120445.yaml b/docs/changelog/120445.yaml new file mode 100644 index 000000000000..fa521a275757 --- /dev/null +++ b/docs/changelog/120445.yaml @@ -0,0 +1,5 @@ +pr: 120445 +summary: Run `GetPipelineTransportAction` on local node +area: Ingest Node +type: enhancement +issues: [] diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java index c48ae9ba1843..cb7c60a208b4 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTem import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction; import org.elasticsearch.action.admin.indices.template.post.SimulateIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.post.SimulateTemplateAction; +import org.elasticsearch.action.ingest.GetPipelineAction; import org.elasticsearch.action.support.CancellableActionTestPlugin; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; @@ -103,6 +104,10 @@ public class RestActionCancellationIT extends HttpSmokeTestCase { runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_cluster/settings"), ClusterGetSettingsAction.NAME); } + public void testGetPipelineCancellation() { + runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME); + } + private void runRestActionCancellationTest(Request request, String actionName) { final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java index e0906ed954d8..f2f7a3774856 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java @@ -10,14 +10,18 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; -public class GetPipelineRequest extends MasterNodeReadRequest { +public class GetPipelineRequest extends LocalClusterStateRequest { private final String[] ids; private final boolean summary; @@ -35,19 +39,17 @@ public class GetPipelineRequest extends MasterNodeReadRequest headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java index 760b87af49a7..8e919c071edf 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java @@ -11,15 +11,14 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -30,16 +29,6 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj private final List pipelines; private final boolean summary; - public GetPipelineResponse(StreamInput in) throws IOException { - super(in); - int size = in.readVInt(); - pipelines = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - pipelines.add(PipelineConfiguration.readFrom(in)); - } - summary = in.readBoolean(); - } - public GetPipelineResponse(List pipelines, boolean summary) { this.pipelines = pipelines; this.summary = summary; @@ -58,6 +47,11 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj return Collections.unmodifiableList(pipelines); } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(pipelines); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java index 2618ac934251..4c0d6f1be482 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java @@ -11,45 +11,56 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -public class GetPipelineTransportAction extends TransportMasterNodeReadAction { +public class GetPipelineTransportAction extends TransportLocalClusterStateAction { + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject - public GetPipelineTransportAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver - ) { + public GetPipelineTransportAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) { super( GetPipelineAction.NAME, - transportService, - clusterService, - threadPool, actionFilters, - GetPipelineRequest::new, - indexNameExpressionResolver, - GetPipelineResponse::new, + transportService.getTaskManager(), + clusterService, EsExecutors.DIRECT_EXECUTOR_SERVICE ); + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetPipelineRequest::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation(Task task, GetPipelineRequest request, ClusterState state, ActionListener listener) - throws Exception { + protected void localClusterStateOperation( + Task task, + GetPipelineRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + ((CancellableTask) task).ensureNotCancelled(); listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()), request.isSummary())); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java index c44f63293059..54a466687389 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; @@ -46,7 +47,7 @@ public class RestGetPipelineAction extends BaseRestHandler { restRequest.paramAsBoolean("summary", false), Strings.splitStringByCommaToArray(restRequest.param("id")) ); - return channel -> client.execute( + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( GetPipelineAction.INSTANCE, request, new RestToXContentListener<>(channel, GetPipelineResponse::status) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java index 61284a49b250..f740c07e0c58 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java @@ -10,18 +10,15 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,7 +26,7 @@ import java.util.Map; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -public class GetPipelineResponseTests extends AbstractXContentSerializingTestCase { +public class GetPipelineResponseTests extends ESTestCase { private XContentBuilder getRandomXContentBuilder() throws IOException { XContentType xContentType = randomFrom(XContentType.values()); @@ -83,7 +80,6 @@ public class GetPipelineResponseTests extends AbstractXContentSerializingTestCas } } - @Override protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); List pipelines = new ArrayList<>(); @@ -104,24 +100,4 @@ public class GetPipelineResponseTests extends AbstractXContentSerializingTestCas return new GetPipelineResponse(pipelines); } - @Override - protected GetPipelineResponse createTestInstance() { - try { - return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values())); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - protected Writeable.Reader instanceReader() { - return GetPipelineResponse::new; - } - - @Override - protected GetPipelineResponse mutateInstance(GetPipelineResponse response) throws IOException { - return new GetPipelineResponse( - CollectionUtils.appendToCopy(response.pipelines(), createRandomPipeline("pipeline_" + response.pipelines().size() + 1)) - ); - } } diff --git a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java b/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java deleted file mode 100644 index f680b9422913..000000000000 --- a/x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logstash.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class GetPipelineRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return GetPipelineRequest::new; - } - - @Override - protected GetPipelineRequest createTestInstance() { - return new GetPipelineRequest(randomList(0, 50, () -> randomAlphaOfLengthBetween(2, 10))); - } - - @Override - protected GetPipelineRequest mutateInstance(GetPipelineRequest instance) { - List ids = new ArrayList<>(instance.ids()); - if (randomBoolean() || ids.size() == 0) { - // append another ID - ids.add(randomAlphaOfLengthBetween(2, 10)); - } else { - // change the strings in the request - ids = ids.stream().map(id -> id + randomAlphaOfLength(1)).collect(Collectors.toList()); - } - return new GetPipelineRequest(ids); - } -}