Removing explicit SearchResponse usages in tests - v2 (#102021)

Panagiotis Bailis, 2023-11-13 16:56:14 +02:00 (committed by GitHub)
parent 57ae1509c5
commit 25b80acb38
9 changed files with 244 additions and 193 deletions
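
The change replaces test code that materializes a SearchResponse (a ref-counted object the caller must release) with the ElasticsearchAssertions.assertResponse helper, which hands the response to a consumer and releases it when the consumer returns. A minimal sketch of the before/after shape, with an illustrative index name and hit-count assertion:

    // Before: the test owns the ref-counted response and must release it itself.
    SearchResponse response = prepareSearch("test-index").setSize(10).get();
    try {
        assertEquals(10, response.getHits().getHits().length);
    } finally {
        response.decRef();
    }

    // After: assertResponse scopes the response to the consumer and releases it on exit.
    assertResponse(
        prepareSearch("test-index").setSize(10),
        response -> assertEquals(10, response.getHits().getHits().length)
    );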

BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java

@@ -15,7 +15,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -53,6 +52,7 @@ import java.util.function.Function;
 import static org.elasticsearch.common.lucene.uid.Versions.MATCH_DELETED;
 import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -201,16 +201,18 @@ public class BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests extends
         // Ensure that the write thread blocking task is currently executing
         barrier.await();

-        final SearchResponse searchResponse = prepareSearch(sourceIndex).setSize(numDocs) // Get all indexed docs
-            .addSort(SORTING_FIELD, SortOrder.DESC)
-            .execute()
-            .actionGet();
-
-        // Modify a subset of the target documents concurrently
-        final List<SearchHit> originalDocs = Arrays.asList(searchResponse.getHits().getHits());
         int conflictingOps = randomIntBetween(maxDocs, numDocs);
-        final List<SearchHit> docsModifiedConcurrently = randomSubsetOf(conflictingOps, originalDocs);
-
+        final int finalConflictingOps = conflictingOps;
+        final List<SearchHit> docsModifiedConcurrently = new ArrayList<>();
+        assertResponse(
+            prepareSearch(sourceIndex).setSize(numDocs) // Get all indexed docs
+                .addSort(SORTING_FIELD, SortOrder.DESC),
+            response -> {
+                // Modify a subset of the target documents concurrently
+                final List<SearchHit> originalDocs = Arrays.asList(response.getHits().getHits());
+                docsModifiedConcurrently.addAll(randomSubsetOf(finalConflictingOps, originalDocs));
+            }
+        );
         BulkRequest conflictingUpdatesBulkRequest = new BulkRequest();
         for (SearchHit searchHit : docsModifiedConcurrently) {
             if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {

ClientScrollableHitSourceTests.java

@@ -111,26 +111,30 @@ public class ClientScrollableHitSourceTests extends ESTestCase {
         }
         client.validateRequest(SearchAction.INSTANCE, (SearchRequest r) -> assertTrue(r.allowPartialSearchResults() == Boolean.FALSE));
         SearchResponse searchResponse = createSearchResponse();
-        client.respond(SearchAction.INSTANCE, searchResponse);
+        try {
+            client.respond(SearchAction.INSTANCE, searchResponse);

-        for (int i = 0; i < randomIntBetween(1, 10); ++i) {
-            ScrollableHitSource.AsyncResponse asyncResponse = responses.poll(10, TimeUnit.SECONDS);
-            assertNotNull(asyncResponse);
-            assertEquals(responses.size(), 0);
-            assertSameHits(asyncResponse.response().getHits(), searchResponse.getHits().getHits());
-            asyncResponse.done(TimeValue.ZERO);
+            for (int i = 0; i < randomIntBetween(1, 10); ++i) {
+                ScrollableHitSource.AsyncResponse asyncResponse = responses.poll(10, TimeUnit.SECONDS);
+                assertNotNull(asyncResponse);
+                assertEquals(responses.size(), 0);
+                assertSameHits(asyncResponse.response().getHits(), searchResponse.getHits().getHits());
+                asyncResponse.done(TimeValue.ZERO);

-            for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) {
-                client.fail(SearchScrollAction.INSTANCE, new EsRejectedExecutionException());
-                client.awaitOperation();
-                ++expectedSearchRetries;
-            }
+                for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) {
+                    client.fail(SearchScrollAction.INSTANCE, new EsRejectedExecutionException());
+                    client.awaitOperation();
+                    ++expectedSearchRetries;
+                }

-            searchResponse = createSearchResponse();
-            client.respond(SearchScrollAction.INSTANCE, searchResponse);
-        }
+                searchResponse = createSearchResponse();
+                client.respond(SearchScrollAction.INSTANCE, searchResponse);
+            }

-        assertEquals(actualSearchRetries.get(), expectedSearchRetries);
+            assertEquals(actualSearchRetries.get(), expectedSearchRetries);
+        } finally {
+            searchResponse.decRef();
+        }
     }

     public void testScrollKeepAlive() {
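
Where the test constructs and re-assigns the response itself, as in ClientScrollableHitSourceTests above, the consumer-based helper does not fit, so the body is wrapped in try/finally instead. The shape of that change, condensed (createSearchResponse() stands in for the test's own response factory):

    SearchResponse searchResponse = createSearchResponse();
    try {
        client.respond(SearchAction.INSTANCE, searchResponse);
        // ... drive the scroll, re-assign searchResponse per page, assert ...
    } finally {
        searchResponse.decRef(); // release the last test-owned reference
    }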

CCSDuelIT.java

@@ -1050,53 +1050,65 @@ public class CCSDuelIT extends ESRestTestCase {
             throw new AssertionError("one of the two requests returned an exception", exception2.get());
         }
         SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.get();
-        responseChecker.accept(minimizeRoundtripsSearchResponse);
+        SearchResponse fanOutSearchResponse = null;
+        try {
+            responseChecker.accept(minimizeRoundtripsSearchResponse);

-        // if only the remote cluster was searched, then only one reduce phase is expected
-        int expectedReducePhasesMinRoundTrip = 1;
-        if (searchRequest.indices().length > 1) {
-            expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
-        }
-        assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
-        SearchResponse fanOutSearchResponse = fanOutResponse.get();
-        responseChecker.accept(fanOutSearchResponse);
-        assertEquals(1, fanOutSearchResponse.getNumReducePhases());
+            // if only the remote cluster was searched, then only one reduce phase is expected
+            int expectedReducePhasesMinRoundTrip = 1;
+            if (searchRequest.indices().length > 1) {
+                expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
+            }
+            assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
+            fanOutSearchResponse = fanOutResponse.get();
+            responseChecker.accept(fanOutSearchResponse);
+            assertEquals(1, fanOutSearchResponse.getNumReducePhases());

-        // compare Clusters objects
-        SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
-        SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
-        assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
-        );
+            // compare Clusters objects
+            SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
+            SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
+            assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
+            );

-        Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
-        if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
-            Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
-            compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing sync_search minimizeRoundTrip vs. fanOut");
-            assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
-        }
-        return minimizeRoundtripsResponseMap;
+            Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
+            if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
+                Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
+                compareResponseMaps(
+                    minimizeRoundtripsResponseMap,
+                    fanOutResponseMap,
+                    "Comparing sync_search minimizeRoundTrip vs. fanOut"
+                );
+                assertThat(
+                    minimizeRoundtripsSearchResponse.getSkippedShards(),
+                    lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards())
+                );
+            }
+            return minimizeRoundtripsResponseMap;
+        } finally {
+            if (fanOutSearchResponse != null) fanOutSearchResponse.decRef();
+            if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef();
+        }
     }
 }
@@ -1139,54 +1151,65 @@ public class CCSDuelIT extends ESRestTestCase {
         } finally {
             deleteAsyncSearch(fanOutResponse.getId());
         }
-        SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse();
-        SearchResponse fanOutSearchResponse = fanOutResponse.getSearchResponse();
-        responseChecker.accept(minimizeRoundtripsSearchResponse);
+        SearchResponse minimizeRoundtripsSearchResponse = null;
+        SearchResponse fanOutSearchResponse = null;
+        try {
+            fanOutSearchResponse = fanOutResponse.getSearchResponse();
+            minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse();
+            responseChecker.accept(minimizeRoundtripsSearchResponse);

-        // if only the remote cluster was searched, then only one reduce phase is expected
-        int expectedReducePhasesMinRoundTrip = 1;
-        if (searchRequest.indices().length > 1) {
-            expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
-        }
-        assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
-        responseChecker.accept(fanOutSearchResponse);
-        assertEquals(1, fanOutSearchResponse.getNumReducePhases());
+            // if only the remote cluster was searched, then only one reduce phase is expected
+            int expectedReducePhasesMinRoundTrip = 1;
+            if (searchRequest.indices().length > 1) {
+                expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
+            }
+            assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
+            responseChecker.accept(fanOutSearchResponse);
+            assertEquals(1, fanOutSearchResponse.getNumReducePhases());

-        // compare Clusters objects
-        SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
-        SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
-        assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
-        );
-        assertEquals(
-            clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
-            clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
-        );
+            // compare Clusters objects
+            SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
+            SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
+            assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
+            );
+            assertEquals(
+                clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
+                clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
+            );

-        Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
-        if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
-            Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
-            compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing async_search minimizeRoundTrip vs. fanOut");
-            assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
-        }
-        return minimizeRoundtripsResponseMap;
+            Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
+            if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
+                Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
+                compareResponseMaps(
+                    minimizeRoundtripsResponseMap,
+                    fanOutResponseMap,
+                    "Comparing async_search minimizeRoundTrip vs. fanOut"
+                );
+                assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
+            }
+            return minimizeRoundtripsResponseMap;
+        } finally {
+            if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef();
+            if (fanOutSearchResponse != null) fanOutSearchResponse.decRef();
+        }
     }

     private static void compareResponseMaps(Map<String, Object> responseMap1, Map<String, Object> responseMap2, String info) {
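
In CCSDuelIT either of the two responses may never materialize when a duel fails early, so both references start as null and the release in finally is guarded. The idiom, condensed:

    SearchResponse minimizeRoundtripsSearchResponse = null;
    SearchResponse fanOutSearchResponse = null;
    try {
        fanOutSearchResponse = fanOutResponse.getSearchResponse();
        minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse();
        // ... compare the two responses ...
    } finally {
        // only release what was actually obtained
        if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef();
        if (fanOutSearchResponse != null) fanOutSearchResponse.decRef();
    }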

HistogramPercentileAggregationTests.java

@@ -35,6 +35,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;

+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
+
 public class HistogramPercentileAggregationTests extends ESSingleNodeTestCase {

     public void testHDRHistogram() throws Exception {
@@ -103,11 +106,9 @@ public class HistogramPercentileAggregationTests extends ESSingleNodeTestCase {
         }
         client().admin().indices().refresh(new RefreshRequest("raw", "pre_agg")).get();
-        SearchResponse response = client().prepareSearch("raw").setTrackTotalHits(true).get();
-        assertEquals(numDocs, response.getHits().getTotalHits().value);
+        assertHitCount(client().prepareSearch("raw").setTrackTotalHits(true), numDocs);

-        response = client().prepareSearch("pre_agg").get();
-        assertEquals(numDocs / frq, response.getHits().getTotalHits().value);
+        assertHitCount(client().prepareSearch("pre_agg"), numDocs / frq);

         PercentilesAggregationBuilder builder = AggregationBuilders.percentiles("agg")
             .field("data")
@@ -115,17 +116,21 @@
             .numberOfSignificantValueDigits(numberOfSignificantValueDigits)
             .percentiles(10);

-        SearchResponse responseRaw = client().prepareSearch("raw").addAggregation(builder).get();
-        SearchResponse responsePreAgg = client().prepareSearch("pre_agg").addAggregation(builder).get();
-        SearchResponse responseBoth = client().prepareSearch("pre_agg", "raw").addAggregation(builder).get();
-
-        InternalHDRPercentiles percentilesRaw = responseRaw.getAggregations().get("agg");
-        InternalHDRPercentiles percentilesPreAgg = responsePreAgg.getAggregations().get("agg");
-        InternalHDRPercentiles percentilesBoth = responseBoth.getAggregations().get("agg");
-        for (int i = 1; i < 100; i++) {
-            assertEquals(percentilesRaw.percentile(i), percentilesPreAgg.percentile(i), 0.0);
-            assertEquals(percentilesRaw.percentile(i), percentilesBoth.percentile(i), 0.0);
-        }
+        assertResponse(
+            client().prepareSearch("raw").addAggregation(builder),
+            responseRaw -> assertResponse(
+                client().prepareSearch("pre_agg").addAggregation(builder),
+                responsePreAgg -> assertResponse(client().prepareSearch("pre_agg", "raw").addAggregation(builder), responseBoth -> {
+                    InternalHDRPercentiles percentilesRaw = responseRaw.getAggregations().get("agg");
+                    InternalHDRPercentiles percentilesPreAgg = responsePreAgg.getAggregations().get("agg");
+                    InternalHDRPercentiles percentilesBoth = responseBoth.getAggregations().get("agg");
+                    for (int i = 1; i < 100; i++) {
+                        assertEquals(percentilesRaw.percentile(i), percentilesPreAgg.percentile(i), 0.0);
+                        assertEquals(percentilesRaw.percentile(i), percentilesBoth.percentile(i), 0.0);
+                    }
+                })
+            )
+        );
     }

     private void setupTDigestHistogram(int compression) throws Exception {
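
When assertions need several live responses at once, as in HistogramPercentileAggregationTests above, the assertResponse calls nest and each response stays valid only inside its consumer. Condensed from the hunk, comparing a single percentile instead of the full 1..99 loop:

    assertResponse(
        client().prepareSearch("raw").addAggregation(builder),
        responseRaw -> assertResponse(
            client().prepareSearch("pre_agg").addAggregation(builder),
            responsePreAgg -> {
                // both responses are live only within this innermost consumer
                InternalHDRPercentiles raw = responseRaw.getAggregations().get("agg");
                InternalHDRPercentiles preAgg = responsePreAgg.getAggregations().get("agg");
                assertEquals(raw.percentile(50), preAgg.percentile(50), 0.0);
            }
        )
    );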

CcrIntegTestCase.java

@@ -19,7 +19,6 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
@@ -122,6 +121,7 @@ import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVE
 import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -706,9 +706,10 @@
             refresh(client, index);
             SearchRequest request = new SearchRequest(index);
             request.source(new SearchSourceBuilder().size(0));
-            SearchResponse response = client.search(request).actionGet();
-            assertNotNull(response.getHits().getTotalHits());
-            assertThat(response.getHits().getTotalHits().value, greaterThanOrEqualTo(numDocsReplicated));
+            assertResponse(client.search(request), response -> {
+                assertNotNull(response.getHits().getTotalHits());
+                assertThat(response.getHits().getTotalHits().value, greaterThanOrEqualTo(numDocsReplicated));
+            });
         }, 60, TimeUnit.SECONDS);
     }

DownsampleClusterDisruptionIT.java

@@ -18,7 +18,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
@@ -51,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomInterval;
@@ -369,20 +369,26 @@
             .getIndex(new GetIndexRequest().indices(targetIndex))
             .actionGet();
         assertEquals(1, getIndexResponse.indices().length);
-        final SearchResponse sourceIndexSearch = cluster.client()
-            .prepareSearch(sourceIndex)
-            .setQuery(new MatchAllQueryBuilder())
-            .setSize(Math.min(DOC_COUNT, indexedDocs))
-            .setTrackTotalHitsUpTo(Integer.MAX_VALUE)
-            .get();
-        assertEquals(indexedDocs, sourceIndexSearch.getHits().getHits().length);
-        final SearchResponse targetIndexSearch = cluster.client()
-            .prepareSearch(targetIndex)
-            .setQuery(new MatchAllQueryBuilder())
-            .setSize(Math.min(DOC_COUNT, indexedDocs))
-            .setTrackTotalHitsUpTo(Integer.MAX_VALUE)
-            .get();
-        assertTrue(targetIndexSearch.getHits().getHits().length > 0);
+        assertResponse(
+            cluster.client()
+                .prepareSearch(sourceIndex)
+                .setQuery(new MatchAllQueryBuilder())
+                .setSize(Math.min(DOC_COUNT, indexedDocs))
+                .setTrackTotalHitsUpTo(Integer.MAX_VALUE),
+            sourceIndexSearch -> {
+                assertEquals(indexedDocs, sourceIndexSearch.getHits().getHits().length);
+            }
+        );
+        assertResponse(
+            cluster.client()
+                .prepareSearch(targetIndex)
+                .setQuery(new MatchAllQueryBuilder())
+                .setSize(Math.min(DOC_COUNT, indexedDocs))
+                .setTrackTotalHitsUpTo(Integer.MAX_VALUE),
+            targetIndexSearch -> {
+                assertTrue(targetIndexSearch.getHits().getHits().length > 0);
+            }
+        );
     }

     private int bulkIndex(final String indexName, final DownsampleActionSingleNodeTests.SourceSupplier sourceSupplier, int docCount)

DownsampleTransportFailureIT.java

@@ -15,7 +15,6 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
@@ -52,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, supportsDedicatedMasters = false)
 public class DownsampleTransportFailureIT extends ESIntegTestCase {
@@ -228,12 +228,15 @@
     }

     private void assertDocumentsExist(final String nodeName, final String indexName) {
-        final SearchResponse searchResponse = client(nodeName).prepareSearch(indexName)
-            .setQuery(new MatchAllQueryBuilder())
-            .setTrackTotalHitsUpTo(Integer.MAX_VALUE)
-            .setSize(DOCUMENTS.size())
-            .get();
-        assertEquals(DOCUMENTS.size(), searchResponse.getHits().getHits().length);
+        assertResponse(
+            client(nodeName).prepareSearch(indexName)
+                .setQuery(new MatchAllQueryBuilder())
+                .setTrackTotalHitsUpTo(Integer.MAX_VALUE)
+                .setSize(DOCUMENTS.size()),
+            searchResponse -> {
+                assertEquals(DOCUMENTS.size(), searchResponse.getHits().getHits().length);
+            }
+        );
     }

     private void assertIndexExists(final String nodeName, final String indexName) {

ILMDownsampleDisruptionIT.java

@@ -19,7 +19,6 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
@@ -61,6 +60,7 @@ import java.util.function.Consumer;

 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomInterval;
 import static org.hamcrest.Matchers.equalTo;
@@ -239,13 +239,16 @@
             .getIndex(new GetIndexRequest().indices(targetIndex))
             .actionGet();
         assertEquals(1, getIndexResponse.indices().length);
-        final SearchResponse targetIndexSearch = cluster.client()
-            .prepareSearch(targetIndex)
-            .setQuery(new MatchAllQueryBuilder())
-            .setSize(Math.min(DOC_COUNT, indexedDocs))
-            .setTrackTotalHitsUpTo(Integer.MAX_VALUE)
-            .get();
-        assertTrue(targetIndexSearch.getHits().getHits().length > 0);
+        assertResponse(
+            cluster.client()
+                .prepareSearch(targetIndex)
+                .setQuery(new MatchAllQueryBuilder())
+                .setSize(Math.min(DOC_COUNT, indexedDocs))
+                .setTrackTotalHitsUpTo(Integer.MAX_VALUE),
+            targetIndexSearch -> {
+                assertTrue(targetIndexSearch.getHits().getHits().length > 0);
+            }
+        );
     }

     private int bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException {

DownsampleDataStreamTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -62,6 +61,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;

 import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.hamcrest.Matchers.equalTo;

 public class DownsampleDataStreamTests extends ESSingleNodeTestCase {
@@ -145,35 +145,39 @@
                 new DateHistogramAggregationBuilder("dateHistogram").field("@timestamp").fixedInterval(DateHistogramInterval.MINUTE)
             )
         );
-        final SearchResponse searchResponse = client().search(searchRequest).actionGet();
-        Arrays.stream(searchResponse.getHits().getHits())
-            .limit(10)
-            .forEach(hit -> assertThat(hit.getIndex(), equalTo(rolloverResponse.getNewIndex())));
-        assertThat(searchResponse.getHits().getHits()[10].getIndex(), equalTo(downsampleTargetIndex));
-        final InternalDateHistogram dateHistogram = searchResponse.getAggregations().get("dateHistogram");
-        // NOTE: due to unpredictable values for the @timestamp field we don't know how many buckets we have in the
-        // date histogram. We know, anyway, that we will have 10 documents in the first two buckets, 10 documents in the last two buckets.
-        // The actual number of documents on each of the first two and last two buckets depends on the timestamp value generated when
-        // indexing
-        // documents, which might cross the minute boundary of the fixed_interval date histogram aggregation.
-        // Then we check there is a variable number of intermediate buckets with exactly 0 documents. This is a result of the way
-        // downsampling
-        // deals with a fixed interval granularity that is larger than the date histogram fixed interval (1 minute (date histogram
-        // fixed_interval)
-        // < 1 hour (downsample fixed_interval)).
-        final int totalBuckets = dateHistogram.getBuckets().size();
-        assertThat(dateHistogram.getBuckets().get(0).getDocCount() + dateHistogram.getBuckets().get(1).getDocCount(), equalTo(10L));
-        dateHistogram.getBuckets()
-            .stream()
-            .skip(2)
-            .limit(totalBuckets - 3)
-            .map(InternalDateHistogram.Bucket::getDocCount)
-            .toList()
-            .forEach(docCount -> assertThat(docCount, equalTo(0L)));
-        assertThat(
-            dateHistogram.getBuckets().get(totalBuckets - 2).getDocCount() + dateHistogram.getBuckets().get(totalBuckets - 1).getDocCount(),
-            equalTo(10L)
-        );
+        assertResponse(client().search(searchRequest), searchResponse -> {
+            Arrays.stream(searchResponse.getHits().getHits())
+                .limit(10)
+                .forEach(hit -> assertThat(hit.getIndex(), equalTo(rolloverResponse.getNewIndex())));
+            assertThat(searchResponse.getHits().getHits()[10].getIndex(), equalTo(downsampleTargetIndex));
+            final InternalDateHistogram dateHistogram = searchResponse.getAggregations().get("dateHistogram");
+            // NOTE: due to unpredictable values for the @timestamp field we don't know how many buckets we have in the
+            // date histogram. We know, anyway, that we will have 10 documents in the first two buckets, 10 documents in the last two
+            // buckets.
+            // The actual number of documents on each of the first two and last two buckets depends on the timestamp value generated when
+            // indexing
+            // documents, which might cross the minute boundary of the fixed_interval date histogram aggregation.
+            // Then we check there is a variable number of intermediate buckets with exactly 0 documents. This is a result of the way
+            // downsampling
+            // deals with a fixed interval granularity that is larger than the date histogram fixed interval (1 minute (date histogram
+            // fixed_interval)
+            // < 1 hour (downsample fixed_interval)).
+            final int totalBuckets = dateHistogram.getBuckets().size();
+            assertThat(dateHistogram.getBuckets().get(0).getDocCount() + dateHistogram.getBuckets().get(1).getDocCount(), equalTo(10L));
+            dateHistogram.getBuckets()
+                .stream()
+                .skip(2)
+                .limit(totalBuckets - 3)
+                .map(InternalDateHistogram.Bucket::getDocCount)
+                .toList()
+                .forEach(docCount -> assertThat(docCount, equalTo(0L)));
+            assertThat(
+                dateHistogram.getBuckets().get(totalBuckets - 2).getDocCount() + dateHistogram.getBuckets()
+                    .get(totalBuckets - 1)
+                    .getDocCount(),
+                equalTo(10L)
+            );
+        });
     }

     private void putComposableIndexTemplate(final String id, final List<String> patterns) throws IOException {