[Stack Monitoring] Filter CCR list by currently viewed cluster (#148018)

## 📓 Summary

Closes #147224 

The issue was caused by the missing usage of the `clusterUuid` parameter
in the query used to retrieve the cluster followers and stats.

Adding a `term` condition for the cluster_uuid now retrieves only the
followers related to the interested cluster. In case a cluster has no
followers, an empty list will be returned by the endpoint.

This PR also adds stronger typing against the data returned by the
endpoint and adds some minor improvements.

## 🧪 Testing
Testing locally this PR requires some effort since is necessary to
create a multi-cluster locally.
Following [this
guide](https://github.com/elastic/kibana/blob/main/x-pack/plugins/monitoring/dev_docs/how_to/running_components_from_source.md#multi-cluster-tests-for-ccrccs-or-listing)
is possible to locally setup the 2 cluster and make one of them follow
the other.
Also, the new issue
[oblt#3279](https://github.com/elastic/observability-test-environments/issues/3279)
has been opened to have an easier way to access a multi-cluster setup
with oblt clusters.

### Before bugfix


https://user-images.githubusercontent.com/34506779/209954682-b519fcd3-888b-472a-8a1d-acf57ff5de05.mov

### After bugfix


https://user-images.githubusercontent.com/34506779/209954718-5045c49e-f113-42e5-aabf-81d2c5f53556.mov

Co-authored-by: Marco Antonio Ghiani <marcoantonio.ghiani@elastic.co>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Marco Antonio Ghiani 2023-01-04 08:16:28 +01:00 committed by GitHub
parent faa020b89b
commit 296025b1e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 260 additions and 151 deletions

View file

@ -6,6 +6,7 @@
*/
import * as rt from 'io-ts';
import { ElasticsearchLegacySource, ElasticsearchMetricbeatSource } from '../../types/es';
import { clusterUuidRT, ccsRT, timeRangeRT } from '../shared';
export const postElasticsearchCcrRequestParamsRT = rt.type({
@ -25,6 +26,67 @@ export type PostElasticsearchCcrRequestPayload = rt.TypeOf<
typeof postElasticsearchCcrRequestPayloadRT
>;
export const postElasticsearchCcrResponsePayloadRT = rt.type({
// TODO: add payload entries
const errorRt = rt.partial({ error: rt.union([rt.string, rt.undefined]) });
export const CcrShardRT = rt.type({
shardId: rt.number,
error: errorRt,
opsSynced: rt.number,
syncLagTime: rt.number,
syncLagOps: rt.number,
syncLagOpsLeader: rt.number,
syncLagOpsFollower: rt.number,
});
export type CcrShard = rt.TypeOf<typeof CcrShardRT>;
export const postElasticsearchCcrResponsePayloadRT = rt.array(
rt.type({
id: rt.string,
index: rt.string,
follows: rt.string,
shards: rt.array(CcrShardRT),
error: errorRt,
opsSynced: rt.number,
syncLagTime: rt.number,
syncLagOps: rt.number,
})
);
export type PostElasticsearchCcrResponsePayload = rt.TypeOf<
typeof postElasticsearchCcrResponsePayloadRT
>;
interface ValueObj<T> {
value: T;
}
export interface CcrShardBucket {
key: number;
ops_synced: ValueObj<string>;
lag_ops: ValueObj<number>;
leader_lag_ops: ValueObj<number>;
follower_lag_ops: ValueObj<number>;
}
export interface CcrBucket {
key: string;
by_shard_id: {
buckets: CcrShardBucket[];
};
leader_index: {
buckets: Array<{
remote_cluster: {
buckets: Array<{
key: string;
}>;
};
}>;
};
}
export interface CcrFullStats {
[key: string]: Array<
| NonNullable<ElasticsearchLegacySource['ccr_stats']>
| NonNullable<ElasticsearchMetricbeatSource['elasticsearch']>['ccr']
>;
}

View file

@ -96,7 +96,7 @@ export const ElasticsearchCcrPage: React.FC<ComponentProps> = ({ clusters }) =>
render={({ flyoutComponent, bottomBarComponent }: SetupModeProps) => (
<SetupModeContext.Provider value={{ setupModeSupported: true }}>
{flyoutComponent}
<Ccr data={data.data} alerts={alerts} />
<Ccr data={data} alerts={alerts} />
{bottomBarComponent}
</SetupModeContext.Provider>
)}

View file

@ -13,7 +13,12 @@ import {
import {
postElasticsearchCcrRequestParamsRT,
postElasticsearchCcrRequestPayloadRT,
PostElasticsearchCcrResponsePayload,
postElasticsearchCcrResponsePayloadRT,
CcrBucket,
CcrFullStats,
CcrShard,
CcrShardBucket,
} from '../../../../../common/http_api/elasticsearch';
import { TimeRange } from '../../../../../common/http_api/shared';
import {
@ -24,7 +29,7 @@ import {
import { MonitoringConfig } from '../../../../config';
import { createValidationFunction } from '../../../../lib/create_route_validation_function';
import { handleError } from '../../../../lib/errors/handle_error';
import { LegacyRequest, MonitoringCore } from '../../../../types';
import { MonitoringCore } from '../../../../types';
function getBucketScript(max: string, min: string) {
return {
@ -38,12 +43,15 @@ function getBucketScript(max: string, min: string) {
};
}
function buildRequest(
req: LegacyRequest<unknown, unknown, { timeRange: TimeRange }>,
config: MonitoringConfig,
esIndexPattern: string
) {
const { min, max } = req.payload.timeRange;
interface BuildRequestParams {
clusterUuid: string;
config: MonitoringConfig;
esIndexPattern: string;
timeRange: TimeRange;
}
function buildRequest({ clusterUuid, config, esIndexPattern, timeRange }: BuildRequestParams) {
const { min, max } = timeRange;
const maxBucketSize = config.ui.max_bucket_size;
const aggs = {
ops_synced_max: {
@ -125,6 +133,13 @@ function buildRequest(
query: {
bool: {
must: [
{
term: {
cluster_uuid: {
value: clusterUuid,
},
},
},
{
bool: {
should: [
@ -210,6 +225,41 @@ function buildRequest(
};
}
function buildShardStats({
fullStats,
bucket,
shardBucket,
}: {
bucket: CcrBucket;
fullStats: CcrFullStats;
shardBucket: CcrShardBucket;
}) {
const fullStat: any = fullStats[`${bucket.key}:${shardBucket.key}`][0];
const fullLegacyStat: ElasticsearchLegacySource = fullStat._source?.ccr_stats
? fullStat._source
: null;
const fullMbStat: ElasticsearchMetricbeatSource = fullStat._source?.elasticsearch?.ccr
? fullStat._source
: null;
const readExceptions =
fullLegacyStat?.ccr_stats?.read_exceptions ??
fullMbStat?.elasticsearch?.ccr?.read_exceptions ??
[];
const shardStat = {
shardId: shardBucket.key,
error: readExceptions.length ? readExceptions[0].exception?.type : null,
opsSynced: get(shardBucket, 'ops_synced.value'),
syncLagTime:
fullLegacyStat?.ccr_stats?.time_since_last_read_millis ??
fullMbStat?.elasticsearch?.ccr?.follower?.time_since_last_read?.ms,
syncLagOps: get(shardBucket, 'lag_ops.value'),
syncLagOpsLeader: get(shardBucket, 'leader_lag_ops.value'),
syncLagOpsFollower: get(shardBucket, 'follower_lag_ops.value'),
};
return shardStat;
}
export function ccrRoute(server: MonitoringCore) {
const validateParams = createValidationFunction(postElasticsearchCcrRequestParamsRT);
const validateBody = createValidationFunction(postElasticsearchCcrRequestPayloadRT);
@ -224,6 +274,7 @@ export function ccrRoute(server: MonitoringCore) {
async handler(req) {
const config = server.config;
const ccs = req.payload.ccs;
const { clusterUuid } = req.params;
const dataset = 'ccr';
const moduleType = 'elasticsearch';
const esIndexPattern = getIndexPatterns({
@ -235,19 +286,19 @@ export function ccrRoute(server: MonitoringCore) {
try {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const params = buildRequest(req, config, esIndexPattern);
const params = buildRequest({
clusterUuid,
config,
esIndexPattern,
timeRange: req.payload.timeRange,
});
const response: ElasticsearchResponse = await callWithRequest(req, 'search', params);
if (!response || Object.keys(response).length === 0) {
return { data: [] };
return [];
}
const fullStats: {
[key: string]: Array<
| NonNullable<ElasticsearchLegacySource['ccr_stats']>
| NonNullable<ElasticsearchMetricbeatSource['elasticsearch']>['ccr']
>;
} =
const fullStats: CcrFullStats =
response.hits?.hits.reduce((accum, hit) => {
const innerHits = hit.inner_hits?.by_shard.hits?.hits ?? [];
const grouped = groupBy(innerHits, (innerHit) => {
@ -265,78 +316,39 @@ export function ccrRoute(server: MonitoringCore) {
}, {}) ?? {};
const buckets = response.aggregations?.by_follower_index.buckets ?? [];
const data = buckets.reduce((accum: any, bucket: any) => {
const data: PostElasticsearchCcrResponsePayload = buckets.map((bucket: CcrBucket) => {
const leaderIndex = get(bucket, 'leader_index.buckets[0].key');
const remoteCluster = get(
bucket,
'leader_index.buckets[0].remote_cluster.buckets[0].key'
);
const follows = remoteCluster ? `${leaderIndex} on ${remoteCluster}` : leaderIndex;
const stat: {
[key: string]: any;
shards: Array<{
error?: string;
opsSynced: number;
syncLagTime: number;
syncLagOps: number;
}>;
} = {
const shards: CcrShard[] = get(bucket, 'by_shard_id.buckets').map(
(shardBucket: CcrShardBucket) => buildShardStats({ bucket, fullStats, shardBucket })
);
const error = (shards.find((shard) => shard.error) || {}).error;
const opsSynced = shards.reduce((sum, curr) => sum + curr.opsSynced, 0);
const syncLagTime = shards.reduce((max, curr) => Math.max(max, curr.syncLagTime), 0);
const syncLagOps = shards.reduce((max, curr) => Math.max(max, curr.syncLagOps), 0);
const stat = {
id: bucket.key,
index: bucket.key,
follows,
shards: [],
error: undefined,
opsSynced: undefined,
syncLagTime: undefined,
syncLagOps: undefined,
shards,
error,
opsSynced,
syncLagTime,
syncLagOps,
};
stat.shards = get(bucket, 'by_shard_id.buckets').reduce(
(accum2: any, shardBucket: any) => {
const fullStat: any = fullStats[`${bucket.key}:${shardBucket.key}`][0];
const fullLegacyStat: ElasticsearchLegacySource = fullStat._source?.ccr_stats
? fullStat._source
: null;
const fullMbStat: ElasticsearchMetricbeatSource = fullStat._source?.elasticsearch?.ccr
? fullStat._source
: null;
const readExceptions =
fullLegacyStat?.ccr_stats?.read_exceptions ??
fullMbStat?.elasticsearch?.ccr?.read_exceptions ??
[];
const shardStat = {
shardId: shardBucket.key,
error: readExceptions.length ? readExceptions[0].exception?.type : null,
opsSynced: get(shardBucket, 'ops_synced.value'),
syncLagTime:
fullLegacyStat?.ccr_stats?.time_since_last_read_millis ??
fullMbStat?.elasticsearch?.ccr?.follower?.time_since_last_read?.ms,
syncLagOps: get(shardBucket, 'lag_ops.value'),
syncLagOpsLeader: get(shardBucket, 'leader_lag_ops.value'),
syncLagOpsFollower: get(shardBucket, 'follower_lag_ops.value'),
};
accum2.push(shardStat);
return accum2;
},
[]
);
return stat;
});
stat.error = (stat.shards.find((shard) => shard.error) || {}).error;
stat.opsSynced = stat.shards.reduce((sum, { opsSynced }) => sum + opsSynced, 0);
stat.syncLagTime = stat.shards.reduce(
(max, { syncLagTime }) => Math.max(max, syncLagTime),
0
);
stat.syncLagOps = stat.shards.reduce(
(max, { syncLagOps }) => Math.max(max, syncLagOps),
0
);
accum.push(stat);
return accum;
}, []);
return postElasticsearchCcrResponsePayloadRT.encode({ data });
return postElasticsearchCcrResponsePayloadRT.encode(data);
} catch (err) {
return handleError(err, req);
}

View file

@ -29,7 +29,7 @@ export default function ({ getService }) {
it('should return all followers and a grouping of stats by follower index', async () => {
const { body } = await supertest
.post('/api/monitoring/v1/clusters/YCxj-RAgSZCP6GuOQ8M1EQ/elasticsearch/ccr')
.post('/api/monitoring/v1/clusters/vX4lH4C6QmyrJeYrvKr0-A/elasticsearch/ccr')
.set('kbn-xsrf', 'xxx')
.send({
timeRange,
@ -38,5 +38,17 @@ export default function ({ getService }) {
expect(body).to.eql(ccrFixture);
});
it('should return an empty list of followers if the cluster_uuid does not have any match', async () => {
const { body } = await supertest
.post('/api/monitoring/v1/clusters/random_uuid/elasticsearch/ccr')
.set('kbn-xsrf', 'xxx')
.send({
timeRange,
})
.expect(200);
expect(body).to.eql([]);
});
});
}

View file

@ -32,7 +32,7 @@ export default function ({ getService }) {
it('should return all followers and a grouping of stats by follower index', async () => {
const { body } = await supertest
.post('/api/monitoring/v1/clusters/YCxj-RAgSZCP6GuOQ8M1EQ/elasticsearch/ccr')
.post('/api/monitoring/v1/clusters/vX4lH4C6QmyrJeYrvKr0-A/elasticsearch/ccr')
.set('kbn-xsrf', 'xxx')
.send({
timeRange,
@ -41,6 +41,18 @@ export default function ({ getService }) {
expect(body).to.eql(ccrFixture);
});
it('should return an empty list of followers if the cluster_uuid does not have any match', async () => {
const { body } = await supertest
.post('/api/monitoring/v1/clusters/random_uuid/elasticsearch/ccr')
.set('kbn-xsrf', 'xxx')
.send({
timeRange,
})
.expect(200);
expect(body).to.eql([]);
});
});
});
});

View file

@ -1,83 +1,94 @@
{
"data": [{
[
{
"id": "follower2",
"index": "follower2",
"follows": "leader2",
"shards": [{
"shardId": 0,
"error": null,
"opsSynced": 52,
"syncLagTime": 4900,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 1,
"error": null,
"opsSynced": 47,
"syncLagTime": 9919,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 2,
"error": null,
"opsSynced": 51,
"syncLagTime": 14929,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 3,
"error": null,
"opsSynced": 50,
"syncLagTime": 39933,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 4,
"error": null,
"opsSynced": 55,
"syncLagTime": 49923,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}],
"shards": [
{
"shardId": 0,
"error": null,
"opsSynced": 52,
"syncLagTime": 4900,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 1,
"error": null,
"opsSynced": 47,
"syncLagTime": 9919,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 2,
"error": null,
"opsSynced": 51,
"syncLagTime": 14929,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 3,
"error": null,
"opsSynced": 50,
"syncLagTime": 39933,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 4,
"error": null,
"opsSynced": 55,
"syncLagTime": 49923,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}
],
"opsSynced": 255,
"syncLagTime": 49923,
"syncLagOps": 0
}, {
},
{
"id": "follower",
"index": "follower",
"follows": "leader",
"shards": [{
"shardId": 0,
"error": null,
"opsSynced": 85,
"syncLagTime": 19886,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 1,
"error": null,
"opsSynced": 94,
"syncLagTime": 4901,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}, {
"shardId": 2,
"error": null,
"opsSynced": 76,
"syncLagTime": 14899,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}],
"shards": [
{
"shardId": 0,
"error": null,
"opsSynced": 85,
"syncLagTime": 19886,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 1,
"error": null,
"opsSynced": 94,
"syncLagTime": 4901,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
},
{
"shardId": 2,
"error": null,
"opsSynced": 76,
"syncLagTime": 14899,
"syncLagOps": 0,
"syncLagOpsLeader": 0,
"syncLagOpsFollower": 0
}
],
"opsSynced": 255,
"syncLagTime": 19886,
"syncLagOps": 0
}]
}
}
]