Force typed keys in the HLRC get async search (#78992) (#79245)

This change ensures that the HLRC can parse the search response
that the get async search returns.
The aggregations must be typed by keys but the param to enable this
mode was missing.

Closes #77608
This commit is contained in:
Jim Ferenczi 2021-10-15 16:06:54 +02:00 committed by GitHub
parent fbf216ab82
commit 34b3a9a144
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 12 deletions

View file

@ -77,6 +77,7 @@ final class AsyncSearchRequestConverters {
.build(); .build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint); Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Params params = new RequestConverters.Params(); Params params = new RequestConverters.Params();
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (asyncSearchRequest.getKeepAlive() != null) { if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep()); params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
} }

View file

@ -37,6 +37,7 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder();
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(searchSourceBuilder, indices); SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(searchSourceBuilder, indices);
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");
// the following parameters might be overwritten by random ones later, // the following parameters might be overwritten by random ones later,
// but we need to set these since they are the default we send over http // but we need to set these since they are the default we send over http
setRandomSearchParams(submitRequest, expectedParams); setRandomSearchParams(submitRequest, expectedParams);
@ -72,7 +73,6 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
} }
private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map<String, String> expectedParams) { private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map<String, String> expectedParams) {
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (randomBoolean()) { if (randomBoolean()) {
request.setRouting(randomAlphaOfLengthBetween(3, 10)); request.setRouting(randomAlphaOfLengthBetween(3, 10));
expectedParams.put("routing", request.getRouting()); expectedParams.put("routing", request.getRouting());
@ -107,6 +107,7 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
String id = randomAlphaOfLengthBetween(5, 10); String id = randomAlphaOfLengthBetween(5, 10);
Map<String, String> expectedParams = new HashMap<>(); Map<String, String> expectedParams = new HashMap<>();
GetAsyncSearchRequest submitRequest = new GetAsyncSearchRequest(id); GetAsyncSearchRequest submitRequest = new GetAsyncSearchRequest(id);
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (randomBoolean()) { if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test"); TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setKeepAlive(keepAlive); submitRequest.setKeepAlive(keepAlive);

View file

@ -8,47 +8,77 @@
package org.elasticsearch.client.asyncsearch; package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class AsyncSearchIT extends ESRestHighLevelClientTestCase { public class AsyncSearchIT extends ESRestHighLevelClientTestCase {
public void testAsyncSearch() throws IOException { public void testAsyncSearch() throws IOException {
String index = "test-index"; String index = "test-index";
createIndex(index, Settings.EMPTY); createIndex(index, Settings.EMPTY);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.aggregation(AggregationBuilders.terms("1").field("foo.keyword"));
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index); SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index);
submitRequest.setKeepOnCompletion(true); submitRequest.setKeepOnCompletion(true);
submitRequest.setWaitForCompletionTimeout(TimeValue.MAX_VALUE);
AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT); AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT);
assertNotNull(submitResponse.getId()); assertNotNull(submitResponse.getId());
assertFalse(submitResponse.isRunning());
assertFalse(submitResponse.isPartial()); assertFalse(submitResponse.isPartial());
assertTrue(submitResponse.getStartTime() > 0); assertTrue(submitResponse.getStartTime() > 0);
assertTrue(submitResponse.getExpirationTime() > 0); assertTrue(submitResponse.getExpirationTime() > 0);
assertNotNull(submitResponse.getSearchResponse()); assertNotNull(submitResponse.getSearchResponse());
if (submitResponse.isRunning() == false) { assertThat(submitResponse.getSearchResponse().getHits().getTotalHits().value, equalTo(2L));
assertFalse(submitResponse.isPartial()); ParsedStringTerms terms = submitResponse.getSearchResponse().getAggregations().get("1");
} else { assertThat(terms.getBuckets().size(), equalTo(2));
assertTrue(submitResponse.isPartial()); assertThat(terms.getBuckets().get(0).getKeyAsString(), equalTo("bar"));
} assertThat(terms.getBuckets().get(0).getDocCount(), equalTo(1L));
assertThat(terms.getBuckets().get(1).getKeyAsString(), equalTo("bar2"));
assertThat(terms.getBuckets().get(1).getDocCount(), equalTo(1L));
GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(submitResponse.getId()); GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(submitResponse.getId());
AsyncSearchResponse getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT); AsyncSearchResponse getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
while (getResponse.isRunning()) {
getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
}
assertFalse(getResponse.isRunning()); assertFalse(getResponse.isRunning());
assertFalse(getResponse.isPartial()); assertFalse(getResponse.isPartial());
assertTrue(getResponse.getStartTime() > 0); assertTrue(getResponse.getStartTime() > 0);
assertTrue(getResponse.getExpirationTime() > 0); assertTrue(getResponse.getExpirationTime() > 0);
assertNotNull(getResponse.getSearchResponse()); assertThat(getResponse.getSearchResponse().getHits().getTotalHits().value, equalTo(2L));
terms = getResponse.getSearchResponse().getAggregations().get("1");
assertThat(terms.getBuckets().size(), equalTo(2));
assertThat(terms.getBuckets().get(0).getKeyAsString(), equalTo("bar"));
assertThat(terms.getBuckets().get(0).getDocCount(), equalTo(1L));
assertThat(terms.getBuckets().get(1).getKeyAsString(), equalTo("bar2"));
assertThat(terms.getBuckets().get(1).getDocCount(), equalTo(1L));
DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(submitResponse.getId()); DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(submitResponse.getId());
AcknowledgedResponse deleteAsyncSearchResponse = highLevelClient().asyncSearch().delete(deleteRequest, AcknowledgedResponse deleteAsyncSearchResponse = highLevelClient().asyncSearch().delete(deleteRequest,