diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index adc1e93efde4..3acb8d829730 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -118,7 +118,8 @@ final class RequestConverters { Params parameters = new Params(request); parameters.withTimeout(bulkRequest.timeout()); parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); - + parameters.withPipeline(bulkRequest.pipeline()); + parameters.withRouting(bulkRequest.routing()); // Bulk API only supports newline delimited JSON or Smile. Before executing // the bulk, we need to check that all requests have the same content-type // and this content-type is supported by the Bulk API. diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java index fdd5634ddd6b..378eb4f0069c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -28,14 +28,18 @@ import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.SearchHit; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -44,10 +48,19 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType; import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -268,23 +281,124 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs); } - private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + @SuppressWarnings("unchecked") + public void testGlobalParametersAndSingleRequest() throws Exception { + createIndexWithMultipleShards("test"); + + final CountDownLatch latch = new CountDownLatch(1); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); + createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ"); + + // tag::bulk-processor-mix-parameters + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + .setGlobalIndex("tweets") + .setGlobalType("_doc") + .setGlobalRouting("routing") + .setGlobalPipeline("pipeline_id") + .build()) { + + + processor.add(new IndexRequest() // <1> + .source(XContentType.JSON, "user", "some user")); + processor.add(new IndexRequest("blogs", "post_type", "1") // <2> + .source(XContentType.JSON, "title", "some title")); + } + // end::bulk-request-mix-pipeline + latch.await(); + + Iterable hits = searchAll(new SearchRequest("tweets").routing("routing")); + assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user")))); + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + + + Iterable blogs = searchAll(new SearchRequest("blogs").routing("routing")); + assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title")))); + assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + } + + @SuppressWarnings("unchecked") + public void testGlobalParametersAndBulkProcessor() throws Exception { + createIndexWithMultipleShards("test"); + + final CountDownLatch latch = new CountDownLatch(1); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); + createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ"); + + int numDocs = randomIntBetween(10, 10); + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) + .setGlobalIndex("test") + .setGlobalType("test") + .setGlobalRouting("routing") + .setGlobalPipeline("pipeline_id") + .build()) { + + indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id"); + latch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(1)); + assertThat(listener.afterCounts.get(), equalTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + + Iterable hits = searchAll(new SearchRequest("test").routing("routing")); + + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test")))); + assertThat(hits, containsInAnyOrder(expectedIds(numDocs))); + } + } + + @SuppressWarnings("unchecked") + private Matcher[] expectedIds(int numDocs) { + return IntStream.rangeClosed(1, numDocs) + .boxed() + .map(n -> hasId(n.toString())) + .>toArray(Matcher[]::new); + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType, + String globalIndex, String globalType, String globalPipeline) throws Exception { MultiGetRequest multiGetRequest = new MultiGetRequest(); for (int i = 1; i <= numDocs; i++) { if (randomBoolean()) { - processor.add(new IndexRequest("test", "test", Integer.toString(i)) - .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); + processor.add(new IndexRequest(localIndex, localType, Integer.toString(i)) + .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); } else { - final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n" - + Strings.toString(JsonXContent.contentBuilder() - .startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n"; - processor.add(new BytesArray(source), null, null, XContentType.JSON); + BytesArray data = bytesBulkRequest(localIndex, localType, i); + processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON); } - multiGetRequest.add("test", "test", Integer.toString(i)); + multiGetRequest.add(localIndex, localType, Integer.toString(i)); } return multiGetRequest; } + private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException { + String action = Strings.toString(jsonBuilder() + .startObject() + .startObject("index") + .field("_index", localIndex) + .field("_type", localType) + .field("_id", Integer.toString(id)) + .endObject() + .endObject() + ); + String source = Strings.toString(jsonBuilder() + .startObject() + .field("field", randomRealisticUnicodeOfLengthBetween(1, 30)) + .endObject() + ); + + String request = action + "\n" + source + "\n"; + return new BytesArray(request); + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + return indexDocs(processor, numDocs, "test", "test", null, null, null); + } + private static void assertResponseItems(List bulkItemResponses, int numDocs) { assertThat(bulkItemResponses.size(), is(numDocs)); int i = 1; @@ -343,4 +457,5 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { } } + } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java new file mode 100644 index 000000000000..cf8f1ebfdbd7 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.util.function.Function; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTestCase { + + @SuppressWarnings("unchecked") + public void testGlobalPipelineOnBulkRequest() throws IOException { + createFieldAddingPipleine("xyz", "fieldNameXYZ", "valueXYZ"); + + BulkRequest request = new BulkRequest(); + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2")); + request.pipeline("xyz"); + + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + } + + public void testPipelineOnRequestOverridesGlobalPipeline() throws IOException { + createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ"); + createFieldAddingPipleine("perIndexId", "someNewField", "someValue"); + + BulkRequest request = new BulkRequest(); + request.pipeline("globalId"); + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1") + .setPipeline("perIndexId")); + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2") + .setPipeline("perIndexId")); + + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, everyItem(hasProperty(fieldFromSource("someNewField"), equalTo("someValue")))); + // global pipeline was not applied + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldXYZ"), nullValue()))); + } + + @SuppressWarnings("unchecked") + public void testMixPipelineOnRequestAndGlobal() throws IOException { + createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ"); + createFieldAddingPipleine("perIndexId", "someNewField", "someValue"); + + // tag::bulk-request-mix-pipeline + BulkRequest request = new BulkRequest(); + request.pipeline("globalId"); + + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1") + .setPipeline("perIndexId")); // <1> + + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2")); // <2> + // end::bulk-request-mix-pipeline + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))), + both(hasId("2")) + .and(hasProperty(fieldFromSource("fieldXYZ"), equalTo("valueXYZ"))))); + } + + public void testGlobalIndex() throws IOException { + BulkRequest request = new BulkRequest("global_index", null); + request.add(new IndexRequest().type("doc").id("1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest().type("doc").id("2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("global_index"); + assertThat(hits, everyItem(hasIndex("global_index"))); + } + + @SuppressWarnings("unchecked") + public void testIndexGlobalAndPerRequest() throws IOException { + BulkRequest request = new BulkRequest("global_index", null); + request.add(new IndexRequest("local_index", "doc", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest().type("doc").id("2") // will take global index + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("local_index", "global_index"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasIndex("local_index")), + both(hasId("2")) + .and(hasIndex("global_index")))); + } + + public void testGlobalType() throws IOException { + BulkRequest request = new BulkRequest(null, "global_type"); + request.add(new IndexRequest("index").id("1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index").id("2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("index"); + assertThat(hits, everyItem(hasType("global_type"))); + } + + @SuppressWarnings("unchecked") + public void testTypeGlobalAndPerRequest() throws IOException { + BulkRequest request = new BulkRequest(null, "global_type"); + request.add(new IndexRequest("index1", "local_type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index2").id("2") // will take global type + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("index1", "index2"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasType("local_type")), + both(hasId("2")) + .and(hasType("global_type")))); + } + + @SuppressWarnings("unchecked") + public void testGlobalRouting() throws IOException { + createIndexWithMultipleShards("index"); + BulkRequest request = new BulkRequest(null, null); + request.add(new IndexRequest("index", "type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index", "type", "2") + .source(XContentType.JSON, "field", "bulk1")); + request.routing("1"); + bulk(request); + + Iterable emptyHits = searchAll(new SearchRequest("index").routing("xxx")); + assertThat(emptyHits, is(emptyIterable())); + + Iterable hits = searchAll(new SearchRequest("index").routing("1")); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + } + + @SuppressWarnings("unchecked") + public void testMixLocalAndGlobalRouting() throws IOException { + BulkRequest request = new BulkRequest(null, null); + request.routing("globalRouting"); + request.add(new IndexRequest("index", "type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index", "type", "2") + .routing("localRouting") + .source(XContentType.JSON, "field", "bulk1")); + + bulk(request); + + Iterable hits = searchAll(new SearchRequest("index").routing("globalRouting", "localRouting")); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + } + + private BulkResponse bulk(BulkRequest request) throws IOException { + BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync); + assertFalse(bulkResponse.hasFailures()); + return bulkResponse; + } + + @SuppressWarnings("unchecked") + private static Function fieldFromSource(String fieldName) { + return (response) -> (T) response.getSourceAsMap().get(fieldName); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java index 07c0d818bfa8..d31b9f04dbbb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java @@ -21,7 +21,10 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -30,15 +33,20 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { @@ -125,6 +133,22 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { return buildRandomXContentPipeline(pipelineBuilder); } + protected static void createFieldAddingPipleine(String id, String fieldName, String value) throws IOException { + XContentBuilder pipeline = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", fieldName) + .field("value", value) + .endObject() + .endObject() + .endArray() + .endObject(); + + createPipeline(new PutPipelineRequest(id, BytesReference.bytes(pipeline), XContentType.JSON)); + } + protected static void createPipeline(String pipelineId) throws IOException { XContentBuilder builder = buildRandomXContentPipeline(); createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType())); @@ -154,4 +178,32 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { .put(ThreadContext.PREFIX + ".Authorization", token) .build(); } + + protected Iterable searchAll(String... indices) throws IOException { + SearchRequest searchRequest = new SearchRequest(indices); + return searchAll(searchRequest); + } + + protected Iterable searchAll(SearchRequest searchRequest) throws IOException { + refreshIndexes(searchRequest.indices()); + SearchResponse search = highLevelClient().search(searchRequest, RequestOptions.DEFAULT); + return search.getHits(); + } + + protected void refreshIndexes(String... indices) throws IOException { + String joinedIndices = Arrays.stream(indices) + .collect(Collectors.joining(",")); + Response refreshResponse = client().performRequest(new Request("POST", "/" + joinedIndices + "/_refresh")); + assertEquals(200, refreshResponse.getStatusLine().getStatusCode()); + } + + protected void createIndexWithMultipleShards(String index) throws IOException { + CreateIndexRequest indexRequest = new CreateIndexRequest(index); + int shards = randomIntBetween(8,10); + indexRequest.settings(Settings.builder() + .put("index.number_of_shards", shards) + .put("index.number_of_replicas", 0) + ); + highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index e5922658fd92..4640ab56599b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -100,6 +100,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.RandomObjects; +import org.hamcrest.Matchers; import java.io.IOException; import java.io.InputStream; @@ -860,6 +861,21 @@ public class RequestConvertersTests extends ESTestCase { } } + public void testGlobalPipelineOnBulkRequest() throws IOException { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.pipeline("xyz"); + bulkRequest.add(new IndexRequest("test", "doc", "11") + .source(XContentType.JSON, "field", "bulk1")); + bulkRequest.add(new IndexRequest("test", "doc", "12") + .source(XContentType.JSON, "field", "bulk2")); + bulkRequest.add(new IndexRequest("test", "doc", "13") + .source(XContentType.JSON, "field", "bulk3")); + + Request request = RequestConverters.bulk(bulkRequest); + + assertThat(request.getParameters(), Matchers.hasEntry("pipeline","xyz")); + } + public void testSearchNullSource() throws IOException { SearchRequest searchRequest = new SearchRequest(); Request request = RequestConverters.search(searchRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 200a3a9b0f93..82c82784661c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -749,6 +749,16 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { request.waitForActiveShards(2); // <1> request.waitForActiveShards(ActiveShardCount.ALL); // <2> // end::bulk-request-active-shards + // tag::bulk-request-pipeline + request.pipeline("pipelineId"); // <1> + // end::bulk-request-pipeline + // tag::bulk-request-routing + request.routing("routingId"); // <1> + // end::bulk-request-routing + + // tag::bulk-request-index-type + BulkRequest defaulted = new BulkRequest("posts","_doc"); // <1> + // end::bulk-request-index-type // tag::bulk-execute-listener ActionListener listener = new ActionListener() { diff --git a/docs/java-api/docs/bulk.asciidoc b/docs/java-api/docs/bulk.asciidoc index 03c6ae719e5b..6141fabbf5b9 100644 --- a/docs/java-api/docs/bulk.asciidoc +++ b/docs/java-api/docs/bulk.asciidoc @@ -165,3 +165,26 @@ client.admin().indices().prepareRefresh().get(); client.prepareSearch().get(); -------------------------------------------------- + +[[java-docs-bulk-global-parameters]] +==== Global Parameters + +Global parameters can be specified on the BulkRequest as well as BulkProcessor, similar to the REST API. These global + parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters + have to be set before any sub request is added - index, type - and you have to specify them during BulkRequest or + BulkProcessor creation. Some are optional - pipeline, routing - and can be specified at any point before the bulk is sent. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters] +-------------------------------------------------- +<1> global parameters from the BulkRequest will be applied on a sub request +<2> local pipeline parameter on a sub request will override global parameters from BulkRequest + + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline] +-------------------------------------------------- +<1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest +<2> global parameter from the BulkRequest will be applied on a sub request diff --git a/docs/java-rest/high-level/document/bulk.asciidoc b/docs/java-rest/high-level/document/bulk.asciidoc index db9a3463135e..d794779435af 100644 --- a/docs/java-rest/high-level/document/bulk.asciidoc +++ b/docs/java-rest/high-level/document/bulk.asciidoc @@ -70,6 +70,25 @@ the index/update/delete operations. `ActiveShardCount.ALL`, `ActiveShardCount.ONE` or `ActiveShardCount.DEFAULT` (default) +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-pipeline] +-------------------------------------------------- +<1> Global pipelineId used on all sub requests, unless overridden on a sub request + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-routing] +-------------------------------------------------- +<1> Global routingId used on all sub requests, unless overridden on a sub request + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-index-type] +-------------------------------------------------- +<1> A bulk request with global index and type used on all sub requests, unless overridden on a sub request. +Both parameters are @Nullable and can only be set during BulkRequest creation. + include::../execution.asciidoc[] [id="{upid}-{api}-response"] diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index b0d553534e44..e2d01aad230b 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -35,12 +35,25 @@ import java.util.Locale; */ public interface DocWriteRequest extends IndicesRequest { + /** + * Set the index for this request + * @return the Request + */ + T index(String index); + /** * Get the index that this request operates on * @return the index */ String index(); + + /** + * Set the type for this request + * @return the Request + */ + T type(String type); + /** * Get the type that this request operates on * @return the type diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index a2cae6b7bec2..c083c8956779 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Supplier; /** * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request @@ -88,6 +89,10 @@ public class BulkProcessor implements Closeable { private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); private TimeValue flushInterval = null; private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff(); + private String globalIndex; + private String globalType; + private String globalRouting; + private String globalPipeline; private Builder(BiConsumer> consumer, Listener listener, Scheduler scheduler, Runnable onClose) { @@ -136,6 +141,26 @@ public class BulkProcessor implements Closeable { return this; } + public Builder setGlobalIndex(String globalIndex) { + this.globalIndex = globalIndex; + return this; + } + + public Builder setGlobalType(String globalType) { + this.globalType = globalType; + return this; + } + + public Builder setGlobalRouting(String globalRouting) { + this.globalRouting = globalRouting; + return this; + } + + public Builder setGlobalPipeline(String globalPipeline) { + this.globalPipeline = globalPipeline; + return this; + } + /** * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally * in case they have failed due to resource constraints (i.e. a thread pool was full). @@ -156,8 +181,14 @@ public class BulkProcessor implements Closeable { * Builds a new bulk processor. */ public BulkProcessor build() { - return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, - scheduler, onClose); + return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, + bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults()); + } + + private Supplier createBulkRequestWithGlobalDefaults() { + return () -> new BulkRequest(globalIndex, globalType) + .pipeline(globalPipeline) + .routing(globalRouting); } } @@ -184,6 +215,7 @@ public class BulkProcessor implements Closeable { private final AtomicLong executionIdGen = new AtomicLong(); private BulkRequest bulkRequest; + private final Supplier bulkRequestSupplier; private final BulkRequestHandler bulkRequestHandler; private final Runnable onClose; @@ -191,10 +223,11 @@ public class BulkProcessor implements Closeable { BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - Scheduler scheduler, Runnable onClose) { + Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = new BulkRequest(); + this.bulkRequest = bulkRequestSupplier.get(); + this.bulkRequestSupplier = bulkRequestSupplier; this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // Start period flushing task after everything is setup this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); @@ -335,7 +368,7 @@ public class BulkProcessor implements Closeable { final BulkRequest bulkRequest = this.bulkRequest; final long executionId = executionIdGen.incrementAndGet(); - this.bulkRequest = new BulkRequest(); + this.bulkRequest = bulkRequestSupplier.get(); this.bulkRequestHandler.execute(bulkRequest, executionId); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index c10dcc020823..d2929a2dbc56 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -90,12 +90,21 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private String globalPipeline; + private String globalRouting; + private String globalIndex; + private String globalType; private long sizeInBytes = 0; public BulkRequest() { } + public BulkRequest(@Nullable String globalIndex, @Nullable String globalType) { + this.globalIndex = globalIndex; + this.globalType = globalType; + } + /** * Adds a list of requests to be executed. Either index or delete requests. */ @@ -154,6 +163,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalMandatoryParameters(request); + requests.add(request); addPayload(payload); // lack of source is validated in validate() method @@ -175,6 +186,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalMandatoryParameters(request); + requests.add(request); addPayload(payload); if (request.doc() != null) { @@ -199,6 +212,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques public BulkRequest add(DeleteRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalMandatoryParameters(request); + requests.add(request); addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; @@ -327,13 +342,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques String index = defaultIndex; String type = defaultType; String id = null; - String routing = defaultRouting; + String routing = valueOrDefault(defaultRouting, globalRouting); FetchSourceContext fetchSourceContext = defaultFetchSourceContext; String opType = null; long version = Versions.MATCH_ANY; VersionType versionType = VersionType.INTERNAL; int retryOnConflict = 0; - String pipeline = defaultPipeline; + String pipeline = valueOrDefault(defaultPipeline, globalPipeline); // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters @@ -503,6 +518,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return this; } + public final BulkRequest pipeline(String globalPipeline) { + this.globalPipeline = globalPipeline; + return this; + } + + public final BulkRequest routing(String globalRouting){ + this.globalRouting = globalRouting; + return this; + } /** * A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}. */ @@ -514,6 +538,14 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return timeout; } + public String pipeline() { + return globalPipeline; + } + + public String routing() { + return globalRouting; + } + private int findNextMarker(byte marker, int from, BytesReference data, int length) { for (int i = from; i < length; i++) { if (data.get(i) == marker) { @@ -579,4 +611,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]"; } + private void applyGlobalMandatoryParameters(DocWriteRequest request) { + request.index(valueOrDefault(request.index(), globalIndex)); + request.type(valueOrDefault(request.type(), globalType)); + } + + private static String valueOrDefault(String value, String globalDefault) { + if (Strings.isNullOrEmpty(value) && !Strings.isNullOrEmpty(globalDefault)) { + return globalDefault; + } + return value; + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java index a577569476c8..fc91f4f907ee 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java @@ -41,6 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType; public class BulkRequestBuilder extends ActionRequestBuilder implements WriteRequestBuilder { + public BulkRequestBuilder(ElasticsearchClient client, BulkAction action, @Nullable String globalIndex, @Nullable String globalType) { + super(client, action, new BulkRequest(globalIndex, globalType)); + } + public BulkRequestBuilder(ElasticsearchClient client, BulkAction action) { super(client, action, new BulkRequest()); } @@ -153,4 +157,14 @@ public class BulkRequestBuilder extends ActionRequestBuilder /** * Sets the type of the document to delete. */ + @Override public DeleteRequest type(String type) { this.type = type; return this; diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 157ec3511100..8f5fd156018a 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -214,6 +214,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement /** * Sets the type of the indexed document. */ + @Override public IndexRequest type(String type) { this.type = type; return this; diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index f97f618347af..d2be1fba086d 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -232,6 +232,11 @@ public interface Client extends ElasticsearchClient, Releasable { */ BulkRequestBuilder prepareBulk(); + /** + * Executes a bulk of index / delete operations with default index and/or type + */ + BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType); + /** * Gets the document that was indexed from an index with a type and id. * diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 7f2e6681294a..e5450c320f45 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -471,6 +471,11 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new BulkRequestBuilder(this, BulkAction.INSTANCE); } + @Override + public BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType) { + return new BulkRequestBuilder(this, BulkAction.INSTANCE, globalIndex, globalType); + } + @Override public ActionFuture get(final GetRequest request) { return execute(GetAction.INSTANCE, request); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 1fd912e72a42..3e61557869da 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -20,22 +20,40 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.IngestTestPlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ExecutionException; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class BulkIntegrationIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(IngestTestPlugin.class); + } + public void testBulkIndexCreatesMapping() throws Exception { String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/bulk-log.json"); BulkRequestBuilder bulkBuilder = client().prepareBulk(); @@ -81,4 +99,52 @@ public class BulkIntegrationIT extends ESIntegTestCase { assertFalse(bulkResponse.hasFailures()); assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists()); } + + public void testBulkWithGlobalDefaults() throws Exception { + // all requests in the json are missing index and type parameters: "_index" : "test", "_type" : "type1", + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json"); + { + BulkRequestBuilder bulkBuilder = client().prepareBulk(); + bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); + ActionRequestValidationException ex = expectThrows(ActionRequestValidationException.class, bulkBuilder::get); + + assertThat(ex.validationErrors(), containsInAnyOrder( + "index is missing", + "index is missing", + "index is missing", + "type is missing", + "type is missing", + "type is missing")); + } + + { + createSamplePipeline("pipeline"); + BulkRequestBuilder bulkBuilder = client().prepareBulk("test","type1") + .routing("routing") + .pipeline("pipeline"); + + bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON); + BulkResponse bulkItemResponses = bulkBuilder.get(); + assertFalse(bulkItemResponses.hasFailures()); + } + } + + private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException { + XContentBuilder pipeline = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject(); + + AcknowledgedResponse acknowledgedResponse = client().admin() + .cluster() + .putPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(pipeline), XContentType.JSON)) + .get(); + + assertTrue(acknowledgedResponse.isAcknowledged()); + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 3fbfa381ad35..6a7d9bc02ec3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -72,19 +72,9 @@ public class BulkProcessorTests extends ESTestCase { try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { threadPool.getThreadContext().putHeader(headerKey, headerValue); threadPool.getThreadContext().putTransient(transientKey, transientValue); - bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - } - }, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {}); + bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), + 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, + threadPool, () -> {}, BulkRequest::new); } assertNull(threadPool.getThreadContext().getHeader(headerKey)); assertNull(threadPool.getThreadContext().getTransient(transientKey)); @@ -100,28 +90,32 @@ public class BulkProcessorTests extends ESTestCase { bulkProcessor.close(); } + public void testAwaitOnCloseCallsOnClose() throws Exception { final AtomicBoolean called = new AtomicBoolean(false); - BulkProcessor bulkProcessor = new BulkProcessor((request, listener) -> { - }, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - - } - }, 0, 10, new ByteSizeValue(1000), null, (delay, executor, command) -> null, () -> called.set(true)); + BiConsumer> consumer = (request, listener) -> {}; + BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), + 0, 10, new ByteSizeValue(1000), null, + (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new); assertFalse(called.get()); bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); assertTrue(called.get()); } + + private BulkProcessor.Listener emptyListener() { + return new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + } + }; + } } diff --git a/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json b/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json new file mode 100644 index 000000000000..2edb45742b74 --- /dev/null +++ b/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json @@ -0,0 +1,5 @@ +{ "index":{"_id":"1"} } +{ "field1" : "value1" } +{ "delete" : { "_id" : "2" } } +{ "create" : { "_id" : "3" } } +{ "field1" : "value3" } diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 65ed746accac..1a7e1c16f7b6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -58,6 +58,7 @@ import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.NotEqualMessageBuilder; import org.hamcrest.CoreMatchers; import org.hamcrest.Matcher; +import org.hamcrest.core.CombinableMatcher; import java.io.IOException; import java.nio.file.Files; @@ -70,6 +71,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -472,6 +474,14 @@ public class ElasticsearchAssertions { return new ElasticsearchMatchers.SearchHitHasScoreMatcher(score); } + public static CombinableMatcher hasProperty(Function property, Matcher valueMatcher) { + return ElasticsearchMatchers.HasPropertyLambdaMatcher.hasProperty(property, valueMatcher); + } + + public static Function fieldFromSource(String fieldName) { + return (response) -> response.getSourceAsMap().get(fieldName); + } + public static T assertBooleanSubQuery(Query query, Class subqueryType, int i) { assertThat(query, instanceOf(BooleanQuery.class)); BooleanQuery q = (BooleanQuery) query; diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java index f49cc3bd39ee..333205864810 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java @@ -20,7 +20,12 @@ package org.elasticsearch.test.hamcrest; import org.elasticsearch.search.SearchHit; import org.hamcrest.Description; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.hamcrest.core.CombinableMatcher; + +import java.util.function.Function; public class ElasticsearchMatchers { @@ -115,4 +120,27 @@ public class ElasticsearchMatchers { description.appendText("searchHit score should be ").appendValue(score); } } + + public static class HasPropertyLambdaMatcher extends FeatureMatcher { + + private final Function property; + + private HasPropertyLambdaMatcher(Matcher subMatcher, Function property) { + super(subMatcher, "object with", "lambda"); + this.property = property; + } + + @Override + protected V featureValueOf(T actual) { + return property.apply(actual); + } + + /** + * @param valueMatcher The matcher to apply to the property + * @param property The lambda to fetch property + */ + public static CombinableMatcher hasProperty(Function property, Matcher valueMatcher) { + return new CombinableMatcher<>(new HasPropertyLambdaMatcher<>(valueMatcher, property)); + } + } }