Minor improvements for Cross-Cluster Search code (#98405)

Several minor structural and test improvements for cross-cluster search

These changes set the stage for a follow-on ticket to add _cluster/details to
cross-cluster searches with minimize_roundtrips = false. To help keep that
PR from being too large some of the simpler required changes and tests
are added in this PR.
This commit is contained in:
Michael Peterson 2023-08-13 19:30:17 -04:00 committed by GitHub
parent d0f64941f0
commit 6420924e31
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1545 additions and 544 deletions

View file

@ -1055,7 +1055,6 @@ public class CCSDuelIT extends ESRestTestCase {
SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.get(); SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.get();
responseChecker.accept(minimizeRoundtripsSearchResponse); responseChecker.accept(minimizeRoundtripsSearchResponse);
SearchResponse.Clusters clusters = minimizeRoundtripsSearchResponse.getClusters();
// if only the remote cluster was searched, then only one reduce phase is expected // if only the remote cluster was searched, then only one reduce phase is expected
int expectedReducePhasesMinRoundTrip = 1; int expectedReducePhasesMinRoundTrip = 1;
@ -1067,10 +1066,22 @@ public class CCSDuelIT extends ESRestTestCase {
SearchResponse fanOutSearchResponse = fanOutResponse.get(); SearchResponse fanOutSearchResponse = fanOutResponse.get();
responseChecker.accept(fanOutSearchResponse); responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases()); assertEquals(1, fanOutSearchResponse.getNumReducePhases());
// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(clustersMRT.getSuccessful(), clustersMRTFalse.getSuccessful());
assertEquals(clustersMRT.getSkipped(), clustersMRTFalse.getSkipped());
boolean removeSkipped = searchRequest.source().collapse() != null;
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse); Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse); if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing sync_search minimizeRoundTrip vs. fanOut"); Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards())); compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing sync_search minimizeRoundTrip vs. fanOut");
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
}
return minimizeRoundtripsResponseMap; return minimizeRoundtripsResponseMap;
} }
} }
@ -1128,10 +1139,22 @@ public class CCSDuelIT extends ESRestTestCase {
responseChecker.accept(fanOutSearchResponse); responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases()); assertEquals(1, fanOutSearchResponse.getNumReducePhases());
// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(clustersMRT.getSuccessful(), clustersMRTFalse.getSuccessful());
assertEquals(clustersMRT.getSkipped(), clustersMRTFalse.getSkipped());
boolean removeSkipped = searchRequest.source().collapse() != null;
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse); Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse); if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing async_search minimizeRoundTrip vs. fanOut"); Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards())); compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing async_search minimizeRoundTrip vs. fanOut");
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
}
return minimizeRoundtripsResponseMap; return minimizeRoundtripsResponseMap;
} }
@ -1276,7 +1299,7 @@ public class CCSDuelIT extends ESRestTestCase {
replaceProfileTime(shard); replaceProfileTime(shard);
/* /*
* The way we try to reduce round trips is by fetching all * The way we try to reduce round trips is by fetching all
* of the results we could possibly need from the remote * the results we could possibly need from the remote
* cluster and then merging *those* together locally. This * cluster and then merging *those* together locally. This
* will end up fetching more documents total. So we can't * will end up fetching more documents total. So we can't
* really compare the fetch profiles here. * really compare the fetch profiles here.
@ -1288,6 +1311,8 @@ public class CCSDuelIT extends ESRestTestCase {
if (shards != null) { if (shards != null) {
shards.remove("skipped"); shards.remove("skipped");
} }
Map<String, Object> clusters = (Map<String, Object>) responseMap.get("_clusters");
homogenizeClustersEntries(clusters);
return responseMap; return responseMap;
} }
@ -1317,4 +1342,52 @@ public class CCSDuelIT extends ESRestTestCase {
} }
} }
} }
private static void homogenizeClustersEntries(Map<String, Object> map) {
replaceTookTime(map);
replaceSkippedEntries(map);
}
@SuppressWarnings("unchecked")
private static void replaceSkippedEntries(Map<String, Object> map) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getKey().contains("skipped")) {
assertThat(entry.getValue(), instanceOf(Number.class));
assertNotNull(entry.setValue(0));
}
if (entry.getValue() instanceof Map) {
replaceSkippedEntries((Map<String, Object>) entry.getValue());
}
if (entry.getValue() instanceof List) {
List<Object> list = (List<Object>) entry.getValue();
for (Object obj : list) {
if (obj instanceof Map) {
replaceSkippedEntries((Map<String, Object>) obj);
}
}
}
}
}
@SuppressWarnings("unchecked")
private static void replaceTookTime(Map<String, Object> map) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getKey().contains("took")) {
assertThat(entry.getValue(), instanceOf(Number.class));
assertNotNull(entry.setValue(-1));
}
if (entry.getValue() instanceof Map) {
replaceTookTime((Map<String, Object>) entry.getValue());
}
if (entry.getValue() instanceof List) {
List<Object> list = (List<Object>) entry.getValue();
for (Object obj : list) {
if (obj instanceof Map) {
replaceTookTime((Map<String, Object>) obj);
}
}
}
}
}
} }

View file

@ -0,0 +1,562 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.ccs;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShardsAction;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.LegacyReaderContext;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matchers;
import org.junit.Before;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
// formerly called CrossClusterSearchIT, but since this one mostly does
// actions besides searching, it was renamed so a new CrossClusterSearchIT
// can focus on several cross cluster search scenarios.
public class CrossClusterIT extends AbstractMultiClustersTestCase {
@Override
protected Collection<String> remoteClusterAlias() {
return List.of("cluster_a");
}
@Override
protected boolean reuseClusters() {
return false;
}
private int indexDocs(Client client, String index) {
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
client.prepareIndex(index).setSource("f", "v").get();
}
client.admin().indices().prepareRefresh(index).get();
return numDocs;
}
public void testRemoteClusterClientRole() throws Exception {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
final int demoDocs = indexDocs(client(LOCAL_CLUSTER), "demo");
assertAcked(client("cluster_a").admin().indices().prepareCreate("prod"));
final int prodDocs = indexDocs(client("cluster_a"), "prod");
final InternalTestCluster localCluster = cluster(LOCAL_CLUSTER);
final String pureDataNode = randomBoolean() ? localCluster.startDataOnlyNode() : null;
final String nodeWithoutRemoteClusterClientRole = localCluster.startNode(NodeRoles.onlyRole(DiscoveryNodeRole.DATA_ROLE));
ElasticsearchAssertions.assertFutureThrows(
localCluster.client(nodeWithoutRemoteClusterClientRole)
.prepareSearch("demo", "cluster_a:prod")
.setQuery(new MatchAllQueryBuilder())
.setAllowPartialSearchResults(false)
.setSize(1000)
.execute(),
IllegalArgumentException.class,
RestStatus.BAD_REQUEST,
"node [" + nodeWithoutRemoteClusterClientRole + "] does not have the remote cluster client role enabled"
);
final String nodeWithRemoteClusterClientRole = randomFrom(
localCluster.clusterService()
.state()
.nodes()
.stream()
.map(DiscoveryNode::getName)
.filter(nodeName -> nodeWithoutRemoteClusterClientRole.equals(nodeName) == false)
.filter(nodeName -> nodeName.equals(pureDataNode) == false)
.toList()
);
final SearchResponse resp = localCluster.client(nodeWithRemoteClusterClientRole)
.prepareSearch("demo", "cluster_a:prod")
.setQuery(new MatchAllQueryBuilder())
.setAllowPartialSearchResults(false)
.setSize(1000)
.get();
assertHitCount(resp, demoDocs + prodDocs);
}
public void testProxyConnectionDisconnect() throws Exception {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
indexDocs(client(LOCAL_CLUSTER), "demo");
final String remoteNode = cluster("cluster_a").startDataOnlyNode();
assertAcked(
client("cluster_a").admin()
.indices()
.prepareCreate("prod")
.setSettings(
Settings.builder()
.put("index.routing.allocation.require._name", remoteNode)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
);
indexDocs(client("cluster_a"), "prod");
SearchListenerPlugin.blockQueryPhase();
try {
PlainActionFuture<SearchResponse> future = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest("demo", "cluster_a:prod");
searchRequest.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(false);
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, future);
SearchListenerPlugin.waitSearchStarted();
disconnectFromRemoteClusters();
// Cancellable tasks on the remote cluster should be cancelled
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
Collection<CancellableTask> cancellableTasks = transportService.getTaskManager().getCancellableTasks().values();
for (CancellableTask cancellableTask : cancellableTasks) {
if (TransportActionProxy.isProxyAction(cancellableTask.getAction())) {
assertTrue(cancellableTask.getDescription(), cancellableTask.isCancelled());
}
}
}
});
assertBusy(() -> assertTrue(future.isDone()));
configureAndConnectsToRemoteClusters();
} finally {
SearchListenerPlugin.allowQueryPhase();
}
}
public void testCancel() throws Exception {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
indexDocs(client(LOCAL_CLUSTER), "demo");
final InternalTestCluster remoteCluster = cluster("cluster_a");
remoteCluster.ensureAtLeastNumDataNodes(1);
final Settings.Builder allocationFilter = Settings.builder();
if (randomBoolean()) {
remoteCluster.ensureAtLeastNumDataNodes(3);
List<String> remoteDataNodes = remoteCluster.clusterService()
.state()
.nodes()
.stream()
.filter(DiscoveryNode::canContainData)
.map(DiscoveryNode::getName)
.toList();
assertThat(remoteDataNodes.size(), Matchers.greaterThanOrEqualTo(3));
List<String> seedNodes = randomSubsetOf(between(1, remoteDataNodes.size() - 1), remoteDataNodes);
disconnectFromRemoteClusters();
configureRemoteCluster("cluster_a", seedNodes);
if (randomBoolean()) {
// Using proxy connections
allocationFilter.put("index.routing.allocation.exclude._name", String.join(",", seedNodes));
} else {
allocationFilter.put("index.routing.allocation.include._name", String.join(",", seedNodes));
}
}
assertAcked(
client("cluster_a").admin()
.indices()
.prepareCreate("prod")
.setSettings(Settings.builder().put(allocationFilter.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
assertFalse(
client("cluster_a").admin()
.cluster()
.prepareHealth("prod")
.setWaitForYellowStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get()
.isTimedOut()
);
indexDocs(client("cluster_a"), "prod");
SearchListenerPlugin.blockQueryPhase();
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest("demo", "cluster_a:prod");
searchRequest.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
SearchListenerPlugin.waitSearchStarted();
// Get the search task and cancelled
final TaskInfo rootTask = client().admin()
.cluster()
.prepareListTasks()
.setActions(SearchAction.INSTANCE.name())
.get()
.getTasks()
.stream()
.filter(t -> t.parentTaskId().isSet() == false)
.findFirst()
.get();
AtomicReference<List<TaskInfo>> remoteClusterSearchTasks = new AtomicReference<>();
assertBusy(() -> {
List<TaskInfo> remoteSearchTasks = client("cluster_a").admin()
.cluster()
.prepareListTasks()
.get()
.getTasks()
.stream()
.filter(t -> t.action().startsWith("indices:data/read/search"))
.collect(Collectors.toList());
assertThat(remoteSearchTasks.size(), greaterThan(0));
remoteClusterSearchTasks.set(remoteSearchTasks);
});
for (TaskInfo taskInfo : remoteClusterSearchTasks.get()) {
assertFalse("taskInfo is cancelled: " + taskInfo, taskInfo.cancelled());
}
final CancelTasksRequest cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTask.taskId());
cancelRequest.setWaitForCompletion(randomBoolean());
final ActionFuture<CancelTasksResponse> cancelFuture = client().admin().cluster().cancelTasks(cancelRequest);
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
Collection<CancellableTask> cancellableTasks = transportService.getTaskManager().getCancellableTasks().values();
for (CancellableTask cancellableTask : cancellableTasks) {
if (cancellableTask.getAction().contains(SearchAction.INSTANCE.name())) {
assertTrue(cancellableTask.getDescription(), cancellableTask.isCancelled());
}
}
}
});
List<TaskInfo> remoteSearchTasksAfterCancellation = client("cluster_a").admin()
.cluster()
.prepareListTasks()
.get()
.getTasks()
.stream()
.filter(t -> t.action().startsWith("indices:data/read/search"))
.collect(Collectors.toList());
for (TaskInfo taskInfo : remoteSearchTasksAfterCancellation) {
assertTrue(taskInfo.description(), taskInfo.cancelled());
}
SearchListenerPlugin.allowQueryPhase();
assertBusy(() -> assertTrue(queryFuture.isDone()));
assertBusy(() -> assertTrue(cancelFuture.isDone()));
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
assertThat(transportService.getTaskManager().getBannedTaskIds(), Matchers.empty());
}
});
RuntimeException e = expectThrows(RuntimeException.class, () -> queryFuture.result());
assertNotNull(e);
assertNotNull(e.getCause());
Throwable t = ExceptionsHelper.unwrap(e, TaskCancelledException.class);
assertNotNull(t);
}
/**
* Makes sure that lookup fields are resolved using the lookup index on each cluster.
*/
public void testLookupFields() throws Exception {
cluster("cluster_a").client()
.admin()
.indices()
.prepareCreate("users")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.get();
cluster("cluster_a").client()
.prepareBulk("users")
.add(new IndexRequest().id("a").source("name", "Remote A"))
.add(new IndexRequest().id("b").source("name", "Remote B"))
.add(new IndexRequest().id("c").source("name", "Remote C"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().admin()
.indices()
.prepareCreate("users")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.get();
client().prepareBulk("users")
.add(new IndexRequest().id("a").source("name", "Local A"))
.add(new IndexRequest().id("b").source("name", "Local B"))
.add(new IndexRequest().id("c").source("name", "Local C"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
// Setup calls on the local cluster
client().admin()
.indices()
.prepareCreate("local_calls")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("from_user", "type=keyword", "to_user", "type=keyword")
.get();
client().prepareBulk("local_calls")
.add(new IndexRequest().source("from_user", "a", "to_user", List.of("b", "c"), "duration", 95))
.add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 25))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
// Setup calls on the remote cluster
cluster("cluster_a").client()
.admin()
.indices()
.prepareCreate("remote_calls")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("from_user", "type=keyword", "to_user", "type=keyword")
.get();
cluster("cluster_a").client()
.prepareBulk("remote_calls")
.add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 45))
.add(new IndexRequest().source("from_user", "unknown_caller", "to_user", "c", "duration", 50))
.add(new IndexRequest().source("from_user", List.of("a", "b"), "to_user", "c", "duration", 60))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
final String runtimeMappingSource = """
{
"from": {
"type": "lookup",
"target_index": "users",
"input_field": "from_user",
"target_field": "_id",
"fetch_fields": ["name"]
},
"to": {
"type": "lookup",
"target_index": "users",
"input_field": "to_user",
"target_field": "_id",
"fetch_fields": ["name"]
}
}
""";
final Map<String, Object> runtimeMappings;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, runtimeMappingSource)) {
runtimeMappings = parser.map();
}
// Search on the remote cluster only
{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
.runtimeMappings(runtimeMappings)
.sort(new FieldSortBuilder("duration"))
.fetchField("from")
.fetchField("to");
SearchRequest request = new SearchRequest("cluster_a:remote_calls").source(searchSourceBuilder);
request.setCcsMinimizeRoundtrips(randomBoolean());
SearchResponse searchResponse = client().search(request).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 2);
SearchHit hit0 = searchResponse.getHits().getHits()[0];
assertThat(hit0.getIndex(), equalTo("remote_calls"));
assertThat(hit0.field("from"), nullValue());
assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit1 = searchResponse.getHits().getHits()[1];
assertThat(hit1.getIndex(), equalTo("remote_calls"));
assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
}
// Search on both clusters
{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
.runtimeMappings(runtimeMappings)
.sort(new FieldSortBuilder("duration"))
.fetchField("from")
.fetchField("to");
SearchRequest request = new SearchRequest("local_calls", "cluster_a:remote_calls").source(searchSourceBuilder);
request.setCcsMinimizeRoundtrips(randomBoolean());
SearchResponse searchResponse = client().search(request).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 3);
SearchHit hit0 = searchResponse.getHits().getHits()[0];
assertThat(hit0.getIndex(), equalTo("remote_calls"));
assertThat(hit0.field("from"), nullValue());
assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit1 = searchResponse.getHits().getHits()[1];
assertThat(hit1.getIndex(), equalTo("remote_calls"));
assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit2 = searchResponse.getHits().getHits()[2];
assertThat(hit2.getIndex(), equalTo("local_calls"));
assertThat(hit2.field("from").getValues(), contains(Map.of("name", List.of("Local A"))));
assertThat(hit2.field("to").getValues(), contains(Map.of("name", List.of("Local B")), Map.of("name", List.of("Local C"))));
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
if (clusterAlias.equals(LOCAL_CLUSTER)) {
return super.nodePlugins(clusterAlias);
} else {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), SearchListenerPlugin.class);
}
}
@Before
public void resetSearchListenerPlugin() throws Exception {
SearchListenerPlugin.reset();
}
public static class SearchListenerPlugin extends Plugin {
private static final AtomicReference<CountDownLatch> startedLatch = new AtomicReference<>();
private static final AtomicReference<CountDownLatch> queryLatch = new AtomicReference<>();
static void reset() {
startedLatch.set(new CountDownLatch(1));
}
static void blockQueryPhase() {
queryLatch.set(new CountDownLatch(1));
}
static void allowQueryPhase() {
final CountDownLatch latch = queryLatch.get();
if (latch != null) {
latch.countDown();
}
}
static void waitSearchStarted() throws InterruptedException {
assertTrue(startedLatch.get().await(60, TimeUnit.SECONDS));
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addSearchOperationListener(new SearchOperationListener() {
@Override
public void onNewReaderContext(ReaderContext readerContext) {
assertThat(readerContext, not(instanceOf(LegacyReaderContext.class)));
}
@Override
public void onPreQueryPhase(SearchContext searchContext) {
startedLatch.get().countDown();
final CountDownLatch latch = queryLatch.get();
if (latch != null) {
try {
assertTrue(latch.await(60, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
});
super.onIndexModule(indexModule);
}
}
public void testSearchShardsWithIndexNameQuery() {
int numShards = randomIntBetween(1, 10);
Client remoteClient = client("cluster_a");
remoteClient.admin()
.indices()
.prepareCreate("my_index")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards))
.get();
int numDocs = randomIntBetween(100, 500);
for (int i = 0; i < numDocs; i++) {
remoteClient.prepareIndex("my_index").setSource("f", "v").get();
}
remoteClient.admin().indices().prepareRefresh("my_index").get();
String[] indices = new String[] { "my_index" };
IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
SearchShardsRequest request = new SearchShardsRequest(indices, indicesOptions, query, null, null, randomBoolean(), "cluster_a");
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertFalse(group.skipped());
}
}
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
SearchShardsRequest request = new SearchShardsRequest(
indices,
indicesOptions,
query,
null,
null,
randomBoolean(),
randomFrom("cluster_b", null)
);
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertTrue(group.skipped());
}
}
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:not_my_index");
SearchShardsRequest request = new SearchShardsRequest(
indices,
indicesOptions,
query,
null,
null,
randomBoolean(),
randomFrom("cluster_a", "cluster_b", null)
);
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertTrue(group.skipped());
}
}
}
}

View file

@ -9,77 +9,57 @@
package org.elasticsearch.search.ccs; package org.elasticsearch.search.ccs;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShardsAction; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings;
import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.query.ThrowingQueryBuilder;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeRoles; import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matchers;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class CrossClusterSearchIT extends AbstractMultiClustersTestCase { public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER = "cluster_a";
private static long EARLIEST_TIMESTAMP = 1691348810000L;
private static long LATEST_TIMESTAMP = 1691348820000L;
@Override @Override
protected Collection<String> remoteClusterAlias() { protected Collection<String> remoteClusterAlias() {
return List.of("cluster_a"); return List.of(REMOTE_CLUSTER);
}
@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
return Map.of(REMOTE_CLUSTER, randomBoolean());
} }
@Override @Override
@ -87,473 +67,451 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
return false; return false;
} }
private int indexDocs(Client client, String index) { @Override
int numDocs = between(1, 10); protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
for (int i = 0; i < numDocs; i++) { List<Class<? extends Plugin>> plugs = Arrays.asList(TestQueryBuilderPlugin.class);
client.prepareIndex(index).setSource("f", "v").get(); return Stream.concat(super.nodePlugins(clusterAlias).stream(), plugs.stream()).collect(Collectors.toList());
}
client.admin().indices().prepareRefresh(index).get();
return numDocs;
} }
public void testRemoteClusterClientRole() throws Exception { public static class TestQueryBuilderPlugin extends Plugin implements SearchPlugin {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo")); public TestQueryBuilderPlugin() {}
final int demoDocs = indexDocs(client(LOCAL_CLUSTER), "demo");
assertAcked(client("cluster_a").admin().indices().prepareCreate("prod"));
final int prodDocs = indexDocs(client("cluster_a"), "prod");
final InternalTestCluster localCluster = cluster(LOCAL_CLUSTER);
final String pureDataNode = randomBoolean() ? localCluster.startDataOnlyNode() : null;
final String nodeWithoutRemoteClusterClientRole = localCluster.startNode(NodeRoles.onlyRole(DiscoveryNodeRole.DATA_ROLE));
ElasticsearchAssertions.assertFutureThrows(
localCluster.client(nodeWithoutRemoteClusterClientRole)
.prepareSearch("demo", "cluster_a:prod")
.setQuery(new MatchAllQueryBuilder())
.setAllowPartialSearchResults(false)
.setSize(1000)
.execute(),
IllegalArgumentException.class,
RestStatus.BAD_REQUEST,
"node [" + nodeWithoutRemoteClusterClientRole + "] does not have the remote cluster client role enabled"
);
final String nodeWithRemoteClusterClientRole = randomFrom( @Override
localCluster.clusterService() public List<QuerySpec<?>> getQueries() {
.state() QuerySpec<ThrowingQueryBuilder> throwingSpec = new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new, p -> {
.nodes() throw new IllegalStateException("not implemented");
.stream()
.map(DiscoveryNode::getName)
.filter(nodeName -> nodeWithoutRemoteClusterClientRole.equals(nodeName) == false)
.filter(nodeName -> nodeName.equals(pureDataNode) == false)
.toList()
);
final SearchResponse resp = localCluster.client(nodeWithRemoteClusterClientRole)
.prepareSearch("demo", "cluster_a:prod")
.setQuery(new MatchAllQueryBuilder())
.setAllowPartialSearchResults(false)
.setSize(1000)
.get();
assertHitCount(resp, demoDocs + prodDocs);
}
public void testProxyConnectionDisconnect() throws Exception {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
indexDocs(client(LOCAL_CLUSTER), "demo");
final String remoteNode = cluster("cluster_a").startDataOnlyNode();
assertAcked(
client("cluster_a").admin()
.indices()
.prepareCreate("prod")
.setSettings(
Settings.builder()
.put("index.routing.allocation.require._name", remoteNode)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
);
indexDocs(client("cluster_a"), "prod");
SearchListenerPlugin.blockQueryPhase();
try {
PlainActionFuture<SearchResponse> future = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest("demo", "cluster_a:prod");
searchRequest.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(false);
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, future);
SearchListenerPlugin.waitSearchStarted();
disconnectFromRemoteClusters();
// Cancellable tasks on the remote cluster should be cancelled
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
Collection<CancellableTask> cancellableTasks = transportService.getTaskManager().getCancellableTasks().values();
for (CancellableTask cancellableTask : cancellableTasks) {
if (TransportActionProxy.isProxyAction(cancellableTask.getAction())) {
assertTrue(cancellableTask.getDescription(), cancellableTask.isCancelled());
}
}
}
}); });
assertBusy(() -> assertTrue(future.isDone()));
configureAndConnectsToRemoteClusters(); return List.of(throwingSpec);
} finally {
SearchListenerPlugin.allowQueryPhase();
} }
} }
public void testCancel() throws Exception { public void testClusterDetailsAfterSuccessfulCCS() throws Exception {
assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo")); Map<String, Object> testClusterInfo = setupTwoClusters();
indexDocs(client(LOCAL_CLUSTER), "demo"); String localIndex = (String) testClusterInfo.get("local.index");
final InternalTestCluster remoteCluster = cluster("cluster_a"); String remoteIndex = (String) testClusterInfo.get("remote.index");
remoteCluster.ensureAtLeastNumDataNodes(1); int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
final Settings.Builder allocationFilter = Settings.builder(); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
if (randomBoolean()) {
remoteCluster.ensureAtLeastNumDataNodes(3); PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
List<String> remoteDataNodes = remoteCluster.clusterService() SearchRequest searchRequest = new SearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
.state() searchRequest.allowPartialSearchResults(false);
.nodes() boolean minimizeRoundtrips = true; // TODO: support MRT=false
.stream() searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
.filter(DiscoveryNode::canContainData)
.map(DiscoveryNode::getName) searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
.toList(); client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertThat(remoteDataNodes.size(), Matchers.greaterThanOrEqualTo(3));
List<String> seedNodes = randomSubsetOf(between(1, remoteDataNodes.size() - 1), remoteDataNodes); assertBusy(() -> assertTrue(queryFuture.isDone()));
disconnectFromRemoteClusters();
configureRemoteCluster("cluster_a", seedNodes); SearchResponse searchResponse = queryFuture.get();
if (randomBoolean()) { assertNotNull(searchResponse);
// Using proxy connections
allocationFilter.put("index.routing.allocation.exclude._name", String.join(",", seedNodes)); SearchResponse.Clusters clusters = searchResponse.getClusters();
} else { assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
allocationFilter.put("index.routing.allocation.include._name", String.join(",", seedNodes)); assertThat(clusters.getTotal(), equalTo(2));
} assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(localClusterSearchInfo.getIndexExpression(), equalTo(localIndex));
assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(remoteClusterSearchInfo.getIndexExpression(), equalTo(remoteIndex));
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
}
// CCS with a search where the timestamp of the query cannot match so should be SUCCESSFUL with all shards skipped
// during can-match
public void testCCSClusterDetailsWhereAllShardsSkippedInCanMatch() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remoteIndex = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(false);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("@timestamp").from(EARLIEST_TIMESTAMP - 2000)
.to(EARLIEST_TIMESTAMP - 1000);
searchRequest.source(new SearchSourceBuilder().query(rangeQueryBuilder).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(localClusterSearchInfo.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThanOrEqualTo(0L));
}
public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remoteIndex = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(true);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
// shardId 0 means to throw the Exception only on shard 0; all others should work
ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
searchRequest.source(new SearchSourceBuilder().query(queryBuilder).size(10));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards - 1));
assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailedShards(), equalTo(1));
assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
}
public void testClusterDetailsAfterCCSWithFailuresOnRemoteClusterOnly() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remoteIndex = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(true);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
// throw Exception on all shards of remoteIndex, but not against localIndex
ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(
randomLong(),
new IllegalStateException("index corrupted"),
remoteIndex
);
searchRequest.source(new SearchSourceBuilder().query(queryBuilder).size(10));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
if (skipUnavailable == false) {
ExecutionException ee = expectThrows(ExecutionException.class, () -> queryFuture.get());
assertNotNull(ee.getCause());
assertThat(ee.getCause(), instanceOf(RemoteTransportException.class));
Throwable rootCause = ExceptionsHelper.unwrap(ee.getCause(), IllegalStateException.class);
assertThat(rootCause.getMessage(), containsString("index corrupted"));
} else {
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.isCcsMinimizeRoundtrips(), equalTo(minimizeRoundtrips));
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(1));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
SearchResponse.Cluster.Status expectedStatus = skipUnavailable
? SearchResponse.Cluster.Status.SKIPPED
: SearchResponse.Cluster.Status.FAILED;
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
assertNull(remoteClusterSearchInfo.getTotalShards());
assertNull(remoteClusterSearchInfo.getSuccessfulShards());
assertNull(remoteClusterSearchInfo.getSkippedShards());
assertNull(remoteClusterSearchInfo.getFailedShards());
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
assertNull(remoteClusterSearchInfo.getTook());
assertFalse(remoteClusterSearchInfo.isTimedOut());
ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
} }
}
public void testRemoteClusterOnlyCCSSuccessfulResult() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(false);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
}
public void testRemoteClusterOnlyCCSWithFailuresOnOneShardOnly() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(true);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
// shardId 0 means to throw the Exception only on shard 0; all others should work
ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
searchRequest.source(new SearchSourceBuilder().query(queryBuilder).size(10));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
}
public void testRemoteClusterOnlyCCSWithFailuresOnAllShards() throws Exception {
Map<String, Object> testClusterInfo = setupTwoClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
searchRequest.allowPartialSearchResults(true);
boolean minimizeRoundtrips = true; // TODO support MRT=false
searchRequest.setCcsMinimizeRoundtrips(minimizeRoundtrips);
// shardId -1 means to throw the Exception on all shards, so should result in complete search failure
ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), -1);
searchRequest.source(new SearchSourceBuilder().query(queryBuilder).size(10));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));
if (skipUnavailable == false) {
ExecutionException ee = expectThrows(ExecutionException.class, () -> queryFuture.get());
assertNotNull(ee.getCause());
Throwable rootCause = ExceptionsHelper.unwrap(ee, IllegalStateException.class);
assertThat(rootCause.getMessage(), containsString("index corrupted"));
} else {
SearchResponse searchResponse = queryFuture.get();
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(1));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
assertNotNull(remoteClusterSearchInfo);
SearchResponse.Cluster.Status expectedStatus = skipUnavailable
? SearchResponse.Cluster.Status.SKIPPED
: SearchResponse.Cluster.Status.FAILED;
assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
if (clusters.isCcsMinimizeRoundtrips()) {
assertNull(remoteClusterSearchInfo.getTotalShards());
assertNull(remoteClusterSearchInfo.getSuccessfulShards());
assertNull(remoteClusterSearchInfo.getSkippedShards());
assertNull(remoteClusterSearchInfo.getFailedShards());
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
} else {
assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(remoteNumShards));
assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(remoteNumShards));
}
assertNull(remoteClusterSearchInfo.getTook());
assertFalse(remoteClusterSearchInfo.isTimedOut());
ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
}
}
private Map<String, Object> setupTwoClusters() {
String localIndex = "demo";
int numShardsLocal = randomIntBetween(3, 6);
Settings localSettings = indexSettings(numShardsLocal, 0).build();
assertAcked( assertAcked(
client("cluster_a").admin() client(LOCAL_CLUSTER).admin()
.indices() .indices()
.prepareCreate("prod") .prepareCreate(localIndex)
.setSettings(Settings.builder().put(allocationFilter.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) .setSettings(localSettings)
.setMapping("@timestamp", "type=date", "f", "type=text")
);
indexDocs(client(LOCAL_CLUSTER), localIndex);
String remoteIndex = "prod";
int numShardsRemote = randomIntBetween(3, 6);
final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
final Settings.Builder remoteSettings = Settings.builder();
remoteSettings.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsRemote);
assertAcked(
client(REMOTE_CLUSTER).admin()
.indices()
.prepareCreate(remoteIndex)
.setSettings(Settings.builder().put(remoteSettings.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
.setMapping("@timestamp", "type=date", "f", "type=text")
); );
assertFalse( assertFalse(
client("cluster_a").admin() client(REMOTE_CLUSTER).admin()
.cluster() .cluster()
.prepareHealth("prod") .prepareHealth(remoteIndex)
.setWaitForYellowStatus() .setWaitForYellowStatus()
.setTimeout(TimeValue.timeValueSeconds(10)) .setTimeout(TimeValue.timeValueSeconds(10))
.get() .get()
.isTimedOut() .isTimedOut()
); );
indexDocs(client("cluster_a"), "prod"); indexDocs(client(REMOTE_CLUSTER), remoteIndex);
SearchListenerPlugin.blockQueryPhase();
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
SearchRequest searchRequest = new SearchRequest("demo", "cluster_a:prod");
searchRequest.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
SearchListenerPlugin.waitSearchStarted();
// Get the search task and cancelled
final TaskInfo rootTask = client().admin()
.cluster()
.prepareListTasks()
.setActions(SearchAction.INSTANCE.name())
.get()
.getTasks()
.stream()
.filter(t -> t.parentTaskId().isSet() == false)
.findFirst()
.get();
AtomicReference<List<TaskInfo>> remoteClusterSearchTasks = new AtomicReference<>(); String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
assertBusy(() -> { Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
List<TaskInfo> remoteSearchTasks = client("cluster_a").admin() boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
.cluster() .getClusterSettings()
.prepareListTasks() .get(skipUnavailableSetting);
.get()
.getTasks()
.stream()
.filter(t -> t.action().startsWith("indices:data/read/search"))
.collect(Collectors.toList());
assertThat(remoteSearchTasks.size(), greaterThan(0));
remoteClusterSearchTasks.set(remoteSearchTasks);
});
for (TaskInfo taskInfo : remoteClusterSearchTasks.get()) { Map<String, Object> clusterInfo = new HashMap<>();
assertFalse("taskInfo is cancelled: " + taskInfo, taskInfo.cancelled()); clusterInfo.put("local.num_shards", numShardsLocal);
} clusterInfo.put("local.index", localIndex);
clusterInfo.put("remote.num_shards", numShardsRemote);
final CancelTasksRequest cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTask.taskId()); clusterInfo.put("remote.index", remoteIndex);
cancelRequest.setWaitForCompletion(randomBoolean()); clusterInfo.put("remote.skip_unavailable", skipUnavailable);
final ActionFuture<CancelTasksResponse> cancelFuture = client().admin().cluster().cancelTasks(cancelRequest); return clusterInfo;
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
Collection<CancellableTask> cancellableTasks = transportService.getTaskManager().getCancellableTasks().values();
for (CancellableTask cancellableTask : cancellableTasks) {
if (cancellableTask.getAction().contains(SearchAction.INSTANCE.name())) {
assertTrue(cancellableTask.getDescription(), cancellableTask.isCancelled());
}
}
}
});
List<TaskInfo> remoteSearchTasksAfterCancellation = client("cluster_a").admin()
.cluster()
.prepareListTasks()
.get()
.getTasks()
.stream()
.filter(t -> t.action().startsWith("indices:data/read/search"))
.collect(Collectors.toList());
for (TaskInfo taskInfo : remoteSearchTasksAfterCancellation) {
assertTrue(taskInfo.description(), taskInfo.cancelled());
}
SearchListenerPlugin.allowQueryPhase();
assertBusy(() -> assertTrue(queryFuture.isDone()));
assertBusy(() -> assertTrue(cancelFuture.isDone()));
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
assertThat(transportService.getTaskManager().getBannedTaskIds(), Matchers.empty());
}
});
RuntimeException e = expectThrows(RuntimeException.class, () -> queryFuture.result());
assertNotNull(e);
assertNotNull(e.getCause());
Throwable t = ExceptionsHelper.unwrap(e, TaskCancelledException.class);
assertNotNull(t);
} }
/** private int indexDocs(Client client, String index) {
* Makes sure that lookup fields are resolved using the lookup index on each cluster. int numDocs = between(50, 100);
*/
public void testLookupFields() throws Exception {
cluster("cluster_a").client()
.admin()
.indices()
.prepareCreate("users")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.get();
cluster("cluster_a").client()
.prepareBulk("users")
.add(new IndexRequest().id("a").source("name", "Remote A"))
.add(new IndexRequest().id("b").source("name", "Remote B"))
.add(new IndexRequest().id("c").source("name", "Remote C"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().admin()
.indices()
.prepareCreate("users")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.get();
client().prepareBulk("users")
.add(new IndexRequest().id("a").source("name", "Local A"))
.add(new IndexRequest().id("b").source("name", "Local B"))
.add(new IndexRequest().id("c").source("name", "Local C"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
// Setup calls on the local cluster
client().admin()
.indices()
.prepareCreate("local_calls")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("from_user", "type=keyword", "to_user", "type=keyword")
.get();
client().prepareBulk("local_calls")
.add(new IndexRequest().source("from_user", "a", "to_user", List.of("b", "c"), "duration", 95))
.add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 25))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
// Setup calls on the remote cluster
cluster("cluster_a").client()
.admin()
.indices()
.prepareCreate("remote_calls")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("from_user", "type=keyword", "to_user", "type=keyword")
.get();
cluster("cluster_a").client()
.prepareBulk("remote_calls")
.add(new IndexRequest().source("from_user", "a", "to_user", "b", "duration", 45))
.add(new IndexRequest().source("from_user", "unknown_caller", "to_user", "c", "duration", 50))
.add(new IndexRequest().source("from_user", List.of("a", "b"), "to_user", "c", "duration", 60))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
final String runtimeMappingSource = """
{
"from": {
"type": "lookup",
"target_index": "users",
"input_field": "from_user",
"target_field": "_id",
"fetch_fields": ["name"]
},
"to": {
"type": "lookup",
"target_index": "users",
"input_field": "to_user",
"target_field": "_id",
"fetch_fields": ["name"]
}
}
""";
final Map<String, Object> runtimeMappings;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, runtimeMappingSource)) {
runtimeMappings = parser.map();
}
// Search on the remote cluster only
{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
.runtimeMappings(runtimeMappings)
.sort(new FieldSortBuilder("duration"))
.fetchField("from")
.fetchField("to");
SearchRequest request = new SearchRequest("cluster_a:remote_calls").source(searchSourceBuilder);
request.setCcsMinimizeRoundtrips(randomBoolean());
SearchResponse searchResponse = client().search(request).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 2);
SearchHit hit0 = searchResponse.getHits().getHits()[0];
assertThat(hit0.getIndex(), equalTo("remote_calls"));
assertThat(hit0.field("from"), nullValue());
assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit1 = searchResponse.getHits().getHits()[1];
assertThat(hit1.getIndex(), equalTo("remote_calls"));
assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
}
// Search on both clusters
{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
.runtimeMappings(runtimeMappings)
.sort(new FieldSortBuilder("duration"))
.fetchField("from")
.fetchField("to");
SearchRequest request = new SearchRequest("local_calls", "cluster_a:remote_calls").source(searchSourceBuilder);
request.setCcsMinimizeRoundtrips(randomBoolean());
SearchResponse searchResponse = client().search(request).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 3);
SearchHit hit0 = searchResponse.getHits().getHits()[0];
assertThat(hit0.getIndex(), equalTo("remote_calls"));
assertThat(hit0.field("from"), nullValue());
assertThat(hit0.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit1 = searchResponse.getHits().getHits()[1];
assertThat(hit1.getIndex(), equalTo("remote_calls"));
assertThat(hit1.field("from").getValues(), contains(Map.of("name", List.of("Remote A")), Map.of("name", List.of("Remote B"))));
assertThat(hit1.field("to").getValues(), contains(Map.of("name", List.of("Remote C"))));
SearchHit hit2 = searchResponse.getHits().getHits()[2];
assertThat(hit2.getIndex(), equalTo("local_calls"));
assertThat(hit2.field("from").getValues(), contains(Map.of("name", List.of("Local A"))));
assertThat(hit2.field("to").getValues(), contains(Map.of("name", List.of("Local B")), Map.of("name", List.of("Local C"))));
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
if (clusterAlias.equals(LOCAL_CLUSTER)) {
return super.nodePlugins(clusterAlias);
} else {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), SearchListenerPlugin.class);
}
}
@Before
public void resetSearchListenerPlugin() throws Exception {
SearchListenerPlugin.reset();
}
public static class SearchListenerPlugin extends Plugin {
private static final AtomicReference<CountDownLatch> startedLatch = new AtomicReference<>();
private static final AtomicReference<CountDownLatch> queryLatch = new AtomicReference<>();
static void reset() {
startedLatch.set(new CountDownLatch(1));
}
static void blockQueryPhase() {
queryLatch.set(new CountDownLatch(1));
}
static void allowQueryPhase() {
final CountDownLatch latch = queryLatch.get();
if (latch != null) {
latch.countDown();
}
}
static void waitSearchStarted() throws InterruptedException {
assertTrue(startedLatch.get().await(60, TimeUnit.SECONDS));
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addSearchOperationListener(new SearchOperationListener() {
@Override
public void onNewReaderContext(ReaderContext readerContext) {
assertThat(readerContext, not(instanceOf(LegacyReaderContext.class)));
}
@Override
public void onPreQueryPhase(SearchContext searchContext) {
startedLatch.get().countDown();
final CountDownLatch latch = queryLatch.get();
if (latch != null) {
try {
assertTrue(latch.await(60, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
});
super.onIndexModule(indexModule);
}
}
public void testSearchShardsWithIndexNameQuery() {
int numShards = randomIntBetween(1, 10);
Client remoteClient = client("cluster_a");
remoteClient.admin()
.indices()
.prepareCreate("my_index")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards))
.get();
int numDocs = randomIntBetween(100, 500);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
remoteClient.prepareIndex("my_index").setSource("f", "v").get(); long ts = EARLIEST_TIMESTAMP + i;
} if (i == numDocs - 1) {
remoteClient.admin().indices().prepareRefresh("my_index").get(); ts = LATEST_TIMESTAMP;
String[] indices = new String[] { "my_index" };
IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
SearchShardsRequest request = new SearchShardsRequest(indices, indicesOptions, query, null, null, randomBoolean(), "cluster_a");
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertFalse(group.skipped());
}
}
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:my_index");
SearchShardsRequest request = new SearchShardsRequest(
indices,
indicesOptions,
query,
null,
null,
randomBoolean(),
randomFrom("cluster_b", null)
);
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertTrue(group.skipped());
}
}
{
QueryBuilder query = new TermQueryBuilder("_index", "cluster_a:not_my_index");
SearchShardsRequest request = new SearchShardsRequest(
indices,
indicesOptions,
query,
null,
null,
randomBoolean(),
randomFrom("cluster_a", "cluster_b", null)
);
SearchShardsResponse resp = remoteClient.execute(SearchShardsAction.INSTANCE, request).actionGet();
assertThat(resp.getGroups(), hasSize(numShards));
for (SearchShardsGroup group : resp.getGroups()) {
assertTrue(group.skipped());
} }
client.prepareIndex(index).setSource("f", "v", "@timestamp", ts).get();
} }
client.admin().indices().prepareRefresh(index).get();
return numDocs;
} }
} }

View file

@ -398,27 +398,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
} }
} }
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { } else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1; clusters = Clusters.fromXContent(parser);
int total = -1;
int skipped = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
successful = parser.intValue();
} else if (Clusters.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
total = parser.intValue();
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
skipped = parser.intValue();
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
clusters = new Clusters(total, successful, skipped);
} else { } else {
parser.skipChildren(); parser.skipChildren();
} }
@ -483,6 +463,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful"); static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
static final ParseField SKIPPED_FIELD = new ParseField("skipped"); static final ParseField SKIPPED_FIELD = new ParseField("skipped");
static final ParseField TOTAL_FIELD = new ParseField("total"); static final ParseField TOTAL_FIELD = new ParseField("total");
static final ParseField DETAILS_FIELD = new ParseField("details");
private final int total; private final int total;
private final int successful; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map private final int successful; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
@ -569,6 +550,16 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
: "successful + skipped is larger than total. total: " + total + " successful: " + successful + " skipped: " + skipped; : "successful + skipped is larger than total. total: " + total + " successful: " + successful + " skipped: " + skipped;
} }
private Clusters(Map<String, AtomicReference<Cluster>> clusterInfoMap) {
assert clusterInfoMap.size() > 0 : "this constructor should not be called with an empty Cluster info map";
this.total = clusterInfoMap.size();
this.clusterInfo = clusterInfoMap;
this.successful = 0; // calculated from clusterInfo map for minimize_roundtrips
this.skipped = 0; // calculated from clusterInfo map for minimize_roundtrips
// should only be called if "details" section of fromXContent is present (for ccsMinimizeRoundtrips)
this.ccsMinimizeRoundtrips = true;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total); out.writeVInt(total);
@ -604,6 +595,54 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
return builder; return builder;
} }
public static Clusters fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
int successful = -1;
int total = -1;
int skipped = -1;
Map<String, AtomicReference<Cluster>> clusterInfoMap = new HashMap<>();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
successful = parser.intValue();
} else if (Clusters.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
total = parser.intValue();
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
skipped = parser.intValue();
} else {
parser.skipChildren();
}
} else if (token == Token.START_OBJECT) {
if (Clusters.DETAILS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
String currentDetailsFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentDetailsFieldName = parser.currentName(); // cluster alias
} else if (token == Token.START_OBJECT) {
Cluster c = Cluster.fromXContent(currentDetailsFieldName, parser);
clusterInfoMap.put(currentDetailsFieldName, new AtomicReference<>(c));
} else {
parser.skipChildren();
}
}
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
if (clusterInfoMap.isEmpty()) {
return new Clusters(total, successful, skipped);
} else {
return new Clusters(clusterInfoMap);
}
}
/** /**
* @return how many total clusters the search was requested to be executed on * @return how many total clusters the search was requested to be executed on
*/ */
@ -698,6 +737,15 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
} }
return false; return false;
} }
/**
* @return true if this Clusters object was initialized with underlying Cluster objects
* for tracking search Cluster details.
*/
public boolean hasClusterObjects() {
return clusterInfo.keySet().size() > 0;
}
} }
/** /**
@ -710,6 +758,9 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
* See the Clusters clusterInfo Map for details. * See the Clusters clusterInfo Map for details.
*/ */
public static class Cluster implements ToXContentFragment, Writeable { public static class Cluster implements ToXContentFragment, Writeable {
static final ParseField INDICES_FIELD = new ParseField("indices");
static final ParseField STATUS_FIELD = new ParseField("status");
private final String clusterAlias; private final String clusterAlias;
private final String indexExpression; // original index expression from the user for this cluster private final String indexExpression; // original index expression from the user for this cluster
private final Status status; private final Status status;
@ -829,28 +880,28 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
} }
builder.startObject(name); builder.startObject(name);
{ {
builder.field("status", getStatus().toString()); builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field("indices", indexExpression); builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) { if (took != null) {
builder.field("took", took.millis()); builder.field(TOOK.getPreferredName(), took.millis());
} }
builder.field("timed_out", timedOut); builder.field(TIMED_OUT.getPreferredName(), timedOut);
if (totalShards != null) { if (totalShards != null) {
builder.startObject("_shards"); builder.startObject(RestActions._SHARDS_FIELD.getPreferredName());
builder.field("total", totalShards); builder.field(RestActions.TOTAL_FIELD.getPreferredName(), totalShards);
if (successfulShards != null) { if (successfulShards != null) {
builder.field("successful", successfulShards); builder.field(RestActions.SUCCESSFUL_FIELD.getPreferredName(), successfulShards);
} }
if (skippedShards != null) { if (skippedShards != null) {
builder.field("skipped", skippedShards); builder.field(RestActions.SKIPPED_FIELD.getPreferredName(), skippedShards);
} }
if (failedShards != null) { if (failedShards != null) {
builder.field("failed", failedShards); builder.field(RestActions.FAILED_FIELD.getPreferredName(), failedShards);
} }
builder.endObject(); builder.endObject();
} }
if (failures != null && failures.size() > 0) { if (failures != null && failures.size() > 0) {
builder.startArray("failures"); builder.startArray(RestActions.FAILURES_FIELD.getPreferredName());
for (ShardSearchFailure failure : failures) { for (ShardSearchFailure failure : failures) {
failure.toXContent(builder, params); failure.toXContent(builder, params);
} }
@ -861,6 +912,94 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
return builder; return builder;
} }
public static Cluster fromXContent(String clusterAlias, XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
String clusterName = clusterAlias;
if (clusterAlias.equals("(local)")) {
clusterName = "";
}
String indexExpression = null;
String status = "running";
boolean timedOut = false;
long took = -1L;
// these are all from the _shards section
int totalShards = -1;
int successfulShards = -1;
int skippedShards = -1;
int failedShards = -1;
List<ShardSearchFailure> failures = new ArrayList<>();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Cluster.INDICES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
indexExpression = parser.text();
} else if (Cluster.STATUS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
status = parser.text();
} else if (TIMED_OUT.match(currentFieldName, parser.getDeprecationHandler())) {
timedOut = parser.booleanValue();
} else if (TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
took = parser.longValue();
} else {
parser.skipChildren();
}
} else if (RestActions._SHARDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (RestActions.FAILED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
failedShards = parser.intValue();
} else if (RestActions.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
successfulShards = parser.intValue();
} else if (RestActions.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
totalShards = parser.intValue();
} else if (RestActions.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
skippedShards = parser.intValue();
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
} else if (token == Token.START_ARRAY) {
if (RestActions.FAILURES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_ARRAY) {
failures.add(ShardSearchFailure.fromXContent(parser));
}
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
Integer totalShardsFinal = totalShards == -1 ? null : totalShards;
Integer successfulShardsFinal = successfulShards == -1 ? null : successfulShards;
Integer skippedShardsFinal = skippedShards == -1 ? null : skippedShards;
Integer failedShardsFinal = failedShards == -1 ? null : failedShards;
TimeValue tookTimeValue = took == -1L ? null : new TimeValue(took);
return new Cluster(
clusterName,
indexExpression,
SearchResponse.Cluster.Status.valueOf(status.toUpperCase(Locale.ROOT)),
totalShardsFinal,
successfulShardsFinal,
skippedShardsFinal,
failedShardsFinal,
failures,
tookTimeValue,
timedOut
);
}
public String getClusterAlias() { public String getClusterAlias() {
return clusterAlias; return clusterAlias;
} }

View file

@ -37,7 +37,7 @@ public class MultiSearchResponseTests extends ESTestCase {
int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards); int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards; int skippedShards = totalShards - successfulShards;
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse.Clusters clusters = SearchResponseTests.randomSimpleClusters();
InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS; InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
SearchResponse searchResponse = new SearchResponse( SearchResponse searchResponse = new SearchResponse(
internalSearchResponse, internalSearchResponse,
@ -65,7 +65,7 @@ public class MultiSearchResponseTests extends ESTestCase {
int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards); int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards; int skippedShards = totalShards - successfulShards;
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse.Clusters clusters = SearchResponseTests.randomSimpleClusters();
InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS; InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
SearchResponse searchResponse = new SearchResponse( SearchResponse searchResponse = new SearchResponse(
internalSearchResponse, internalSearchResponse,

View file

@ -10,6 +10,8 @@ package org.elasticsearch.action.search;
import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits;
import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -17,6 +19,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
@ -41,7 +44,12 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
@ -119,6 +127,13 @@ public class SearchResponseTests extends ESTestCase {
internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS; internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
} }
SearchResponse.Clusters clusters;
if (minimal) {
clusters = randomSimpleClusters();
} else {
clusters = randomClusters();
}
return new SearchResponse( return new SearchResponse(
internalSearchResponse, internalSearchResponse,
null, null,
@ -127,15 +142,128 @@ public class SearchResponseTests extends ESTestCase {
skippedShards, skippedShards,
tookInMillis, tookInMillis,
shardSearchFailures, shardSearchFailures,
randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY clusters
); );
} }
/**
* This test method is typically called when needing "final" Cluster objects, not intermediate CCS state.
* So all clusters returned (whether for local only or remote cluster searches are in a final state
* where all clusters are either successful or skipped.
* @return Randomly chooses between a simple non-CCS Clusters object (1 cluster total with no details)
* and a "CCS" Cluster that has multiple clusters with details (underlying Cluster objects)
*/
static SearchResponse.Clusters randomClusters() { static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10); if (randomBoolean()) {
int successfulClusters = randomIntBetween(0, totalClusters); return randomSimpleClusters();
int skippedClusters = totalClusters - successfulClusters; } else {
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); return createCCSClusterObject(3, 2, true, 2, 1, 0, 0, new ShardSearchFailure[0]);
}
}
/**
* @return non-CCS Clusters - either SearchResponse.Clusters.EMPTY or a Clusters object
* with total=1, and it is marked as either successful or skipped.
*/
static SearchResponse.Clusters randomSimpleClusters() {
if (randomBoolean()) {
return SearchResponse.Clusters.EMPTY;
} else {
int totalClusters = 1;
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}
}
static SearchResponse.Clusters createCCSClusterObject(
int totalClusters,
int remoteClusters,
boolean ccsMinimizeRoundtrips,
int successfulClusters,
int skippedClusters,
int partialClusters,
int failedClusters,
ShardSearchFailure[] failures
) {
assert successfulClusters + skippedClusters <= totalClusters : "successful + skipped > totalClusters";
assert totalClusters == remoteClusters || totalClusters - remoteClusters == 1
: "totalClusters and remoteClusters must be same or total = remote + 1";
OriginalIndices localIndices = null;
if (totalClusters > remoteClusters) {
localIndices = new OriginalIndices(new String[] { "foo", "bar*" }, IndicesOptions.lenientExpand());
}
assert remoteClusters > 0 : "CCS Cluster must have at least one remote cluster";
Map<String, OriginalIndices> remoteClusterIndices = new HashMap<>();
for (int i = 0; i < remoteClusters; i++) {
remoteClusterIndices.put("cluster_" + i, new OriginalIndices(new String[] { "foo", "bar*" }, IndicesOptions.lenientExpand()));
}
SearchResponse.Clusters clusters = new SearchResponse.Clusters(localIndices, remoteClusterIndices, ccsMinimizeRoundtrips);
int successful = successfulClusters;
int skipped = skippedClusters;
int partial = partialClusters;
int failed = failedClusters;
int i = totalClusters > remoteClusters ? -1 : 0;
for (; i < remoteClusters; i++) {
SearchResponse.Cluster.Status status;
int totalShards = 5;
int successfulShards;
int skippedShards;
int failedShards = 0;
List<ShardSearchFailure> failureList = Arrays.asList(failures);
TimeValue took = new TimeValue(1000L);
if (successful > 0) {
status = SearchResponse.Cluster.Status.SUCCESSFUL;
successfulShards = 5;
skippedShards = 1;
failureList = Collections.emptyList();
successful--;
} else if (partial > 0) {
status = SearchResponse.Cluster.Status.PARTIAL;
successfulShards = 4;
skippedShards = 1;
failedShards = 1;
partial--;
} else if (skipped > 0) {
status = SearchResponse.Cluster.Status.SKIPPED;
successfulShards = 0;
skippedShards = 0;
failedShards = 5;
skipped--;
} else if (failed > 0) {
status = SearchResponse.Cluster.Status.FAILED;
successfulShards = 0;
skippedShards = 0;
failedShards = 5;
failed--;
} else {
throw new IllegalStateException("Test setup coding error - should not get here");
}
String clusterAlias = "";
if (i >= 0) {
clusterAlias = "cluster_" + i;
}
AtomicReference<SearchResponse.Cluster> clusterRef = clusters.getCluster(clusterAlias);
SearchResponse.Cluster cluster = clusterRef.get();
SearchResponse.Cluster update = new SearchResponse.Cluster(
cluster.getClusterAlias(),
cluster.getIndexExpression(),
status,
totalShards,
successfulShards,
skippedShards,
failedShards,
failureList,
took,
false
);
assertTrue(clusterRef.compareAndSet(cluster, update));
}
return clusters;
} }
/** /**
@ -314,6 +442,145 @@ public class SearchResponseTests extends ESTestCase {
}"""); }""");
assertEquals(expectedString, Strings.toString(response)); assertEquals(expectedString, Strings.toString(response));
} }
{
SearchResponse response = new SearchResponse(
new InternalSearchResponse(
new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f),
null,
null,
null,
false,
null,
1
),
null,
20,
9,
2,
0,
ShardSearchFailure.EMPTY_ARRAY,
createCCSClusterObject(
4,
3,
true,
1,
1,
1,
1,
new ShardSearchFailure[] { new ShardSearchFailure(new IllegalStateException("corrupt index")) }
)
);
String expectedString = XContentHelper.stripWhitespace("""
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 20,
"successful": 9,
"skipped": 2,
"failed": 0
},
"_clusters": {
"total": 4,
"successful": 2,
"skipped": 2,
"details": {
"(local)": {
"status": "successful",
"indices": "foo,bar*",
"took": 1000,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 1,
"failed": 0
}
},
"cluster_1": {
"status": "skipped",
"indices": "foo,bar*",
"took": 1000,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 0,
"skipped": 0,
"failed": 5
},
"failures": [
{
"shard": -1,
"index": null,
"reason": {
"type": "illegal_state_exception",
"reason": "corrupt index"
}
}
]
},
"cluster_2": {
"status": "failed",
"indices": "foo,bar*",
"took": 1000,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 0,
"skipped": 0,
"failed": 5
},
"failures": [
{
"shard": -1,
"index": null,
"reason": {
"type": "illegal_state_exception",
"reason": "corrupt index"
}
}
]
},
"cluster_0": {
"status": "partial",
"indices": "foo,bar*",
"took": 1000,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 4,
"skipped": 1,
"failed": 1
},
"failures": [
{
"shard": -1,
"index": null,
"reason": {
"type": "illegal_state_exception",
"reason": "corrupt index"
}
}
]
}
}
},
"hits": {
"total": {
"value": 100,
"relation": "eq"
},
"max_score": 1.5,
"hits": [
{
"_id": "id1",
"_score": 2.0
}
]
}
}""");
assertEquals(expectedString, Strings.toString(response));
}
} }
public void testSerialization() throws IOException { public void testSerialization() throws IOException {

View file

@ -1,10 +1,11 @@
/* /*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License * or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License * 2.0 and the Server Side Public License, v 1; you may not use this file except
* 2.0. * in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/ */
package org.elasticsearch.xpack.search; package org.elasticsearch.search.query;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
@ -21,7 +22,8 @@ import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> { // copied from x-pack to server module
public class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
public static final String NAME = "throw"; public static final String NAME = "throw";
private final long randomUID; private final long randomUID;
@ -36,7 +38,7 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
* @param failure what exception to throw * @param failure what exception to throw
* @param shardId what shardId to throw the exception. If shardId is less than 0, it will throw for all shards. * @param shardId what shardId to throw the exception. If shardId is less than 0, it will throw for all shards.
*/ */
ThrowingQueryBuilder(long randomUID, RuntimeException failure, int shardId) { public ThrowingQueryBuilder(long randomUID, RuntimeException failure, int shardId) {
super(); super();
this.randomUID = randomUID; this.randomUID = randomUID;
this.failure = failure; this.failure = failure;
@ -51,7 +53,7 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
* @param failure what exception to throw * @param failure what exception to throw
* @param index what index to throw the exception against (all shards of that index) * @param index what index to throw the exception against (all shards of that index)
*/ */
ThrowingQueryBuilder(long randomUID, RuntimeException failure, String index) { public ThrowingQueryBuilder(long randomUID, RuntimeException failure, String index) {
super(); super();
this.randomUID = randomUID; this.randomUID = randomUID;
this.failure = failure; this.failure = failure;
@ -59,7 +61,7 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
this.index = index; this.index = index;
} }
ThrowingQueryBuilder(StreamInput in) throws IOException { public ThrowingQueryBuilder(StreamInput in) throws IOException {
super(in); super(in);
this.randomUID = in.readLong(); this.randomUID = in.readLong();
this.failure = in.readException(); this.failure = in.readException();

View file

@ -14,6 +14,7 @@ addQaCheckDependencies(project)
dependencies { dependencies {
compileOnly project(":server") compileOnly project(":server")
testImplementation testArtifact(project(':server'))
compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core')))) testImplementation(testArtifact(project(xpackModule('core'))))

View file

@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Min; import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.ThrowingQueryBuilder;
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

View file

@ -29,6 +29,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.ThrowingQueryBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;

View file

@ -34,6 +34,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.internal.LegacyReaderContext;
import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.ThrowingQueryBuilder;
import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
@ -79,10 +80,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
/**
* This IT test copies the setup and general approach that the {@code CrossClusterSearchIT} test
* used for testing synchronous CCS.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98272") @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98272")
public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase { public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {