diff --git a/libs/core/src/main/java/org/elasticsearch/core/CheckedConsumer.java b/libs/core/src/main/java/org/elasticsearch/core/CheckedConsumer.java index 6698b47f62f3..56325dc21bb4 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/CheckedConsumer.java +++ b/libs/core/src/main/java/org/elasticsearch/core/CheckedConsumer.java @@ -8,12 +8,20 @@ package org.elasticsearch.core; -import java.util.function.Consumer; +import java.util.Objects; /** - * A {@link Consumer}-like interface which allows throwing checked exceptions. + * A {@link java.util.function.Consumer}-like interface which allows throwing checked exceptions. */ @FunctionalInterface public interface CheckedConsumer { void accept(T t) throws E; + + default CheckedConsumer andThen(CheckedConsumer after) throws E { + Objects.requireNonNull(after); + return (T t) -> { + accept(t); + after.accept(t); + }; + } } diff --git a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java index 275a41849d35..ac7db532882f 100644 --- a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java +++ b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java @@ -17,31 +17,18 @@ import org.apache.http.nio.entity.NByteArrayEntity; import org.apache.lucene.search.join.ScoreMode; import org.apache.lucene.tests.util.TimeUnits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.bulk.BulkProcessor2; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.client.Request; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.asyncsearch.AsyncSearchResponse; +import org.elasticsearch.client.ResponseListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; -import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; @@ -55,9 +42,7 @@ import org.elasticsearch.join.query.HasParentQueryBuilder; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; -import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -77,22 +62,22 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; import org.elasticsearch.search.suggest.phrase.DirectCandidateGeneratorBuilder; -import org.elasticsearch.search.suggest.phrase.PhraseSuggestion; import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder; -import org.elasticsearch.search.suggest.term.TermSuggestion; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; import org.elasticsearch.test.NotEqualMessageBuilder; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; -import org.junit.AfterClass; +import org.hamcrest.Matchers; import org.junit.Before; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URLEncoder; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -100,7 +85,6 @@ import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -110,16 +94,14 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import static java.util.stream.Collectors.toList; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.Matchers.empty; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.not; /** * This test class executes twice, first against the remote cluster, and then against another cluster that has the remote cluster @@ -137,13 +119,13 @@ public class CCSDuelIT extends ESRestTestCase { private static final String REMOTE_INDEX_NAME = "my_remote_cluster:" + INDEX_NAME; private static final String[] TAGS = new String[] { "java", "xml", "sql", "html", "php", "ruby", "python", "perl" }; - private static RestHighLevelClient restHighLevelClient; + private static boolean init = false; @Before public void init() throws Exception { super.initClient(); - if (restHighLevelClient == null) { - restHighLevelClient = new HighLevelClient(client()); + if (init == false) { + init = true; String destinationCluster = System.getProperty("tests.rest.suite"); // we index docs with private randomness otherwise the two clusters end up with exactly the same documents // given that this test class is run twice with same seed. @@ -155,18 +137,6 @@ public class CCSDuelIT extends ESRestTestCase { } } - private static class HighLevelClient extends RestHighLevelClient { - private HighLevelClient(RestClient restClient) { - super(restClient, (client) -> {}, Collections.emptyList()); - } - } - - @AfterClass - public static void cleanupClient() throws IOException { - IOUtils.close(restHighLevelClient); - restHighLevelClient = null; - } - @Override protected boolean preserveIndicesUponCompletion() { return true; @@ -177,14 +147,13 @@ public class CCSDuelIT extends ESRestTestCase { return true; } - private static void indexDocuments(String idPrefix) throws IOException, InterruptedException { + private void indexDocuments(String idPrefix) throws IOException, InterruptedException { // this index with a single document is used to test partial failures - IndexRequest indexRequest = new IndexRequest(INDEX_NAME + "_err"); - indexRequest.id("id"); - indexRequest.source("id", "id", "creationDate", "err"); - indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); - assertEquals(201, indexResponse.status().getStatus()); + Request request = new Request("POST", "/" + INDEX_NAME + "_err/_doc"); + request.addParameter("refresh", "wait_for"); + request.setJsonEntity("{ \"id\" : \"id\", \"creationDate\" : \"err\" }"); + Response response = client().performRequest(request); + assertEquals(201, response.getStatusLine().getStatusCode()); ElasticsearchAssertions.assertAcked(createIndex(INDEX_NAME + "_empty")); @@ -209,82 +178,98 @@ public class CCSDuelIT extends ESRestTestCase { }"""; ElasticsearchAssertions.assertAcked(createIndex(INDEX_NAME, settings, mapping)); - BulkProcessor2 bulkProcessor = BulkProcessor2.builder( - (r, l) -> restHighLevelClient.bulkAsync(r, RequestOptions.DEFAULT, l), - new BulkProcessor2.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) {} - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - assertFalse(response.hasFailures()); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Exception failure) { - throw new AssertionError("Failed to execute bulk", failure); - } - }, - new DeterministicTaskQueue(random()).getThreadPool() - ).build(); + CountDownLatch latch = new CountDownLatch(2); int numQuestions = randomIntBetween(50, 100); - for (int i = 0; i < numQuestions; i++) { - bulkProcessor.add(buildIndexRequest(idPrefix + i, "question", null)); + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < numQuestions; i++) { + buildIndexRequest(builder, idPrefix + i, "question", null); + } + executeBulkAsync(builder.toString(), latch); } - int numAnswers = randomIntBetween(100, 150); - for (int i = 0; i < numAnswers; i++) { - bulkProcessor.add(buildIndexRequest(idPrefix + (i + 1000), "answer", idPrefix + randomIntBetween(0, numQuestions - 1))); + { + StringBuilder builder = new StringBuilder(); + int numAnswers = randomIntBetween(100, 150); + for (int i = 0; i < numAnswers; i++) { + buildIndexRequest(builder, idPrefix + (i + 1000), "answer", idPrefix + randomIntBetween(0, numQuestions - 1)); + } + executeBulkAsync(builder.toString(), latch); } - assertTrue(bulkProcessor.awaitClose(30, TimeUnit.SECONDS)); + + assertTrue(latch.await(30, TimeUnit.SECONDS)); RefreshResponse refreshResponse = refresh(INDEX_NAME); ElasticsearchAssertions.assertNoFailures(refreshResponse); } - private static IndexRequest buildIndexRequest(String id, String type, String questionId) { - IndexRequest indexRequest = new IndexRequest(INDEX_NAME); - indexRequest.id(id); + private void executeBulkAsync(String body, CountDownLatch latch) { + Request bulk = new Request("POST", "/_bulk"); + bulk.setJsonEntity(body); + client().performRequestAsync(bulk, new ResponseListener() { + @Override + public void onSuccess(Response response) { + try { + ObjectPath objectPath = ObjectPath.createFromResponse(response); + assertThat(objectPath.evaluate("errors"), Matchers.equalTo(false)); + } catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exception) { + try { + fail(exception.getMessage()); + } finally { + latch.countDown(); + } + } + }); + } + + private static void buildIndexRequest(StringBuilder buffer, String id, String type, String questionId) { + // { "index" : { "_index" : "test", "_id" : "1" } }/n + buffer.append("{ \"index\" : { \"_index\" : \"").append(INDEX_NAME).append("\", \"_id\" : \"").append(id).append("\""); if (questionId != null) { - indexRequest.routing(questionId); + buffer.append(", \"routing\" : \"").append(questionId).append("\""); } - indexRequest.create(true); + buffer.append(" } }\n"); int numTags = randomIntBetween(1, 3); Set tags = new HashSet<>(); if (questionId == null) { for (int i = 0; i < numTags; i++) { - tags.add(randomFrom(TAGS)); + tags.add("\"" + randomFrom(TAGS) + "\""); } } String[] tagsArray = tags.toArray(new String[0]); String date = LocalDate.of(2019, 1, randomIntBetween(1, 31)).format(DateTimeFormatter.ofPattern("yyyy/MM/dd", Locale.ROOT)); - Map joinField = new HashMap<>(); - joinField.put("name", type); + + buffer.append("{ "); + buffer.append("\"id\" : \"").append(id).append("\","); + buffer.append("\"type\" : \"").append(type).append("\","); + buffer.append("\"votes\" : ").append(randomIntBetween(0, 30)).append(","); if (questionId != null) { - joinField.put("parent", questionId); + buffer.append("\"questionId\" : \"").append(questionId).append("\","); + } else { + buffer.append("\"questionId\" : ").append(questionId).append(","); } - indexRequest.source( - XContentType.JSON, - "id", - id, - "type", - type, - "votes", - randomIntBetween(0, 30), - "questionId", - questionId, - "tags", - tagsArray, - "user", - "user" + randomIntBetween(1, 10), - "suggest", - Collections.singletonMap("input", tagsArray), - "creationDate", - date, - "join", - joinField - ); - return indexRequest; + buffer.append("\"tags\" : [").append(String.join(",", Arrays.asList(tagsArray))).append("],"); + buffer.append("\"user\" : \"").append("user").append(randomIntBetween(1, 10)).append("\","); + buffer.append("\"suggest\" : ") + .append("{") + .append("\"input\" : [") + .append(String.join(",", Arrays.asList(tagsArray))) + .append("]},"); + buffer.append("\"creationDate\" : \"").append(date).append("\","); + buffer.append("\"join\" : {"); + buffer.append("\"name\" : \"").append(type).append("\""); + if (questionId != null) { + buffer.append(", \"parent\" : \"").append(questionId).append("\""); + } + buffer.append("}}\n"); } public void testMatchAll() throws Exception { @@ -376,9 +361,9 @@ public class CCSDuelIT extends ESRestTestCase { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.highlighter(new HighlightBuilder().field("tags")); sourceBuilder.query(QueryBuilders.matchQuery("tags", "xml")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertFalse(response.getHits().getHits()[0].getHighlightFields().isEmpty()); + assertFalse(response.evaluateMapKeys("hits.hits.0.highlight").isEmpty()); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -398,9 +383,9 @@ public class CCSDuelIT extends ESRestTestCase { sourceBuilder.fetchSource(new String[] { "tags" }, Strings.EMPTY_ARRAY); sourceBuilder.query(QueryBuilders.matchQuery("tags", "ruby")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertEquals(1, response.getHits().getHits()[0].getSourceAsMap().size()); + assertThat(response.evaluateMapKeys("hits.hits.0._source").size(), equalTo(1)); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -419,10 +404,10 @@ public class CCSDuelIT extends ESRestTestCase { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.docValueField("user.keyword"); sourceBuilder.query(QueryBuilders.matchQuery("tags", "xml")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertEquals(1, response.getHits().getHits()[0].getFields().size()); - assertNotNull(response.getHits().getHits()[0].getFields().get("user.keyword")); + assertThat(response.evaluateMapKeys("hits.hits.0.fields").size(), equalTo(1)); + assertTrue(response.evaluateMapKeys("hits.hits.0.fields").contains("user.keyword")); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -440,10 +425,10 @@ public class CCSDuelIT extends ESRestTestCase { assumeMultiClusterSetup(); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.scriptField("parent", new Script(ScriptType.INLINE, "painless", "doc['join#question']", Collections.emptyMap())); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertEquals(1, response.getHits().getHits()[0].getFields().size()); - assertNotNull(response.getHits().getHits()[0].getFields().get("parent")); + assertThat(response.evaluateMapKeys("hits.hits.0.fields").size(), equalTo(1)); + assertTrue(response.evaluateMapKeys("hits.hits.0.fields").contains("parent")); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -462,9 +447,9 @@ public class CCSDuelIT extends ESRestTestCase { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.explain(true); sourceBuilder.query(QueryBuilders.matchQuery("tags", "sql")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertNotNull(response.getHits().getHits()[0].getExplanation()); + assertNotNull(response.evaluate("hits.hits.0._explanation")); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -486,7 +471,6 @@ public class CCSDuelIT extends ESRestTestCase { rescorerBuilder.setScoreMode(QueryRescoreMode.Multiply); rescorerBuilder.setRescoreQueryWeight(5); sourceBuilder.addRescorer(rescorerBuilder); - { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); searchRequest.source(sourceBuilder); @@ -541,13 +525,18 @@ public class CCSDuelIT extends ESRestTestCase { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.profile(true); sourceBuilder.query(QueryBuilders.matchQuery("tags", "html")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertFalse(response.getProfileResults().isEmpty()); - assertThat( - response.getProfileResults().values().stream().filter(sr -> sr.getFetchPhase() != null).collect(toList()), - not(empty()) - ); + assertFalse(response.evaluateMapKeys("profile").isEmpty()); + int size = response.evaluateArraySize("profile.shards"); + boolean fail = true; + for (int i = 0; i < size; i++) { + if (response.evaluate("profile.shards." + i + ".fetch") != null) { + fail = false; + break; + } + } + assertFalse("profile might be incomplete", fail); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -570,10 +559,11 @@ public class CCSDuelIT extends ESRestTestCase { sourceBuilder.sort("type.keyword", SortOrder.ASC); sourceBuilder.sort("creationDate", SortOrder.DESC); sourceBuilder.sort("user.keyword", SortOrder.ASC); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response, 30); - if (response.getHits().getTotalHits().value > 30) { - assertEquals(3, response.getHits().getHits()[0].getSortValues().length); + int total = response.evaluate("hits.total.value"); + if (total > 30) { + assertThat(response.evaluateArraySize("hits.hits.0.sort"), equalTo(3)); } }; { @@ -597,16 +587,16 @@ public class CCSDuelIT extends ESRestTestCase { sourceBuilder.sort("type.keyword", SortOrder.ASC); sourceBuilder.sort("creationDate", SortOrder.DESC); sourceBuilder.sort("user.keyword", SortOrder.ASC); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - assertEquals(3, hit.getSortValues().length); - assertEquals(INDEX_NAME, hit.getIndex()); + int size = response.evaluateArraySize("hits.hits"); + for (int i = 0; i < size; i++) { + String hit = "hits.hits." + i; + assertThat(response.evaluateArraySize(hit + ".sort"), equalTo(3)); if (onlyRemote) { - assertEquals("my_remote_cluster", hit.getClusterAlias()); + assertThat(response.evaluate(hit + "._index"), equalTo(REMOTE_INDEX_NAME)); } else { - assertNull(hit.getClusterAlias()); + assertThat(response.evaluate(hit + "._index"), equalTo(INDEX_NAME)); } } }; @@ -621,14 +611,15 @@ public class CCSDuelIT extends ESRestTestCase { boolean onlyRemote = randomBoolean(); sourceBuilder.query(new TermQueryBuilder("_index", onlyRemote ? REMOTE_INDEX_NAME : INDEX_NAME)); sourceBuilder.collapse(new CollapseBuilder("user.keyword")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - for (SearchHit hit : response.getHits().getHits()) { - assertEquals(INDEX_NAME, hit.getIndex()); + int size = response.evaluateArraySize("hits.hits"); + for (int i = 0; i < size; i++) { + String hit = "hits.hits." + i; if (onlyRemote) { - assertEquals("my_remote_cluster", hit.getClusterAlias()); + assertThat(response.evaluate(hit + "._index"), equalTo(REMOTE_INDEX_NAME)); } else { - assertNull(hit.getClusterAlias()); + assertThat(response.evaluate(hit + "._index"), equalTo(INDEX_NAME)); } } }; @@ -661,9 +652,9 @@ public class CCSDuelIT extends ESRestTestCase { sourceBuilder.sort("creationDate", SortOrder.DESC); sourceBuilder.sort(new ScoreSortBuilder()); sourceBuilder.collapse(new CollapseBuilder("user.keyword")); - Consumer responseChecker = response -> { + CheckedConsumer responseChecker = response -> { assertHits(response); - assertEquals(2, response.getHits().getHits()[0].getSortValues().length); + assertThat(response.evaluateArraySize("hits.hits.0.sort"), equalTo(2)); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); @@ -804,7 +795,7 @@ public class CCSDuelIT extends ESRestTestCase { searchRequest.source(sourceBuilder); duelRequest(searchRequest, response -> { assertAggs(response); - assertNotNull(response.getAggregations().get("most_voted")); + assertTrue(response.evaluateMapKeys("aggregations").contains("bucket_metric_value#most_voted")); }); duelRequest(searchRequest, CCSDuelIT::assertAggs); } @@ -813,7 +804,7 @@ public class CCSDuelIT extends ESRestTestCase { searchRequest.source(sourceBuilder); duelRequest(searchRequest, response -> { assertAggs(response); - assertNotNull(response.getAggregations().get("most_voted")); + assertTrue(response.evaluateMapKeys("aggregations").contains("bucket_metric_value#most_voted")); }); duelRequest(searchRequest, CCSDuelIT::assertAggs); } @@ -847,12 +838,12 @@ public class CCSDuelIT extends ESRestTestCase { public void testTermsLookup() throws Exception { assumeMultiClusterSetup(); - IndexRequest indexRequest = new IndexRequest("lookup_index"); - indexRequest.id("id"); - indexRequest.source("tags", new String[] { "java", "sql", "html", "jax-ws" }); - indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); - assertEquals(201, indexResponse.status().getStatus()); + Request request = new Request("POST", "/lookup_index/_doc/id"); + request.addParameter("refresh", "wait_for"); + request.setJsonEntity("{ \"tags\" : [ \"java\", \"sql\", \"html\", \"jax-ws\" ] }"); + Response response = client().performRequest(request); + assertEquals(201, response.getStatusLine().getStatusCode()); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); TermsQueryBuilder termsQueryBuilder = new TermsQueryBuilder("tags", new TermsLookup("lookup_index", "id", "tags")); sourceBuilder.query(termsQueryBuilder); @@ -879,11 +870,11 @@ public class CCSDuelIT extends ESRestTestCase { boolean compareAsyncAndSyncResponses = false; duelRequest(searchRequest, response -> { assertMultiClusterSearchResponse(response); - assertThat(response.getHits().getTotalHits().value, greaterThan(0L)); - assertNull(response.getAggregations()); - assertNull(response.getSuggest()); - assertThat(response.getHits().getHits().length, greaterThan(0)); - assertThat(response.getFailedShards(), greaterThanOrEqualTo(2)); + assertThat(response.evaluate("hits.total.value"), greaterThan(0)); + assertNull(response.evaluate("aggregations")); + assertNull(response.evaluate("suggest")); + assertThat(response.evaluateArraySize("hits.hits"), greaterThan(0)); + assertThat(response.evaluate("_shards.failed"), greaterThan(2)); }, compareAsyncAndSyncResponses); } @@ -894,24 +885,21 @@ public class CCSDuelIT extends ESRestTestCase { suggestBuilder.setGlobalText("jva hml"); suggestBuilder.addSuggestion("tags", new TermSuggestionBuilder("tags").suggestMode(TermSuggestionBuilder.SuggestMode.POPULAR)); sourceBuilder.suggest(suggestBuilder); - Consumer responseChecker = response -> { - assertEquals(1, response.getSuggest().size()); - TermSuggestion tags = response.getSuggest().getSuggestion("tags"); - assertThat(tags.getEntries().size(), greaterThan(0)); + CheckedConsumer responseChecker = response -> { + assertThat(response.evaluateMapKeys("suggest").size(), equalTo(1)); + assertThat(response.evaluateArraySize("suggest.term#tags"), greaterThan(0)); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); searchRequest.source(sourceBuilder); - responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse); // suggest-only queries are not supported by _async_search, so only test against sync search API - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse)); } { SearchRequest searchRequest = initRemoteOnlySearchRequest(); searchRequest.source(sourceBuilder); - responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse); // suggest-only queries are not supported by _async_search, so only test against sync search API - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse)); } } @@ -926,24 +914,21 @@ public class CCSDuelIT extends ESRestTestCase { .highlight("", "") ); sourceBuilder.suggest(suggestBuilder); - Consumer responseChecker = response -> { - assertEquals(1, response.getSuggest().size()); - PhraseSuggestion tags = response.getSuggest().getSuggestion("tags"); - assertThat(tags.getEntries().size(), greaterThan(0)); + CheckedConsumer responseChecker = response -> { + assertEquals(1, response.evaluateMapKeys("suggest").size()); + assertThat(response.evaluateArraySize("suggest.phrase#tags"), greaterThan(0)); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); searchRequest.source(sourceBuilder); // suggest-only queries are not supported by _async_search, so only test against sync search API - responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse); - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse)); } { SearchRequest searchRequest = initRemoteOnlySearchRequest(); searchRequest.source(sourceBuilder); - responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse); // suggest-only queries are not supported by _async_search, so only test against sync search API - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse)); } } @@ -955,25 +940,23 @@ public class CCSDuelIT extends ESRestTestCase { suggestBuilder.addSuggestion("java", new CompletionSuggestionBuilder("suggest").size(20).text("jav")); suggestBuilder.addSuggestion("ruby", new CompletionSuggestionBuilder("suggest").size(30).text("rub")); sourceBuilder.suggest(suggestBuilder); - Consumer responseChecker = response -> { - assertEquals(Strings.toString(response, true, true), 3, response.getSuggest().size()); - assertThat(response.getSuggest().getSuggestion("python").getEntries().size(), greaterThan(0)); - assertThat(response.getSuggest().getSuggestion("java").getEntries().size(), greaterThan(0)); - assertThat(response.getSuggest().getSuggestion("ruby").getEntries().size(), greaterThan(0)); + CheckedConsumer responseChecker = response -> { + assertThat(response.evaluateMapKeys("suggest").size(), equalTo(3)); + assertThat(response.evaluateArraySize("suggest.completion#python"), greaterThan(0)); + assertThat(response.evaluateArraySize("suggest.completion#java"), greaterThan(0)); + assertThat(response.evaluateArraySize("suggest.completion#ruby"), greaterThan(0)); }; { SearchRequest searchRequest = initLocalAndRemoteSearchRequest(); searchRequest.source(sourceBuilder); - responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse); // suggest-only queries are not supported by _async_search, so only test against sync search API - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertMultiClusterSearchResponse)); } { SearchRequest searchRequest = initRemoteOnlySearchRequest(); searchRequest.source(sourceBuilder); - responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse); // suggest-only queries are not supported by _async_search, so only test against sync search API - duelSearchSync(searchRequest, responseChecker); + duelSearchSync(searchRequest, responseChecker.andThen(CCSDuelIT::assertSingleRemoteClusterSearchResponse)); } } @@ -992,7 +975,7 @@ public class CCSDuelIT extends ESRestTestCase { } private static SearchRequest initRemoteOnlySearchRequest() { - List indices = Arrays.asList("my_remote_cluster:" + INDEX_NAME); + List indices = List.of("my_remote_cluster:" + INDEX_NAME); final SearchRequest request = new SearchRequest(indices.toArray(new String[0])); if (randomBoolean()) { request.setPreFilterShardSize(between(1, 20)); @@ -1000,12 +983,15 @@ public class CCSDuelIT extends ESRestTestCase { return request; } - private void duelRequest(SearchRequest searchRequest, Consumer responseChecker) throws Exception { + private void duelRequest(SearchRequest searchRequest, CheckedConsumer responseChecker) throws Exception { duelRequest(searchRequest, responseChecker, true); } - private void duelRequest(SearchRequest searchRequest, Consumer responseChecker, boolean compareAsyncToSyncResponses) - throws Exception { + private void duelRequest( + SearchRequest searchRequest, + CheckedConsumer responseChecker, + boolean compareAsyncToSyncResponses + ) throws Exception { Map syncResponseMap = duelSearchSync(searchRequest, responseChecker); Map asyncResponseMap = duelSearchAsync(searchRequest, responseChecker); if (compareAsyncToSyncResponses) { @@ -1016,26 +1002,17 @@ public class CCSDuelIT extends ESRestTestCase { /** * @return responseMap from one of the Synchronous Search Requests */ - private static Map duelSearchSync(SearchRequest searchRequest, Consumer responseChecker) + private static Map duelSearchSync(SearchRequest searchRequest, CheckedConsumer responseChecker) throws Exception { CountDownLatch latch = new CountDownLatch(2); AtomicReference exception1 = new AtomicReference<>(); - AtomicReference minimizeRoundtripsResponse = new AtomicReference<>(); + AtomicReference minimizeRoundtripsResponse = new AtomicReference<>(); searchRequest.setCcsMinimizeRoundtrips(true); - restHighLevelClient.searchAsync( - searchRequest, - RequestOptions.DEFAULT, - new LatchedActionListener<>(ActionListener.wrap(minimizeRoundtripsResponse::set, exception1::set), latch) - ); - + submitSyncSearch(searchRequest, minimizeRoundtripsResponse, exception1, latch); AtomicReference exception2 = new AtomicReference<>(); - AtomicReference fanOutResponse = new AtomicReference<>(); + AtomicReference fanOutResponse = new AtomicReference<>(); searchRequest.setCcsMinimizeRoundtrips(false); - restHighLevelClient.searchAsync( - searchRequest, - RequestOptions.DEFAULT, - new LatchedActionListener<>(ActionListener.wrap(fanOutResponse::set, exception2::set), latch) - ); + submitSyncSearch(searchRequest, fanOutResponse, exception2, latch); latch.await(); @@ -1049,114 +1026,7 @@ public class CCSDuelIT extends ESRestTestCase { if (exception2.get() != null) { throw new AssertionError("one of the two requests returned an exception", exception2.get()); } - SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.get(); - SearchResponse fanOutSearchResponse = null; - try { - responseChecker.accept(minimizeRoundtripsSearchResponse); - - // if only the remote cluster was searched, then only one reduce phase is expected - int expectedReducePhasesMinRoundTrip = 1; - if (searchRequest.indices().length > 1) { - expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1; - } - - assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases()); - fanOutSearchResponse = fanOutResponse.get(); - responseChecker.accept(fanOutSearchResponse); - assertEquals(1, fanOutSearchResponse.getNumReducePhases()); - - // compare Clusters objects - SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters(); - SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters(); - - assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal()); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL) - ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED) - ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING) - ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL) - ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED) - ); - - Map minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse); - if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) { - Map fanOutResponseMap = responseToMap(fanOutSearchResponse); - compareResponseMaps( - minimizeRoundtripsResponseMap, - fanOutResponseMap, - "Comparing sync_search minimizeRoundTrip vs. fanOut" - ); - assertThat( - minimizeRoundtripsSearchResponse.getSkippedShards(), - lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()) - ); - } - return minimizeRoundtripsResponseMap; - } finally { - if (fanOutSearchResponse != null) fanOutSearchResponse.decRef(); - if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef(); - } - } - } - - /** - * @return responseMap from one of the async searches - */ - private static Map duelSearchAsync(SearchRequest searchRequest, Consumer responseChecker) - throws Exception { - searchRequest.setCcsMinimizeRoundtrips(true); - AsyncSearchResponse minimizeRoundtripsResponse = submitAsyncSearch( - searchRequest, - TimeValue.timeValueSeconds(1), - restHighLevelClient.getParserConfig() - ); - - try { - final String responseId = minimizeRoundtripsResponse.getId(); - assertBusy(() -> { - AsyncSearchResponse resp = getAsyncSearch(responseId, restHighLevelClient.getParserConfig()); - assertThat(resp.isRunning(), equalTo(false)); - }); - minimizeRoundtripsResponse = getAsyncSearch(responseId, restHighLevelClient.getParserConfig()); - } finally { - deleteAsyncSearch(minimizeRoundtripsResponse.getId()); - } - - searchRequest.setCcsMinimizeRoundtrips(false); - AsyncSearchResponse fanOutResponse = submitAsyncSearch( - searchRequest, - TimeValue.timeValueSeconds(1), - restHighLevelClient.getParserConfig() - ); - try { - final String responseId = fanOutResponse.getId(); - assertBusy(() -> { - AsyncSearchResponse resp = getAsyncSearch(responseId, restHighLevelClient.getParserConfig()); - assertThat(resp.isRunning(), equalTo(false)); - }); - fanOutResponse = getAsyncSearch(responseId, restHighLevelClient.getParserConfig()); - } finally { - deleteAsyncSearch(fanOutResponse.getId()); - } - SearchResponse minimizeRoundtripsSearchResponse = null; - SearchResponse fanOutSearchResponse = null; - try { - fanOutSearchResponse = fanOutResponse.getSearchResponse(); - minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse(); - + ObjectPath minimizeRoundtripsSearchResponse = ObjectPath.createFromResponse(minimizeRoundtripsResponse.get()); responseChecker.accept(minimizeRoundtripsSearchResponse); // if only the remote cluster was searched, then only one reduce phase is expected @@ -1164,54 +1034,164 @@ public class CCSDuelIT extends ESRestTestCase { if (searchRequest.indices().length > 1) { expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1; } - assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases()); - + if (expectedReducePhasesMinRoundTrip == 1) { + assertThat( + minimizeRoundtripsSearchResponse.evaluate("num_reduce_phases"), + anyOf(equalTo(expectedReducePhasesMinRoundTrip), nullValue()) + ); + } else { + assertThat(minimizeRoundtripsSearchResponse.evaluate("num_reduce_phases"), equalTo(expectedReducePhasesMinRoundTrip)); + } + ObjectPath fanOutSearchResponse = ObjectPath.createFromResponse(fanOutResponse.get()); responseChecker.accept(fanOutSearchResponse); - assertEquals(1, fanOutSearchResponse.getNumReducePhases()); + assertThat(fanOutSearchResponse.evaluate("num_reduce_phases"), anyOf(equalTo(1), nullValue())); // default value is 1? // compare Clusters objects - SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters(); - SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters(); - - assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal()); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL) + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.total"), + equalTo(fanOutSearchResponse.evaluate("_cluster.total")) ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED) + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.successful"), + equalTo(fanOutSearchResponse.evaluate("_cluster.successful")) ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING) + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.skipped"), + equalTo(fanOutSearchResponse.evaluate("_cluster.skipped")) ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL) + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.running"), + equalTo(fanOutSearchResponse.evaluate("_cluster.running")) ); - assertEquals( - clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), - clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED) + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.partial"), + equalTo(fanOutSearchResponse.evaluate("_cluster.partial")) + ); + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_cluster.failed"), + equalTo(fanOutSearchResponse.evaluate("_cluster.failed")) ); Map minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse); - if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) { + if (minimizeRoundtripsSearchResponse.evaluate("_clusters") != null && fanOutSearchResponse.evaluate("_clusters") != null) { Map fanOutResponseMap = responseToMap(fanOutSearchResponse); - compareResponseMaps( - minimizeRoundtripsResponseMap, - fanOutResponseMap, - "Comparing async_search minimizeRoundTrip vs. fanOut" + compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing sync_search minimizeRoundTrip vs. fanOut"); + assertThat( + minimizeRoundtripsSearchResponse.evaluate("_shards.skipped"), + lessThanOrEqualTo((Integer) fanOutSearchResponse.evaluate("_shards.skipped")) ); - assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards())); } return minimizeRoundtripsResponseMap; - } finally { - if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef(); - if (fanOutSearchResponse != null) fanOutSearchResponse.decRef(); } } + private static void submitSyncSearch( + SearchRequest searchRequest, + AtomicReference responseRef, + AtomicReference exceptionRef, + CountDownLatch latch + ) throws IOException { + String indices = Strings.collectionToDelimitedString(List.of(searchRequest.indices()), ","); + final Request request = new Request("POST", URLEncoder.encode(indices, StandardCharsets.UTF_8) + "/_search"); + request.addParameter("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); + request.addParameter(RestSearchAction.TYPED_KEYS_PARAM, "true"); + request.setEntity(createEntity(searchRequest.source(), XContentType.JSON, ToXContent.EMPTY_PARAMS)); + client().performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + try { + responseRef.set(response); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exception) { + try { + exceptionRef.set(exception); + } finally { + latch.countDown(); + } + } + }); + } + + /** + * @return responseMap from one of the async searches + */ + private static Map duelSearchAsync( + SearchRequest searchRequest, + CheckedConsumer responseChecker + ) throws Exception { + searchRequest.setCcsMinimizeRoundtrips(true); + ObjectPath minimizeRoundtripsResponse = submitAsyncSearch(searchRequest, TimeValue.timeValueSeconds(1)); + + try { + final String responseId = minimizeRoundtripsResponse.evaluate("id");// minimizeRoundtripsResponse.getId(); + assertBusy(() -> { + ObjectPath resp = getAsyncSearch(responseId); + assertThat(resp.evaluate("is_running"), equalTo(false)); + }); + minimizeRoundtripsResponse = getAsyncSearch(responseId); + } finally { + deleteAsyncSearch(minimizeRoundtripsResponse.evaluate("id")); + } + + searchRequest.setCcsMinimizeRoundtrips(false); + ObjectPath fanOutResponse = submitAsyncSearch(searchRequest, TimeValue.timeValueSeconds(1)); + try { + final String responseId = fanOutResponse.evaluate("id"); + assertBusy(() -> { + ObjectPath resp = getAsyncSearch(responseId); + assertThat(resp.evaluate("is_running"), equalTo(false)); + }); + fanOutResponse = getAsyncSearch(responseId); + } finally { + deleteAsyncSearch(fanOutResponse.evaluate("id")); + } + + // extract the response + minimizeRoundtripsResponse = new ObjectPath(minimizeRoundtripsResponse.evaluate("response")); + fanOutResponse = new ObjectPath(fanOutResponse.evaluate("response")); + + responseChecker.accept(minimizeRoundtripsResponse); + + // if only the remote cluster was searched, then only one reduce phase is expected + int expectedReducePhasesMinRoundTrip = 1; + if (searchRequest.indices().length > 1) { + expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1; + } + if (expectedReducePhasesMinRoundTrip == 1) { + assertThat( + minimizeRoundtripsResponse.evaluate("num_reduce_phases"), + anyOf(equalTo(expectedReducePhasesMinRoundTrip), nullValue()) + ); + } else { + assertThat(minimizeRoundtripsResponse.evaluate("num_reduce_phases"), equalTo(expectedReducePhasesMinRoundTrip)); + } + + responseChecker.accept(fanOutResponse); + assertThat(fanOutResponse.evaluate("num_reduce_phases"), anyOf(equalTo(1), nullValue())); // default value is 1? + + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.total"), equalTo(fanOutResponse.evaluate("_cluster.total"))); + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.successful"), equalTo(fanOutResponse.evaluate("_cluster.successful"))); + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.skipped"), equalTo(fanOutResponse.evaluate("_cluster.skipped"))); + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.running"), equalTo(fanOutResponse.evaluate("_cluster.running"))); + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.partial"), equalTo(fanOutResponse.evaluate("_cluster.partial"))); + assertThat(minimizeRoundtripsResponse.evaluate("_cluster.failed"), equalTo(fanOutResponse.evaluate("_cluster.failed"))); + Map minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsResponse); + if (minimizeRoundtripsResponse.evaluate("_clusters") != null && fanOutResponse.evaluate("_clusters") != null) { + Map fanOutResponseMap = responseToMap(fanOutResponse); + compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing async_search minimizeRoundTrip vs. fanOut"); + assertThat( + minimizeRoundtripsResponse.evaluate("_shards.skipped"), + lessThanOrEqualTo((Integer) fanOutResponse.evaluate("_shards.skipped")) + ); + } + return minimizeRoundtripsResponseMap; + } + private static void compareResponseMaps(Map responseMap1, Map responseMap2, String info) { String diff = XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder(responseMap1, responseMap2); if (diff != null) { @@ -1222,11 +1202,7 @@ public class CCSDuelIT extends ESRestTestCase { } } - private static AsyncSearchResponse submitAsyncSearch( - SearchRequest searchRequest, - TimeValue waitForCompletion, - XContentParserConfiguration parserConfig - ) throws IOException { + private static ObjectPath submitAsyncSearch(SearchRequest searchRequest, TimeValue waitForCompletion) throws IOException { String indices = Strings.collectionToDelimitedString(List.of(searchRequest.indices()), ","); final Request request = new Request("POST", URLEncoder.encode(indices, StandardCharsets.UTF_8) + "/_async_search"); @@ -1236,21 +1212,21 @@ public class CCSDuelIT extends ESRestTestCase { request.addParameter("keep_on_completion", "true"); request.addParameter(RestSearchAction.TYPED_KEYS_PARAM, "true"); request.setEntity(createEntity(searchRequest.source(), XContentType.JSON, ToXContent.EMPTY_PARAMS)); - Response resp = restHighLevelClient.getLowLevelClient().performRequest(request); - return parseEntity(resp.getEntity(), AsyncSearchResponse::fromXContent, parserConfig); + Response resp = client().performRequest(request); + return ObjectPath.createFromResponse(resp); } - private static AsyncSearchResponse getAsyncSearch(String id, XContentParserConfiguration parserConfig) throws IOException { + private static ObjectPath getAsyncSearch(String id) throws IOException { final Request request = new Request("GET", "/_async_search/" + id); request.addParameter("wait_for_completion_timeout", "0ms"); request.addParameter(RestSearchAction.TYPED_KEYS_PARAM, "true"); - Response resp = restHighLevelClient.getLowLevelClient().performRequest(request); - return parseEntity(resp.getEntity(), AsyncSearchResponse::fromXContent, parserConfig); + Response resp = client().performRequest(request); + return ObjectPath.createFromResponse(resp); } private static Response deleteAsyncSearch(String id) throws IOException { final Request request = new Request("DELETE", "/_async_search/" + id); - return restHighLevelClient.getLowLevelClient().performRequest(request); + return client().performRequest(request); } private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType, ToXContent.Params toXContentParams) @@ -1283,63 +1259,66 @@ public class CCSDuelIT extends ESRestTestCase { return ContentType.create(xContentType.mediaTypeWithoutParameters(), (Charset) null); } - private static void assertMultiClusterSearchResponse(SearchResponse searchResponse) { - assertEquals(2, searchResponse.getClusters().getTotal()); + private static void assertMultiClusterSearchResponse(ObjectPath searchResponse) throws IOException { + assertThat(searchResponse.evaluate("_clusters.total"), equalTo(2)); // for bwc checks we expect SUCCESSFUL + PARTIAL to be equal to 2 - int bwcSuccessful = searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL); - bwcSuccessful += searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL); + int bwcSuccessful = searchResponse.evaluate("_clusters.successful"); + bwcSuccessful += (Integer) searchResponse.evaluate("_clusters.partial"); assertEquals(2, bwcSuccessful); - assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)); - assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)); - assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); - assertThat(searchResponse.getTotalShards(), greaterThan(1)); - assertThat(searchResponse.getSuccessfulShards(), greaterThan(1)); + assertThat(searchResponse.evaluate("_clusters.skipped"), equalTo(0)); + assertThat(searchResponse.evaluate("_clusters.running"), equalTo(0)); + assertThat(searchResponse.evaluate("_clusters.failed"), equalTo(0)); + assertThat(searchResponse.evaluate("_shards.total"), greaterThan(1)); + assertThat(searchResponse.evaluate("_shards.successful"), greaterThan(1)); } - private static void assertSingleRemoteClusterSearchResponse(SearchResponse searchResponse) { - assertEquals(1, searchResponse.getClusters().getTotal()); - assertEquals(1, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)); - assertThat(searchResponse.getTotalShards(), greaterThanOrEqualTo(1)); - assertThat(searchResponse.getSuccessfulShards(), greaterThanOrEqualTo(1)); + private static void assertSingleRemoteClusterSearchResponse(ObjectPath searchResponse) throws IOException { + assertThat(searchResponse.evaluate("_clusters.total"), equalTo(1)); + assertThat(searchResponse.evaluate("_clusters.successful"), equalTo(1)); + assertThat(searchResponse.evaluate("_shards.total"), greaterThanOrEqualTo(1)); + assertThat(searchResponse.evaluate("_shards.successful"), greaterThanOrEqualTo(1)); } - private static void assertHits(SearchResponse response) { + private static void assertHits(ObjectPath response) throws IOException { assertHits(response, 0); } - private static void assertHits(SearchResponse response, int from) { - if (response.getClusters().getTotal() == 1) { + private static void assertHits(ObjectPath response, int from) throws IOException { + int totalClusters = response.evaluate("_clusters.total"); + if (totalClusters == 1) { assertSingleRemoteClusterSearchResponse(response); } else { assertMultiClusterSearchResponse(response); } - assertThat(response.getHits().getTotalHits().value, greaterThan(0L)); - assertEquals(0, response.getFailedShards()); - assertNull(response.getAggregations()); - assertNull(response.getSuggest()); - if (response.getHits().getTotalHits().value > from) { - assertThat(response.getHits().getHits().length, greaterThan(0)); + int totalHits = response.evaluate("hits.total.value"); + assertThat(totalHits, greaterThan(0)); + assertThat(response.evaluate("_shards.failed"), Matchers.equalTo(0)); + assertNull(response.evaluate("hits.aggregations")); + assertNull(response.evaluate("hits.suggest")); + if (totalHits > from) { + assertThat(response.evaluateArraySize("hits.hits"), greaterThan(0)); } else { - assertThat(response.getHits().getHits().length, equalTo(0)); + assertThat(response.evaluateArraySize("hits.hits"), equalTo(0)); } } - private static void assertAggs(SearchResponse response) { - if (response.getClusters().getTotal() == 1) { + private static void assertAggs(ObjectPath response) throws IOException { + int totalHits = response.evaluate("_clusters.total"); + if (totalHits == 1) { assertSingleRemoteClusterSearchResponse(response); } else { assertMultiClusterSearchResponse(response); } - assertThat(response.getHits().getTotalHits().value, greaterThan(0L)); - assertEquals(0, response.getHits().getHits().length); - assertNull(response.getSuggest()); - assertNotNull(response.getAggregations()); - List aggregations = response.getAggregations().asList(); - for (Aggregation aggregation : aggregations) { - if (aggregation instanceof MultiBucketsAggregation multiBucketsAggregation) { + assertThat(response.evaluate("hits.total.value"), greaterThan(0)); + assertThat(response.evaluateArraySize("hits.hits"), equalTo(0)); + assertNull(response.evaluate("suggest")); + assertNotNull(response.evaluate("aggregations")); + Set aggregations = response.evaluateMapKeys("aggregations"); + for (String aggregation : aggregations) { + if (aggregation.startsWith("date_histogram") || aggregation.startsWith("sterms")) { assertThat( - "agg " + multiBucketsAggregation.getName() + " has 0 buckets", - multiBucketsAggregation.getBuckets().size(), + aggregation + " has 0 buckets", + response.evaluateArraySize("aggregations." + aggregation + ".buckets"), greaterThan(0) ); } @@ -1347,8 +1326,8 @@ public class CCSDuelIT extends ESRestTestCase { } @SuppressWarnings("unchecked") - private static Map responseToMap(SearchResponse response) throws IOException { - BytesReference bytesReference = XContentHelper.toXContent(response, XContentType.JSON, false); + private static Map responseToMap(ObjectPath response) throws IOException { + BytesReference bytesReference = BytesReference.bytes(response.toXContentBuilder(XContentType.JSON.xContent())); Map responseMap = XContentHelper.convertToMap(bytesReference, false, XContentType.JSON).v2(); assertNotNull(responseMap.put("took", -1)); responseMap.remove("num_reduce_phases");