Calc follower vs leader indexing lag based on shard global checkpoints (#104015)

* Calc follower vs leader indexing lag baed on shard global checkpoints

* code simplify

* follow PR comments

* spotless

* Update docs/reference/ccr/apis/follow/get-follow-stats.asciidoc

Co-authored-by: Iraklis Psaroudakis <kingherc@gmail.com>

---------

Co-authored-by: Iraklis Psaroudakis <kingherc@gmail.com>
This commit is contained in:
Volodymyr Krasnikov 2024-01-10 11:03:52 -08:00 committed by GitHub
parent f33122a191
commit f6f86d11d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 1 deletions

View file

@ -75,6 +75,9 @@ task. In this situation, the following task must be resumed manually with the
`index`::
(string) The name of the follower index.
`total_global_checkpoint_lag`::
(long) Indication of how much the follower is lagging the leader. This is the sum of the difference between the `leader_global_checkpoint` and the `follower_global_checkpoint` for all shards.
//Begin shards
`shards`::
(array) An array of shard-level following task statistics.
@ -219,6 +222,7 @@ The API returns the following results:
"indices" : [
{
"index" : "follower_index",
"total_global_checkpoint_lag" : 256,
"shards" : [
{
"remote_cluster" : "remote_cluster",
@ -255,6 +259,7 @@ The API returns the following results:
]
}
--------------------------------------------------
// TESTRESPONSE[s/"total_global_checkpoint_lag" : 256/"total_global_checkpoint_lag" : 0/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.indices.0.shards.0.follower_global_checkpoint/]

View file

@ -112,6 +112,7 @@ The API returns the following results:
"indices" : [
{
"index" : "follower_index",
"total_global_checkpoint_lag" : 256,
"shards" : [
{
"remote_cluster" : "remote_cluster",
@ -149,6 +150,7 @@ The API returns the following results:
}
}
--------------------------------------------------
// TESTRESPONSE[s/"total_global_checkpoint_lag" : 256/"total_global_checkpoint_lag" : 0/]
// TESTRESPONSE[s/"number_of_failed_follow_indices" : 0/"number_of_failed_follow_indices" : $body.auto_follow_stats.number_of_failed_follow_indices/]
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]

View file

@ -54,6 +54,7 @@
ccr.follow_stats:
index: bar
- match: { indices.0.index: "bar" }
- match: { indices.0.total_global_checkpoint_lag: 0 }
- match: { indices.0.shards.0.leader_index: "foo" }
- match: { indices.0.shards.0.follower_index: "bar" }
- match: { indices.0.shards.0.shard_id: 0 }

View file

@ -92,7 +92,10 @@ public class FollowStatsAction extends ActionType<FollowStatsAction.StatsRespons
taskResponsesByIndex.entrySet().iterator(),
indexEntry -> Iterators.concat(
Iterators.<ToXContent>single(
(builder, params) -> builder.startObject().field("index", indexEntry.getKey()).startArray("shards")
(builder, params) -> builder.startObject()
.field("index", indexEntry.getKey())
.field("total_global_checkpoint_lag", calcFollowerToLeaderLaggingOps(indexEntry.getValue()))
.startArray("shards")
),
indexEntry.getValue().values().iterator(),
Iterators.single((builder, params) -> builder.endArray().endObject())
@ -102,6 +105,14 @@ public class FollowStatsAction extends ActionType<FollowStatsAction.StatsRespons
);
}
private static long calcFollowerToLeaderLaggingOps(Map<Integer, StatsResponse> followShardTaskStats) {
return followShardTaskStats.values()
.stream()
.map(StatsResponse::status)
.mapToLong(s -> s.leaderGlobalCheckpoint() - s.followerGlobalCheckpoint())
.sum();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;