diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
index 846f29bfb6ef..34441d4160f4 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
@@ -23,6 +23,8 @@ import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
+import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
@@ -87,4 +89,26 @@ public final class ClusterClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}
+
+ /**
+ * Get an existing pipeline
+ *
+ * See
+ * Get Pipeline API on elastic.co
+ */
+ public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
+ GetPipelineResponse::fromXContent, emptySet(), headers);
+ }
+
+ /**
+ * Asynchronously get an existing pipeline
+ *
+ * See
+ * Get Pipeline API on elastic.co
+ */
+ public void getPipelineAsync(GetPipelineRequest request, ActionListener listener, Header... headers) {
+ restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
+ GetPipelineResponse::fromXContent, listener, emptySet(), headers);
+ }
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index 435381774b0c..e137d3d2f5c8 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -61,6 +61,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -620,6 +621,18 @@ final class RequestConverters {
return request;
}
+ static Request getPipeline(GetPipelineRequest getPipelineRequest) {
+ String endpoint = new EndpointBuilder()
+ .addPathPartAsIs("_ingest/pipeline")
+ .addCommaSeparatedPathParts(getPipelineRequest.getIds())
+ .build();
+ Request request = new Request(HttpGet.METHOD_NAME, endpoint);
+
+ Params parameters = new Params(request);
+ parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
+ return request;
+ }
+
static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
index 44332b058bc1..caab4c282f4d 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
@@ -22,6 +22,8 @@ package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
+import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -32,7 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
-import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@@ -113,31 +115,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
- XContentType xContentType = randomFrom(XContentType.values());
- XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
- pipelineBuilder.startObject();
- {
- pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
- pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
- {
- pipelineBuilder.startObject().startObject("set");
- {
- pipelineBuilder
- .field("field", "foo")
- .field("value", "bar");
- }
- pipelineBuilder.endObject().endObject();
- pipelineBuilder.startObject().startObject("convert");
- {
- pipelineBuilder
- .field("field", "rank")
- .field("type", "integer");
- }
- pipelineBuilder.endObject().endObject();
- }
- pipelineBuilder.endArray();
- }
- pipelineBuilder.endObject();
+ XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
@@ -147,4 +125,27 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}
+
+ public void testGetPipeline() throws IOException {
+ String id = "some_pipeline_id";
+ XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
+ {
+ PutPipelineRequest request = new PutPipelineRequest(
+ id,
+ BytesReference.bytes(pipelineBuilder),
+ pipelineBuilder.contentType()
+ );
+ createPipeline(request);
+ }
+
+ GetPipelineRequest request = new GetPipelineRequest(id);
+
+ GetPipelineResponse response =
+ execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
+ assertTrue(response.isFound());
+ assertEquals(response.pipelines().get(0).getId(), id);
+ PipelineConfiguration expectedConfig =
+ new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
+ assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
+ }
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
index aabe2c4d1e27..f7a934405c2a 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
@@ -21,7 +21,12 @@ package org.elasticsearch.client;
import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@@ -80,4 +85,42 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
super(restClient, (client) -> {}, Collections.emptyList());
}
}
+
+ protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
+ XContentType xContentType = randomFrom(XContentType.values());
+ XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
+ pipelineBuilder.startObject();
+ {
+ pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
+ pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
+ {
+ pipelineBuilder.startObject().startObject("set");
+ {
+ pipelineBuilder
+ .field("field", "foo")
+ .field("value", "bar");
+ }
+ pipelineBuilder.endObject().endObject();
+ pipelineBuilder.startObject().startObject("convert");
+ {
+ pipelineBuilder
+ .field("field", "rank")
+ .field("type", "integer");
+ }
+ pipelineBuilder.endObject().endObject();
+ }
+ pipelineBuilder.endArray();
+ }
+ pipelineBuilder.endObject();
+ return pipelineBuilder;
+ }
+
+ protected static void createPipeline(String pipelineId) throws IOException {
+ XContentBuilder builder = buildRandomXContentPipeline();
+ createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
+ }
+
+ protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
+ assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
+ }
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index 5388b5ba82e6..bf69aa766368 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -63,6 +63,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
@@ -1450,6 +1451,20 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters());
}
+ public void testGetPipeline() {
+ String pipelineId = "some_pipeline_id";
+ Map expectedParams = new HashMap<>();
+ GetPipelineRequest request = new GetPipelineRequest("some_pipeline_id");
+ setRandomMasterTimeout(request, expectedParams);
+ Request expectedRequest = RequestConverters.getPipeline(request);
+ StringJoiner endpoint = new StringJoiner("/", "/", "");
+ endpoint.add("_ingest/pipeline");
+ endpoint.add(pipelineId);
+ assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
+ assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
+ assertEquals(expectedParams, expectedRequest.getParameters());
+ }
+
public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
index 29bb2d05afcc..07785ecc03dc 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
@@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
+import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
@@ -34,11 +36,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.ingest.PipelineConfiguration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -257,4 +261,74 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
+
+ public void testGetPipeline() throws IOException {
+ RestHighLevelClient client = highLevelClient();
+
+ {
+ createPipeline("my-pipeline-id");
+ }
+
+ {
+ // tag::get-pipeline-request
+ GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
+ // end::get-pipeline-request
+
+ // tag::get-pipeline-request-masterTimeout
+ request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
+ request.masterNodeTimeout("1m"); // <2>
+ // end::get-pipeline-request-masterTimeout
+
+ // tag::get-pipeline-execute
+ GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
+ // end::get-pipeline-execute
+
+ // tag::get-pipeline-response
+ boolean successful = response.isFound(); // <1>
+ List pipelines = response.pipelines(); // <2>
+ for(PipelineConfiguration pipeline: pipelines) {
+ Map config = pipeline.getConfigAsMap(); // <3>
+ }
+ // end::get-pipeline-response
+
+ assertTrue(successful);
+ }
+ }
+
+ public void testGetPipelineAsync() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+
+ {
+ createPipeline("my-pipeline-id");
+ }
+
+ {
+ GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");
+
+ // tag::get-pipeline-execute-listener
+ ActionListener listener =
+ new ActionListener() {
+ @Override
+ public void onResponse(GetPipelineResponse response) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::get-pipeline-execute-listener
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::get-pipeline-execute-async
+ client.cluster().getPipelineAsync(request, listener); // <1>
+ // end::get-pipeline-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+ }
}
diff --git a/docs/java-rest/high-level/cluster/get_pipeline.asciidoc b/docs/java-rest/high-level/cluster/get_pipeline.asciidoc
new file mode 100644
index 000000000000..d6a9472a715e
--- /dev/null
+++ b/docs/java-rest/high-level/cluster/get_pipeline.asciidoc
@@ -0,0 +1,75 @@
+[[java-rest-high-cluster-get-pipeline]]
+=== Get Pipeline API
+
+[[java-rest-high-cluster-get-pipeline-request]]
+==== Get Pipeline Request
+
+A `GetPipelineRequest` requires one or more `pipelineIds` to fetch.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request]
+--------------------------------------------------
+<1> The pipeline id to fetch
+
+==== Optional arguments
+The following arguments can optionally be provided:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request-masterTimeout]
+--------------------------------------------------
+<1> Timeout to connect to the master node as a `TimeValue`
+<2> Timeout to connect to the master node as a `String`
+
+[[java-rest-high-cluster-get-pipeline-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute]
+--------------------------------------------------
+<1> Execute the request and get back the response in a GetPipelineResponse object.
+
+[[java-rest-high-cluster-get-pipeline-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a get pipeline request requires both the `GetPipelineRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-async]
+--------------------------------------------------
+<1> The `GetPipelineRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `GetPipelineResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+[[java-rest-high-cluster-get-pipeline-response]]
+==== Get Pipeline Response
+
+The returned `GetPipelineResponse` allows to retrieve information about the executed
+ operation as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-response]
+--------------------------------------------------
+<1> Check if a matching pipeline id was found or not.
+<2> Get the list of pipelines found as a list of `PipelineConfig` objects.
+<3> Get the individual configuration of each pipeline as a `Map`.
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index ff1e03afe3ec..412fa2aec424 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -107,9 +107,11 @@ The Java High Level REST Client supports the following Cluster APIs:
* <>
* <>
+* <>
include::cluster/put_settings.asciidoc[]
include::cluster/put_pipeline.asciidoc[]
+include::cluster/get_pipeline.asciidoc[]
== Snapshot APIs
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java
index 30843bdff9b2..297a7f0efc1d 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java
@@ -20,16 +20,24 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class GetPipelineResponse extends ActionResponse implements StatusToXContentObject {
@@ -42,8 +50,13 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
this.pipelines = pipelines;
}
+ /**
+ * Get the list of pipelines that were a part of this response.
+ * The pipeline id can be obtained using getId on the PipelineConfiguration object.
+ * @return A list of {@link PipelineConfiguration} objects.
+ */
public List pipelines() {
- return pipelines;
+ return Collections.unmodifiableList(pipelines);
}
@Override
@@ -83,4 +96,66 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
builder.endObject();
return builder;
}
+
+ /**
+ *
+ * @param parser the parser for the XContent that contains the serialized GetPipelineResponse.
+ * @return an instance of GetPipelineResponse read from the parser
+ * @throws IOException If the parsing fails
+ */
+ public static GetPipelineResponse fromXContent(XContentParser parser) throws IOException {
+ ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
+ List pipelines = new ArrayList<>();
+ while(parser.nextToken().equals(Token.FIELD_NAME)) {
+ String pipelineId = parser.currentName();
+ parser.nextToken();
+ XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
+ contentBuilder.generator().copyCurrentStructure(parser);
+ PipelineConfiguration pipeline =
+ new PipelineConfiguration(
+ pipelineId, BytesReference.bytes(contentBuilder), contentBuilder.contentType()
+ );
+ pipelines.add(pipeline);
+ }
+ ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation);
+ return new GetPipelineResponse(pipelines);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ } else if (other instanceof GetPipelineResponse){
+ GetPipelineResponse otherResponse = (GetPipelineResponse)other;
+ if (pipelines == null) {
+ return otherResponse.pipelines == null;
+ } else {
+ // We need a map here because order does not matter for equality
+ Map otherPipelineMap = new HashMap<>();
+ for (PipelineConfiguration pipeline: otherResponse.pipelines) {
+ otherPipelineMap.put(pipeline.getId(), pipeline);
+ }
+ for (PipelineConfiguration pipeline: pipelines) {
+ PipelineConfiguration otherPipeline = otherPipelineMap.get(pipeline.getId());
+ if (!pipeline.equals(otherPipeline)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 1;
+ for (PipelineConfiguration pipeline: pipelines) {
+ // We only take the sum here to ensure that the order does not matter.
+ result += (pipeline == null ? 0 : pipeline.hashCode());
+ }
+ return result;
+ }
+
}
diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java
index 737bad8ee5b0..3b296bf52d30 100644
--- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java
+++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java
@@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -148,14 +149,14 @@ public final class PipelineConfiguration extends AbstractDiffable {
+
+ private XContentBuilder getRandomXContentBuilder() throws IOException {
+ XContentType xContentType = randomFrom(XContentType.values());
+ return XContentBuilder.builder(xContentType.xContent());
+ }
+
+ private PipelineConfiguration createRandomPipeline(String pipelineId) throws IOException {
+ String field = "field_" + randomInt();
+ String value = "value_" + randomInt();
+ XContentBuilder builder = getRandomXContentBuilder();
+ builder.startObject();
+ // We only use a single SetProcessor here in each pipeline to test.
+ // Since the contents are returned as a configMap anyway this does not matter for fromXContent
+ builder.startObject("set");
+ builder.field("field", field);
+ builder.field("value", value);
+ builder.endObject();
+ builder.endObject();
+ return
+ new PipelineConfiguration(
+ pipelineId, BytesReference.bytes(builder), builder.contentType()
+ );
+ }
+
+ private Map createPipelineConfigMap() throws IOException {
+ int numPipelines = randomInt(5);
+ Map pipelinesMap = new HashMap<>();
+ for (int i=0; i pipelinesMap = createPipelineConfigMap();
+ GetPipelineResponse response = new GetPipelineResponse(new ArrayList<>(pipelinesMap.values()));
+ XContentBuilder builder = response.toXContent(getRandomXContentBuilder(), ToXContent.EMPTY_PARAMS);
+ XContentParser parser =
+ builder
+ .generator()
+ .contentType()
+ .xContent()
+ .createParser(
+ xContentRegistry(),
+ LoggingDeprecationHandler.INSTANCE,
+ BytesReference.bytes(builder).streamInput()
+ );
+ GetPipelineResponse parsedResponse = GetPipelineResponse.fromXContent(parser);
+ List actualPipelines = response.pipelines();
+ List parsedPipelines = parsedResponse.pipelines();
+ assertEquals(actualPipelines.size(), parsedPipelines.size());
+ for (PipelineConfiguration pipeline: parsedPipelines) {
+ assertTrue(pipelinesMap.containsKey(pipeline.getId()));
+ assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap());
+ }
+ }
+
+ @Override
+ protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException {
+ return GetPipelineResponse.fromXContent(parser);
+ }
+
+ @Override
+ protected GetPipelineResponse createBlankInstance() {
+ return new GetPipelineResponse();
+ }
+
+ @Override
+ protected GetPipelineResponse createTestInstance() {
+ try {
+ return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ protected boolean supportsUnknownFields() {
+ return false;
+ }
+
+ @Override
+ protected GetPipelineResponse mutateInstance(GetPipelineResponse response) {
+ try {
+ List clonePipelines = new ArrayList<>(response.pipelines());
+ clonePipelines.add(createRandomPipeline("pipeline_" + clonePipelines.size() + 1));
+ return new GetPipelineResponse(clonePipelines);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}