Revert "[Response Ops][Task Manager] Emitting metrics when metrics are reset (#184812)

This reverts commit 557633456c.
This commit is contained in:
Ying Mao 2024-06-05 08:19:13 -04:00 committed by GitHub
parent 2570872504
commit 8eb3ef4146
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 130 additions and 223 deletions

View file

@ -7,7 +7,7 @@
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { take, bufferCount } from 'rxjs';
import { take, bufferCount, skip } from 'rxjs';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import {
isTaskManagerMetricEvent,
@ -109,7 +109,13 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskClaimAggregator
.pipe(take(events.length), bufferCount(events.length))
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
key: 'task_claim',
@ -262,8 +268,11 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskClaimAggregator
.pipe(
take(events1.length + events2.length + 1),
bufferCount(events1.length + events2.length + 1)
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events1.length + events2.length),
bufferCount(events1.length + events2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
@ -328,16 +337,6 @@ describe('createAggregator', () => {
});
// reset event should have been received here
expect(metrics[6]).toEqual({
key: 'task_claim',
value: {
success: 0,
total: 0,
total_errors: 0,
duration: { counts: [], values: [] },
duration_values: [],
},
});
expect(metrics[7]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -347,7 +346,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[8]).toEqual({
expect(metrics[7]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -357,7 +356,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[9]).toEqual({
expect(metrics[8]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -367,7 +366,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[10]).toEqual({
expect(metrics[9]).toEqual({
key: 'task_claim',
value: {
success: 2,
@ -377,7 +376,7 @@ describe('createAggregator', () => {
duration_values: [10, 10],
},
});
expect(metrics[11]).toEqual({
expect(metrics[10]).toEqual({
key: 'task_claim',
value: {
success: 3,
@ -436,8 +435,11 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskClaimAggregator
.pipe(
take(events1.length + events2.length + 1),
bufferCount(events1.length + events2.length + 1)
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events1.length + events2.length),
bufferCount(events1.length + events2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
@ -502,16 +504,6 @@ describe('createAggregator', () => {
});
// reset interval should have fired here
expect(metrics[6]).toEqual({
key: 'task_claim',
value: {
success: 0,
total: 0,
total_errors: 0,
duration: { counts: [], values: [] },
duration_values: [],
},
});
expect(metrics[7]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -521,7 +513,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[8]).toEqual({
expect(metrics[7]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -531,7 +523,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[9]).toEqual({
expect(metrics[8]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -541,7 +533,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[10]).toEqual({
expect(metrics[9]).toEqual({
key: 'task_claim',
value: {
success: 2,
@ -551,7 +543,7 @@ describe('createAggregator', () => {
duration_values: [10, 10],
},
});
expect(metrics[11]).toEqual({
expect(metrics[10]).toEqual({
key: 'task_claim',
value: {
success: 3,
@ -613,22 +605,14 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskClaimAggregator
.pipe(
take(events1.length + events2.length + 3),
bufferCount(events1.length + events2.length + 3)
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events1.length + events2.length + 1),
bufferCount(events1.length + events2.length + 1)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
// reset event
expect(metrics[0]).toEqual({
key: 'task_claim',
value: {
success: 0,
total: 0,
total_errors: 0,
duration: { counts: [], values: [] },
duration_values: [],
},
});
expect(metrics[1]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -638,7 +622,7 @@ describe('createAggregator', () => {
duration_values: [10],
},
});
expect(metrics[2]).toEqual({
expect(metrics[1]).toEqual({
key: 'task_claim',
value: {
success: 2,
@ -648,7 +632,7 @@ describe('createAggregator', () => {
duration_values: [10, 10],
},
});
expect(metrics[3]).toEqual({
expect(metrics[2]).toEqual({
key: 'task_claim',
value: {
success: 3,
@ -658,7 +642,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10],
},
});
expect(metrics[4]).toEqual({
expect(metrics[3]).toEqual({
key: 'task_claim',
value: {
success: 4,
@ -668,7 +652,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10],
},
});
expect(metrics[5]).toEqual({
expect(metrics[4]).toEqual({
key: 'task_claim',
value: {
success: 4,
@ -678,7 +662,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10],
},
});
expect(metrics[6]).toEqual({
expect(metrics[5]).toEqual({
key: 'task_claim',
value: {
success: 5,
@ -689,7 +673,7 @@ describe('createAggregator', () => {
},
});
// reset interval fired here but stats should not clear
expect(metrics[7]).toEqual({
expect(metrics[6]).toEqual({
key: 'task_claim',
value: {
success: 6,
@ -699,7 +683,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10, 10, 10],
},
});
expect(metrics[8]).toEqual({
expect(metrics[7]).toEqual({
key: 'task_claim',
value: {
success: 6,
@ -709,7 +693,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10, 10, 10],
},
});
expect(metrics[9]).toEqual({
expect(metrics[8]).toEqual({
key: 'task_claim',
value: {
success: 6,
@ -719,7 +703,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10, 10, 10],
},
});
expect(metrics[10]).toEqual({
expect(metrics[9]).toEqual({
key: 'task_claim',
value: {
success: 7,
@ -729,7 +713,7 @@ describe('createAggregator', () => {
duration_values: [10, 10, 10, 10, 10, 10, 10],
},
});
expect(metrics[11]).toEqual({
expect(metrics[10]).toEqual({
key: 'task_claim',
value: {
success: 8,
@ -740,17 +724,7 @@ describe('createAggregator', () => {
},
});
// reset interval fired here and stats should have cleared
expect(metrics[12]).toEqual({
key: 'task_claim',
value: {
success: 0,
total: 0,
total_errors: 0,
duration: { counts: [], values: [] },
duration_values: [],
},
});
expect(metrics[13]).toEqual({
expect(metrics[11]).toEqual({
key: 'task_claim',
value: {
success: 1,
@ -821,7 +795,13 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskRunAggregator
.pipe(take(taskRunEvents.length), bufferCount(taskRunEvents.length))
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(taskRunEvents.length),
bufferCount(taskRunEvents.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
expect(metrics[0]).toEqual({
key: 'task_run',
@ -1844,8 +1824,11 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskRunAggregator
.pipe(
take(taskRunEvents1.length + taskRunEvents2.length + 1),
bufferCount(taskRunEvents1.length + taskRunEvents2.length + 1)
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(taskRunEvents1.length + taskRunEvents2.length),
bufferCount(taskRunEvents1.length + taskRunEvents2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
expect(metrics[0]).toEqual({
@ -2242,55 +2225,6 @@ describe('createAggregator', () => {
});
// reset event should have been received here
expect(metrics[10]).toEqual({
key: 'task_run',
value: {
overall: {
success: 0,
not_timed_out: 0,
total: 0,
delay: { counts: [], values: [] },
delay_values: [],
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
by_type: {
alerting: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
'alerting:example': {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
report: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
telemetry: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
},
},
});
expect(metrics[11]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2339,7 +2273,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[12]).toEqual({
expect(metrics[11]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2388,7 +2322,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[13]).toEqual({
expect(metrics[12]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2437,7 +2371,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[14]).toEqual({
expect(metrics[13]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2486,7 +2420,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[15]).toEqual({
expect(metrics[14]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2535,7 +2469,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[16]).toEqual({
expect(metrics[15]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2584,7 +2518,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[17]).toEqual({
expect(metrics[16]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2633,7 +2567,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[18]).toEqual({
expect(metrics[17]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2682,7 +2616,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[19]).toEqual({
expect(metrics[18]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2731,7 +2665,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[20]).toEqual({
expect(metrics[19]).toEqual({
key: 'task_run',
value: {
overall: {
@ -2855,8 +2789,11 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskRunAggregator
.pipe(
take(taskRunEvents1.length + taskRunEvents2.length + 1),
bufferCount(taskRunEvents1.length + taskRunEvents2.length + 1)
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(taskRunEvents1.length + taskRunEvents2.length),
bufferCount(taskRunEvents1.length + taskRunEvents2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
expect(metrics[0]).toEqual({
@ -3253,55 +3190,6 @@ describe('createAggregator', () => {
});
// reset event should have been received here
expect(metrics[10]).toEqual({
key: 'task_run',
value: {
overall: {
success: 0,
not_timed_out: 0,
total: 0,
delay: { counts: [], values: [] },
delay_values: [],
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
by_type: {
alerting: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
'alerting:example': {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
report: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
telemetry: {
success: 0,
not_timed_out: 0,
total: 0,
framework_errors: 0,
user_errors: 0,
total_errors: 0,
},
},
},
});
expect(metrics[11]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3350,7 +3238,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[12]).toEqual({
expect(metrics[11]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3399,7 +3287,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[13]).toEqual({
expect(metrics[12]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3448,7 +3336,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[14]).toEqual({
expect(metrics[13]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3497,7 +3385,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[15]).toEqual({
expect(metrics[14]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3546,7 +3434,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[16]).toEqual({
expect(metrics[15]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3595,7 +3483,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[17]).toEqual({
expect(metrics[16]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3644,7 +3532,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[18]).toEqual({
expect(metrics[17]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3693,7 +3581,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[19]).toEqual({
expect(metrics[18]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3742,7 +3630,7 @@ describe('createAggregator', () => {
},
},
});
expect(metrics[20]).toEqual({
expect(metrics[19]).toEqual({
key: 'task_run',
value: {
overall: {
@ -3883,7 +3771,13 @@ describe('createAggregator', () => {
return new Promise<void>((resolve) => {
taskOverdueAggregator
.pipe(take(events.length), bufferCount(events.length))
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskOverdueMetric>>) => {
expect(metrics[0]).toEqual({
key: 'task_overdue',
@ -4039,9 +3933,17 @@ describe('createAggregator', () => {
});
return new Promise<void>((resolve) => {
aggregator.pipe(take(events.length), bufferCount(events.length)).subscribe(() => {
resolve();
});
aggregator
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of events) {
events$.next(event);
@ -4082,9 +3984,17 @@ describe('createAggregator', () => {
});
return new Promise<void>((resolve) => {
aggregator.pipe(take(events.length), bufferCount(events.length)).subscribe(() => {
resolve();
});
aggregator
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of events) {
events$.next(event);
@ -4130,9 +4040,17 @@ describe('createAggregator', () => {
});
return new Promise<void>((resolve) => {
aggregator.pipe(take(events.length + 1), bufferCount(events.length + 1)).subscribe(() => {
resolve();
});
aggregator
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of events) {
events$.next(event);

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { filter, interval, map, merge, Observable } from 'rxjs';
import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs';
import { JsonValue } from '@kbn/utility-types';
import { Logger } from '@kbn/core/server';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
@ -32,12 +32,11 @@ export function createAggregator<T extends JsonValue>({
eventFilter,
metricsAggregator,
}: CreateMetricsAggregatorOpts<T>): AggregatedStatProvider<T> {
let taskResetEvent$: Observable<T> | undefined;
if (reset$) {
let lastResetTime: Date = new Date();
// Resets the aggregators either when the reset interval has passed or
// a reset$ event is received
taskResetEvent$ = merge(
merge(
interval(config.metrics_reset_interval).pipe(
map(() => {
if (intervalHasPassedSince(lastResetTime, config.metrics_reset_interval)) {
@ -63,13 +62,11 @@ export function createAggregator<T extends JsonValue>({
return true;
})
)
).pipe(
filter((shouldReset: boolean) => shouldReset),
map(() => {
).subscribe((shouldReset: boolean) => {
if (shouldReset) {
metricsAggregator.reset();
return metricsAggregator.collect();
})
);
}
});
}
const taskEvents$: Observable<T> = events$.pipe(
@ -80,13 +77,8 @@ export function createAggregator<T extends JsonValue>({
})
);
const observablesToMerge: Array<Observable<T>> = [taskEvents$];
if (taskResetEvent$) {
observablesToMerge.push(taskResetEvent$);
}
return merge(...observablesToMerge).pipe(
map((value: T) => {
return combineLatest([taskEvents$.pipe(startWith(metricsAggregator.initialMetric()))]).pipe(
map(([value]: [T]) => {
return {
key,
value,

View file

@ -133,8 +133,8 @@ export default function ({ getService }: FtrProviderContext) {
expect(metrics?.task_claim).not.to.be(null);
expect(metrics?.task_claim?.value).not.to.be(null);
expect(metrics?.task_claim?.value.success).to.equal(0);
expect(metrics?.task_claim?.value.total).to.equal(0);
expect(metrics?.task_claim?.value.success).to.equal(1);
expect(metrics?.task_claim?.value.total).to.equal(1);
previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
@ -264,10 +264,7 @@ export default function ({ getService }: FtrProviderContext) {
.expect(200);
const metrics = (
await getMetrics(
false,
(m) => m?.metrics?.task_run?.value.overall.framework_errors! === 1
)
await getMetrics(true, (m) => m?.metrics?.task_run?.value.overall.framework_errors! === 1)
).metrics;
const total = metrics?.task_run?.value.overall.total || 0;
@ -305,13 +302,13 @@ export default function ({ getService }: FtrProviderContext) {
.expect(200);
const metrics = (
await getMetrics(false, (m) => m?.metrics?.task_run?.value.overall.user_errors! === 1)
await getMetrics(true, (m) => m?.metrics?.task_run?.value.overall.user_errors! === 1)
).metrics;
const total = metrics?.task_run?.value.overall.total || 0;
const success = metrics?.task_run?.value.overall.success || 0;
expect(total - success).to.be(2);
expect(total - success).to.be(1);
});
});