Add counters to _clusters response for all states (#99566)

To help the user know what the possible cluster states are and to 
provide an accurate accounting, we added counters summarising
`running`, `partial` and `failed` clusters to the `_clusters` section.
Changes:
- Now in the response is present the number of `running` clusters.
- We split up `partial` and `successful` (before was summed up in the 
`successful` counter).
- We now have a counter for `failed` clusters.
- Now `total` is always equal to `running` + `skipped` + `failed` + 
`partial` + `successful`.
This commit is contained in:
Matteo Piergiovanni 2023-09-28 09:28:45 +02:00 committed by GitHub
parent 292722d1fe
commit d9c15c526e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 507 additions and 206 deletions

View file

@ -0,0 +1,6 @@
pr: 99566
summary: Add additional counters to `_clusters` response for all Cluster search states
area: Search
type: enhancement
issues:
- 98927

View file

@ -146,12 +146,15 @@ included to provide information about the search on each cluster.
"skipped": 0
},
"_clusters": {
"total": 1,
"total": 1, <1>
"successful": 1,
"skipped": 0,
"running": 0,
"partial": 0,
"failed": 0,
"details": {
"cluster_one": { <1>
"status": "successful", <2>
"cluster_one": { <2>
"status": "successful",
"indices": "my-index-000001", <3>
"took": 148, <4>
"timed_out": false,
@ -200,12 +203,13 @@ included to provide information about the search on each cluster.
// TESTRESPONSE[s/"skipped": 0/"skipped": "$body._shards.skipped"/]
// TESTRESPONSE[s/"took": 148/"took": "$body._clusters.details.cluster_one.took"/]
<1> The `_clusters/details` section shows metadata about the search on each cluster.
<2> The cluster status can be one of: *running*, *successful* (searches on all shards
were successful), *partial* (searches on at least one shard of the cluster was successful
and at least one failed), *skipped* (the search failed on a cluster marked with
`skip_unavailable`=`true`) or *failed* (the search failed on a cluster marked with
`skip_unavailable`=`false`).
<1> This section of counters shows all possible cluster search states and how many cluster
searches are currently in that state. The clusters can be one of the following statuses: *running*,
*successful* (searches on all shards were successful), *partial* (searches on at least
one shard of the cluster was successful and at least one failed), *skipped* (the search
failed on a cluster marked with `skip_unavailable`=`true`) or *failed* (the search
failed on a cluster marked with `skip_unavailable`=`false`).
<2> The `_clusters/details` section shows metadata about the search on each cluster.
<3> The index expression supplied by the user. If you provide a wildcard such as `logs-*`,
this section will show the value with the wildcard, not the concrete indices being searched.
<4> How long (in milliseconds) the sub-search took on that cluster.
@ -258,6 +262,9 @@ The API returns the following response:
"total": 3,
"successful": 3,
"skipped": 0,
"running": 0,
"partial": 0,
"failed": 0,
"details": {
"(local)": { <1>
"status": "successful",
@ -434,6 +441,9 @@ The API returns the following response:
"total" : 3,
"successful" : 0,
"skipped": 0,
"running": 3,
"partial": 0,
"failed": 0,
"details": {
"(local)": {
"status": "running",
@ -473,7 +483,7 @@ across all clusters only when the search is completed. When
`ccs_minimize_roundtrips`= `false`, the total shard count is known up front and
will be correct.
<3> The `_clusters` section indicates that 3 clusters are in scope for the search
and all are currently running.
and all are currently in the "running" state.
If you query the <<get-async-search,get async search>> endpoint while the query is
still running, you will see an update in the `_clusters` and `_shards` section of
@ -509,6 +519,9 @@ Response:
"total": 3,
"successful": 1, <2>
"skipped": 0,
"running": 2,
"partial": 0,
"failed": 0,
"details": {
"(local)": {
"status": "successful",
@ -550,7 +563,7 @@ Response:
<1> All the local cluster shards have completed.
<2> The local cluster search has completed, so the "successful" clusters entry
is set to 1. The `_clusters` response metadata will be updated as each cluster
is set to 1 and "running" clusters entry reduced to 2. The `_clusters` response metadata will be updated as each cluster
finishes.
<3> Number of hits from the local cluster search. Final hits are not
shown until searches on all clusters have been completed and merged.
@ -593,6 +606,9 @@ Response:
"total": 3,
"successful": 3, <3>
"skipped": 0,
"running": 0,
"partial": 0,
"failed": 0,
"details": {
"(local)": {
"status": "successful",
@ -742,8 +758,11 @@ Response:
},
"_clusters": {
"total": 3,
"successful": 3, <3>
"successful": 2,
"skipped": 0,
"running": 0,
"partial": 1, <3>
"failed": 0,
"details": {
"(local)": {
"status": "successful",
@ -810,7 +829,7 @@ Response:
<1> The search results are marked as partial, since at least one shard search failed.
<2> The `_shards` section includes shard failure info.
<3> Clusters that have partial results are still marked as successful. They are
<3> Clusters that have partial results are still marked as "partial". They are
marked with status "skipped" (or "failed") only if no data was returned from the search.
<4> The `partial` status has been applied to the cluster with partial results.
<5> The failed shard count is shown.
@ -858,7 +877,10 @@ Response:
"_clusters": {
"total": 3,
"successful": 1,
"skipped": 2, <2>
"skipped": 1,
"running": 0,
"partial": 0,
"failed": 1,
"details": {
"(local)": {
"status": "successful",
@ -873,7 +895,7 @@ Response:
}
},
"cluster_one": {
"status": "skipped", <3>
"status": "skipped", <2>
"indices": "my-index-000001",
"timed_out": false,
"failures": [
@ -881,14 +903,14 @@ Response:
"shard": -1,
"index": null,
"reason": {
"type": "node_disconnected_exception", <4>
"type": "node_disconnected_exception", <3>
"reason": "[myhostname1][35.238.149.1:9300][indices:data/read/search] disconnected"
}
}
]
},
"cluster_two": {
"status": "failed", <5>
"status": "failed", <4>
"indices": "my-index-000001",
"timed_out": false,
"failures": [
@ -907,7 +929,7 @@ Response:
"hits": {
},
}
"error": { <6>
"error": { <5>
"type": "status_exception",
"reason": "error while executing search",
"caused_by": {
@ -921,15 +943,14 @@ Response:
<1> The shard accounting will often be only partial when errors like this occur,
since we need to be able to get shard info from remote clusters on each search.
<2> The skipped counter is used for both "skipped" and "failed" clusters.
<3> `cluster_one` disconnected during the search and it returned no results.
<2> `cluster_one` disconnected during the search and it returned no results.
Since it is marked in the remote cluster configuration as `skip_unavailable`=`true`,
its status is "skipped", which will not fail the entire search.
<4> The failures list shows that the remote cluster node disconnected from the
<3> The failures list shows that the remote cluster node disconnected from the
querying cluster.
<5> `cluster_two` status is "failed", since it is marked in the remote cluster
<4> `cluster_two` status is "failed", since it is marked in the remote cluster
configuration as `skip_unavailable`=`false`.
<6> A top level `error` entry is included when there is a "failed" cluster.
<5> A top level `error` entry is included when there is a "failed" cluster.
[discrete]
@ -1060,7 +1081,10 @@ the `wait_for_completion_timeout` duration (see <<async-search>>).
"total" : 3,
"successful": 0,
"skipped": 0,
"details": { <2>
"running": 3, <2>
"partial": 0,
"failed": 0,
"details": { <3>
"(local)": {
"status": "running",
"indices": "my-index-000001",
@ -1112,7 +1136,8 @@ the `wait_for_completion_timeout` duration (see <<async-search>>).
<1> All shards from all clusters in scope for the search are listed here. Watch this
section and/or the _clusters section for updates to monitor search progress.
<2> The `_clusters` section shows that shard information was successfully
<2> From the `_clusters` section we can see that all the clusters are in "running" state.
<3> The `_clusters` section shows that shard information was successfully
gathered from all 3 clusters and the total shard count on each cluster is listed.

View file

@ -190,16 +190,22 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index"), RequestOptions.DEFAULT);
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(2, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index"), RequestOptions.DEFAULT);
assertEquals(1, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(0, response.getHits().getTotalHits().value);
}
@ -209,8 +215,11 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
RequestOptions.DEFAULT
);
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(2, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();
@ -227,16 +236,22 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index"), RequestOptions.DEFAULT);
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index"), RequestOptions.DEFAULT);
assertEquals(1, response.getClusters().getTotal());
assertEquals(0, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(0, response.getHits().getTotalHits().value);
}
@ -246,8 +261,11 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
RequestOptions.DEFAULT
);
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(1, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, response.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();

View file

@ -1072,8 +1072,26 @@ public class CCSDuelIT extends ESRestTestCase {
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(clustersMRT.getSuccessful(), clustersMRTFalse.getSuccessful());
assertEquals(clustersMRT.getSkipped(), clustersMRTFalse.getSkipped());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
@ -1144,8 +1162,26 @@ public class CCSDuelIT extends ESRestTestCase {
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(clustersMRT.getSuccessful(), clustersMRTFalse.getSuccessful());
assertEquals(clustersMRT.getSkipped(), clustersMRTFalse.getSkipped());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
@ -1229,14 +1265,20 @@ public class CCSDuelIT extends ESRestTestCase {
private static void assertMultiClusterSearchResponse(SearchResponse searchResponse) {
assertEquals(2, searchResponse.getClusters().getTotal());
assertEquals(2, searchResponse.getClusters().getSuccessful());
// for bwc checks we expect SUCCESSFUL + PARTIAL to be equal to 2
int bwcSuccessful = searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL);
bwcSuccessful += searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL);
assertEquals(2, bwcSuccessful);
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertThat(searchResponse.getTotalShards(), greaterThan(1));
assertThat(searchResponse.getSuccessfulShards(), greaterThan(1));
}
private static void assertSingleRemoteClusterSearchResponse(SearchResponse searchResponse) {
assertEquals(1, searchResponse.getClusters().getTotal());
assertEquals(1, searchResponse.getClusters().getSuccessful());
assertEquals(1, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(searchResponse.getTotalShards(), greaterThanOrEqualTo(1));
assertThat(searchResponse.getSuccessfulShards(), greaterThanOrEqualTo(1));
}

View file

@ -108,8 +108,8 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = resp.getClusters();
int expectedNumClusters = 1 + (includeLocalIndex ? 1 : 0);
assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
assertThat(clusters.getSuccessful(), equalTo(expectedNumClusters));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(expectedNumClusters));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
if (includeLocalIndex) {
AtomicReference<SearchResponse.Cluster> localClusterRef = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
@ -163,8 +163,9 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
int expectedNumClusters = 1 + (includeLocalIndex ? 1 : 0);
assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
assertThat(clusters.getSuccessful(), equalTo(expectedNumClusters));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(expectedNumClusters));
if (includeLocalIndex) {
AtomicReference<SearchResponse.Cluster> localClusterRef = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

View file

@ -125,8 +125,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -186,8 +189,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -250,8 +256,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -314,8 +323,16 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
assertThat(clusters.isCcsMinimizeRoundtrips(), equalTo(minimizeRoundtrips));
}
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -386,8 +403,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -439,8 +459,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -482,8 +505,11 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -530,8 +556,16 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
assertNotNull(searchResponse);
SearchResponse.Clusters clusters = searchResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));

View file

@ -460,14 +460,20 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
public static final Clusters EMPTY = new Clusters(0, 0, 0);
static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters");
static final ParseField TOTAL_FIELD = new ParseField("total");
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
static final ParseField SKIPPED_FIELD = new ParseField("skipped");
static final ParseField TOTAL_FIELD = new ParseField("total");
static final ParseField RUNNING_FIELD = new ParseField("running");
static final ParseField PARTIAL_FIELD = new ParseField("partial");
static final ParseField FAILED_FIELD = new ParseField("failed");
static final ParseField DETAILS_FIELD = new ParseField("details");
private final int total;
private final int successful; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int skipped; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int successful; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int skipped; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int running; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int partial; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
private final int failed; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
// key to map is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
// the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search
@ -479,9 +485,9 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
/**
* For use with cross-cluster searches.
* When minimizing roundtrips, the number of successful and skipped clusters is not known until
* the end of the search and it the information in SearchResponse.Cluster object will be updated
* as each cluster returns.
* When minimizing roundtrips, the number of successful, skipped, running, partial and failed clusters
* is not known until the end of the search and it the information in SearchResponse.Cluster object
* will be updated as each cluster returns.
* @param localIndices The localIndices to be searched - null if no local indices are to be searched
* @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
* @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
@ -496,8 +502,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
) {
assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor";
this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
this.successful = 0; // calculated from clusterInfo map for minimize_roundtrips
this.skipped = 0; // calculated from clusterInfo map for minimize_roundtrips
assert total >= 1 : "No local indices or remote clusters passed in";
this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
Map<String, AtomicReference<Cluster>> m = new HashMap<>();
if (localIndices != null) {
@ -512,6 +517,11 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
m.put(clusterAlias, new AtomicReference<>(c));
}
this.clusterInfo = Collections.unmodifiableMap(m);
this.successful = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.SUCCESSFUL);
this.skipped = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.SKIPPED);
this.running = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.RUNNING);
this.partial = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.PARTIAL);
this.failed = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.FAILED);
}
/**
@ -529,6 +539,9 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
this.total = total;
this.successful = successful;
this.skipped = skipped;
this.running = 0;
this.partial = 0;
this.failed = 0;
this.ccsMinimizeRoundtrips = false;
this.clusterInfo = Collections.emptyMap(); // will never be used if created from this constructor
}
@ -549,17 +562,35 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
} else {
this.clusterInfo = Collections.emptyMap();
}
this.running = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.RUNNING);
this.partial = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.PARTIAL);
this.failed = determineCountFromClusterInfo(cluster -> cluster.getStatus() == Cluster.Status.FAILED);
this.ccsMinimizeRoundtrips = false;
assert total >= 0 : "total is negative: " + total;
assert total >= successful + skipped
: "successful + skipped is larger than total. total: " + total + " successful: " + successful + " skipped: " + skipped;
assert total >= successful + skipped + running + partial + failed
: "successful + skipped + running + partial + failed is larger than total. total: "
+ total
+ " successful: "
+ successful
+ " skipped: "
+ skipped
+ " running: "
+ running
+ " partial: "
+ partial
+ " failed: "
+ failed;
}
private Clusters(Map<String, AtomicReference<Cluster>> clusterInfoMap) {
assert clusterInfoMap.size() > 0 : "this constructor should not be called with an empty Cluster info map";
this.total = clusterInfoMap.size();
this.clusterInfo = clusterInfoMap;
this.successful = 0; // calculated from clusterInfo map for minimize_roundtrips
this.skipped = 0; // calculated from clusterInfo map for minimize_roundtrips
this.successful = 0; // calculated from clusterInfo map for minimize_roundtrips
this.skipped = 0; // calculated from clusterInfo map for minimize_roundtrips
this.running = 0; // calculated from clusterInfo map for minimize_roundtrips
this.partial = 0; // calculated from clusterInfo map for minimize_roundtrips
this.failed = 0; // calculated from clusterInfo map for minimize_roundtrips
// should only be called if "details" section of fromXContent is present (for ccsMinimizeRoundtrips)
this.ccsMinimizeRoundtrips = true;
}
@ -584,9 +615,11 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
if (total > 0) {
builder.startObject(_CLUSTERS_FIELD.getPreferredName());
builder.field(TOTAL_FIELD.getPreferredName(), total);
builder.field(SUCCESSFUL_FIELD.getPreferredName(), getSuccessful());
builder.field(SKIPPED_FIELD.getPreferredName(), getSkipped());
// TODO: add FAILED_FIELD
builder.field(SUCCESSFUL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SUCCESSFUL));
builder.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
builder.field(RUNNING_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.RUNNING));
builder.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
builder.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
if (clusterInfo.size() > 0) {
builder.startObject("details");
for (AtomicReference<Cluster> cluster : clusterInfo.values()) {
@ -602,21 +635,30 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
public static Clusters fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
int successful = -1;
int total = -1;
int successful = -1;
int skipped = -1;
int running = 0; // 0 for BWC
int partial = 0; // 0 for BWC
int failed = 0; // 0 for BWC
Map<String, AtomicReference<Cluster>> clusterInfoMap = new HashMap<>();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
successful = parser.intValue();
} else if (Clusters.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
if (Clusters.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
total = parser.intValue();
} else if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
successful = parser.intValue();
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
skipped = parser.intValue();
} else if (Clusters.RUNNING_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
running = parser.intValue();
} else if (Clusters.PARTIAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
partial = parser.intValue();
} else if (Clusters.FAILED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
failed = parser.intValue();
} else {
parser.skipChildren();
}
@ -641,6 +683,8 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
}
}
if (clusterInfoMap.isEmpty()) {
assert running == 0 && partial == 0 && failed == 0
: "Non cross-cluster should have counter for running, partial and failed equal to 0";
return new Clusters(total, successful, skipped);
} else {
return new Clusters(clusterInfoMap);
@ -655,15 +699,20 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
}
/**
* @return how many total clusters the search was executed successfully on
* @param status the state you want to query
* @return how many clusters are currently in a specific state
*/
public int getSuccessful() {
public int getClusterStateCount(Cluster.Status status) {
if (clusterInfo.isEmpty()) {
return successful;
return switch (status) {
case RUNNING -> running;
case SUCCESSFUL -> successful;
case PARTIAL -> partial;
case SKIPPED -> skipped;
case FAILED -> failed;
};
} else {
return determineCountFromClusterInfo(
cluster -> cluster.getStatus() == Cluster.Status.SUCCESSFUL || cluster.getStatus() == Cluster.Status.PARTIAL
);
return determineCountFromClusterInfo(cluster -> cluster.getStatus() == status);
}
}
@ -678,19 +727,6 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
return (int) clusterInfo.values().stream().filter(c -> predicate.test(c.get())).count();
}
/**
* @return how many total clusters were used during the execution of the search request
*/
public int getSkipped() {
if (clusterInfo.isEmpty()) {
return skipped;
} else {
return determineCountFromClusterInfo(cluster ->
// TODO: change this after adding an XContent field for FAILED clusters
cluster.getStatus() == Cluster.Status.SKIPPED || cluster.getStatus() == Cluster.Status.FAILED);
}
}
/**
* @return whether this search was a cross cluster search done with ccsMinimizeRoundtrips=true
*/
@ -715,17 +751,34 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
return false;
}
Clusters clusters = (Clusters) o;
return total == clusters.total && successful == clusters.successful && skipped == clusters.skipped;
return total == clusters.total
&& successful == clusters.successful
&& skipped == clusters.skipped
&& running == clusters.running
&& partial == clusters.partial
&& failed == clusters.failed;
}
@Override
public int hashCode() {
return Objects.hash(total, successful, skipped);
return Objects.hash(total, successful, skipped, running, partial, failed);
}
@Override
public String toString() {
return "Clusters{total=" + total + ", successful=" + getSuccessful() + ", skipped=" + getSkipped() + '}';
return "Clusters{total="
+ total
+ ", successful="
+ getClusterStateCount(Cluster.Status.SUCCESSFUL)
+ ", skipped="
+ getClusterStateCount(Cluster.Status.SKIPPED)
+ ", running="
+ getClusterStateCount(Cluster.Status.RUNNING)
+ ", partial="
+ getClusterStateCount(Cluster.Status.PARTIAL)
+ ", failed="
+ getClusterStateCount(Cluster.Status.FAILED)
+ '}';
}
/**

View file

@ -430,7 +430,10 @@ public class SearchResponseTests extends ESTestCase {
"_clusters": {
"total": 5,
"successful": 3,
"skipped": 2
"skipped": 2,
"running":0,
"partial": 0,
"failed": 0
},
"hits": {
"total": {
@ -483,8 +486,11 @@ public class SearchResponseTests extends ESTestCase {
},
"_clusters": {
"total": 4,
"successful": 2,
"skipped": 2,
"successful": 1,
"skipped": 1,
"running":0,
"partial": 1,
"failed": 1,
"details": {
"(local)": {
"status": "successful",

View file

@ -613,9 +613,12 @@ public class TransportSearchActionTests extends ESTestCase {
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(0, searchResponse.getClusters().getSkipped());
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
assertEquals(totalClusters, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases());
}
{
@ -758,10 +761,16 @@ public class TransportSearchActionTests extends ESTestCase {
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(disconnectedNodesIndices.size(), searchResponse.getClusters().getSkipped());
assertEquals(
disconnectedNodesIndices.size(),
searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
int successful = totalClusters - disconnectedNodesIndices.size();
assertEquals(successful, searchResponse.getClusters().getSuccessful());
assertEquals(successful, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(successful == 0 ? 0 : successful + 1, searchResponse.getNumReducePhases());
}
@ -816,9 +825,12 @@ public class TransportSearchActionTests extends ESTestCase {
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(0, searchResponse.getClusters().getSkipped());
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
assertEquals(totalClusters, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.RUNNING));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(0, searchResponse.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases());
});
assertEquals(0, service.getConnectionManager().size());
@ -877,7 +889,7 @@ public class TransportSearchActionTests extends ESTestCase {
SearchShardsResponse shardsResponse = map.get(clusterAlias);
assertThat(shardsResponse.getNodes(), hasSize(1));
}
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
}
{
final CountDownLatch latch = new CountDownLatch(1);
@ -897,7 +909,7 @@ public class TransportSearchActionTests extends ESTestCase {
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(numClusters, clusters.getSkipped());
assertEquals(numClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
@ -945,7 +957,7 @@ public class TransportSearchActionTests extends ESTestCase {
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(numDisconnectedClusters, clusters.getSkipped());
assertEquals(numDisconnectedClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
@ -978,7 +990,7 @@ public class TransportSearchActionTests extends ESTestCase {
assertNotNull(response.get());
Map<String, SearchShardsResponse> map = response.get();
assertEquals(numClusters - disconnectedNodesIndices.size(), map.size());
assertEquals(disconnectedNodesIndices.size(), clusters.getSkipped());
assertEquals(disconnectedNodesIndices.size(), clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
if (disconnectedNodesIndices.contains(i)) {
@ -1021,7 +1033,7 @@ public class TransportSearchActionTests extends ESTestCase {
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch)
);
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, clusters.getSkipped());
assertEquals(0, clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertNotNull(response.get());
Map<String, SearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());

View file

@ -186,8 +186,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -217,8 +220,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -298,8 +304,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -368,8 +377,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(2));
}
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -389,8 +406,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(2));
}
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -458,8 +483,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -490,8 +518,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -578,8 +609,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -620,8 +659,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -692,8 +739,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -725,8 +775,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(2));
assertThat(clusters.getSuccessful(), equalTo(2));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(2));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
assertNotNull(localClusterSearchInfo);
@ -788,8 +841,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -811,8 +867,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -862,8 +921,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -884,8 +946,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(1));
assertThat(clusters.getSkipped(), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -940,8 +1005,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -958,9 +1031,16 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
SearchResponse.Clusters clusters = statusResponse.getClusters();
assertThat(clusters.getTotal(), equalTo(1));
assertThat(clusters.getSuccessful(), equalTo(0));
assertThat(clusters.getSkipped(), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
if (skipUnavailable) {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(1));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
} else {
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(1));
}
assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();

View file

@ -357,6 +357,9 @@ public class AsyncSearchResponseTests extends ESTestCase {
"total" : 3,
"successful" : 0,
"skipped" : 0,
"running" : 3,
"partial" : 0,
"failed" : 0,
"details" : {
"cluster_1" : {
"status" : "running",
@ -413,6 +416,9 @@ public class AsyncSearchResponseTests extends ESTestCase {
"total" : 3,
"successful" : 0,
"skipped" : 0,
"running" : 3,
"partial" : 0,
"failed" : 0,
"details" : {
"cluster_1" : {
"status" : "running",
@ -575,8 +581,11 @@ public class AsyncSearchResponseTests extends ESTestCase {
},
"_clusters" : {
"total" : 4,
"successful" : 3,
"successful" : 2,
"skipped" : 1,
"running" : 0,
"partial" : 1,
"failed" : 0,
"details" : {
"(local)" : {
"status" : "successful",

View file

@ -153,48 +153,11 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
response.getSkippedShards(),
response.getFailedShards(),
clusters.getTotal(),
clusters.getSuccessful(),
clusters.getSkipped(),
response.getCompletionStatus() == null ? "" : Strings.format("""
,"completion_status" : %s""", response.getCompletionStatus().getStatus()) };
expectedJson = Strings.format("""
{
"id" : "%s",
"is_running" : %s,
"is_partial" : %s,
"start_time_in_millis" : %s,
"expiration_time_in_millis" : %s,
%s
"_shards" : {
"total" : %s,
"successful" : %s,
"skipped" : %s,
"failed" : %s
},
"_clusters": {
"total": %s,
"successful": %s,
"skipped": %s
}
%s
}
""", args);
} else {
Object[] args = new Object[] {
response.getId(),
response.isRunning(),
response.isPartial(),
response.getStartTime(),
response.getExpirationTime(),
completionTimeEntry,
response.getTotalShards(),
response.getSuccessfulShards(),
response.getSkippedShards(),
response.getFailedShards(),
clusters.getTotal(),
clusters.getSuccessful(),
clusters.getSkipped(),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
response.getCompletionStatus() == null ? "" : Strings.format("""
,"completion_status" : %s""", response.getCompletionStatus().getStatus()) };
@ -216,6 +179,55 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
"total": %s,
"successful": %s,
"skipped": %s,
"running": %s,
"partial": %s,
"failed": %s
}
%s
}
""", args);
} else {
Object[] args = new Object[] {
response.getId(),
response.isRunning(),
response.isPartial(),
response.getStartTime(),
response.getExpirationTime(),
completionTimeEntry,
response.getTotalShards(),
response.getSuccessfulShards(),
response.getSkippedShards(),
response.getFailedShards(),
clusters.getTotal(),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
response.getCompletionStatus() == null ? "" : Strings.format("""
,"completion_status" : %s""", response.getCompletionStatus().getStatus()) };
expectedJson = Strings.format("""
{
"id" : "%s",
"is_running" : %s,
"is_partial" : %s,
"start_time_in_millis" : %s,
"expiration_time_in_millis" : %s,
%s
"_shards" : {
"total" : %s,
"successful" : %s,
"skipped" : %s,
"failed" : %s
},
"_clusters": {
"total": %s,
"successful": %s,
"skipped": %s,
"running": %s,
"partial": %s,
"failed": %s,
"details": {
"(local)": {
"status": "running",
@ -373,11 +385,10 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
} else {
// CCS search
totalClusters = 80;
int successful = randomInt(60);
successfulClusters = randomInt(60);
int partial = randomInt(20);
successfulClusters = successful + partial;
skippedClusters = totalClusters - successfulClusters;
clusters = AsyncSearchResponseTests.createCCSClusterObjects(80, 80, true, successful, skippedClusters, partial);
skippedClusters = totalClusters - (successfulClusters + partial);
clusters = AsyncSearchResponseTests.createCCSClusterObjects(80, 80, true, successfulClusters, skippedClusters, partial);
}
SearchResponse searchResponse = new SearchResponse(
internalSearchResponse,
@ -396,8 +407,11 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
assertEquals(0, statusFromStoredSearch.getFailedShards());
assertEquals(statusFromStoredSearch.getCompletionStatus(), RestStatus.OK);
assertEquals(totalClusters, statusFromStoredSearch.getClusters().getTotal());
assertEquals(skippedClusters, statusFromStoredSearch.getClusters().getSkipped());
assertEquals(successfulClusters, statusFromStoredSearch.getClusters().getSuccessful());
assertEquals(skippedClusters, statusFromStoredSearch.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
assertEquals(
successfulClusters,
statusFromStoredSearch.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98706")
@ -432,7 +446,8 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
assertEquals(0, statusFromStoredSearch.getFailedShards());
assertNull("completion_status should not be present if still running", statusFromStoredSearch.getCompletionStatus());
assertEquals(100, statusFromStoredSearch.getClusters().getTotal());
assertEquals(successful + partial, statusFromStoredSearch.getClusters().getSuccessful());
assertEquals(skipped, statusFromStoredSearch.getClusters().getSkipped());
assertEquals(successful, statusFromStoredSearch.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL));
assertEquals(partial, statusFromStoredSearch.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL));
assertEquals(skipped, statusFromStoredSearch.getClusters().getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));
}
}

View file

@ -54,10 +54,10 @@ public interface DataExtractor {
*/
default void checkForSkippedClusters(SearchResponse searchResponse) {
SearchResponse.Clusters clusterResponse = searchResponse.getClusters();
if (clusterResponse != null && clusterResponse.getSkipped() > 0) {
if (clusterResponse != null && clusterResponse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED) > 0) {
throw new ResourceNotFoundException(
"[{}] remote clusters out of [{}] were skipped when performing datafeed search",
clusterResponse.getSkipped(),
clusterResponse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clusterResponse.getTotal()
);
}