mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Response Ops][Task Manager] Emitting metrics when metrics are reset (#184592)
## Summary This updates the task manager metrics aggregator to collect and emit metrics when a `reset$` event is observed. The `/api/task_manager/metrics` route subscribes to and saves the latest task manager metrics and immediately returns the latest metrics when the API is accessed. At a minimum, metrics are collected and emitted at every polling interval (every 3 seconds). Usually emission is more frequent than this because we emit metrics events every time a task run completes. Under normal circumstances, when the agent is configured to collect from the API once every 10 seconds, this is what happens ``` 00:00:00 metrics$.subscribe(({errors: 3}) => lastMetrics = metrics) - metrics emitted and saved 00:00:03 metrics$.subscribe(({errors: 4}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:06 metrics$.subscribe(({errors: 1}) => lastMetrics = metrics) - metrics emitted and saved 00:00:09 metrics$.subscribe(({errors: 2}) => lastMetrics = metrics) - metrics emitted and saved 00:00:10 API called with reset=true, return lastMetrics, metrics reset to 0 ``` We can see that the metrics are reset and then by the time the next collection interval comes around, fresh metrics have been emitted. We currently have an issue where the API is collected against twice in quick succession. Most of the time, this leads to duplicate metrics being collected. ``` 00:00:00:00 metrics$.subscribe(({errors: 3}) => lastMetrics = metrics) - metrics emitted and saved 00:00:03:00 metrics$.subscribe(({errors: 4}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05:00 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:05:01 API called with reset=true, return lastMetrics, metrics reset to 0 - this is a duplicate 00:00:06:00 metrics$.subscribe(({errors: 1}) => lastMetrics = metrics) - metrics emitted and saved 00:00:09:00 metrics$.subscribe(({errors: 2}) => lastMetrics = metrics) - metrics emitted and saved ``` However sometimes, this leads to a race condition that leads to different metrics being collected. ``` 00:00:00:00 metrics$.subscribe(({errors: 3}) => lastMetrics = metrics) - metrics emitted and saved 00:00:03:00 metrics$.subscribe(({errors: 4}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05:00 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:05:01 metrics$.subscribe(({errors: 1}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05:02 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:06:00 metrics$.subscribe(({errors: 1}) => lastMetrics = metrics) - metrics emitted and saved 00:00:09:00 metrics$.subscribe(({errors: 2}) => lastMetrics = metrics) - metrics emitted and saved ``` With this PR, on every reset, we'll re-emit the metrics so so even in the face of the duplicate collection, we won't be emitting duplicate metrics. After this is deployed, we should not need to exclude `kubernetes.container.name :"elastic-internal-init-config"` from the dashboards ``` 00:00:00:00 metrics$.subscribe(({errors: 3}) => lastMetrics = metrics) - metrics emitted and saved 00:00:03:00 metrics$.subscribe(({errors: 4}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05:00 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:05:00 metrics$.subscribe(({errors: 0}) => lastMetrics = metrics) - metrics emitted and saved 00:00:05:01 API called with reset=true, return lastMetrics, metrics reset to 0 00:00:05:01 metrics$.subscribe(({errors: 0}) => lastMetrics = metrics) - metrics emitted and saved 00:00:06:00 metrics$.subscribe(({errors: 1}) => lastMetrics = metrics) - metrics emitted and saved 00:00:09:00 metrics$.subscribe(({errors: 2}) => lastMetrics = metrics) - metrics emitted and saved ```
This commit is contained in:
parent
bd25609eb4
commit
557633456c
3 changed files with 223 additions and 130 deletions
|
@ -7,7 +7,7 @@
|
|||
|
||||
import sinon from 'sinon';
|
||||
import { Subject } from 'rxjs';
|
||||
import { take, bufferCount, skip } from 'rxjs';
|
||||
import { take, bufferCount } from 'rxjs';
|
||||
import { loggingSystemMock } from '@kbn/core/server/mocks';
|
||||
import {
|
||||
isTaskManagerMetricEvent,
|
||||
|
@ -109,13 +109,7 @@ describe('createAggregator', () => {
|
|||
|
||||
return new Promise<void>((resolve) => {
|
||||
taskClaimAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events.length),
|
||||
bufferCount(events.length)
|
||||
)
|
||||
.pipe(take(events.length), bufferCount(events.length))
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
key: 'task_claim',
|
||||
|
@ -268,11 +262,8 @@ describe('createAggregator', () => {
|
|||
return new Promise<void>((resolve) => {
|
||||
taskClaimAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events1.length + events2.length),
|
||||
bufferCount(events1.length + events2.length)
|
||||
take(events1.length + events2.length + 1),
|
||||
bufferCount(events1.length + events2.length + 1)
|
||||
)
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
|
@ -337,6 +328,16 @@ describe('createAggregator', () => {
|
|||
});
|
||||
// reset event should have been received here
|
||||
expect(metrics[6]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 0,
|
||||
total: 0,
|
||||
total_errors: 0,
|
||||
duration: { counts: [], values: [] },
|
||||
duration_values: [],
|
||||
},
|
||||
});
|
||||
expect(metrics[7]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -346,7 +347,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[7]).toEqual({
|
||||
expect(metrics[8]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -356,7 +357,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[8]).toEqual({
|
||||
expect(metrics[9]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -366,7 +367,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[9]).toEqual({
|
||||
expect(metrics[10]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 2,
|
||||
|
@ -376,7 +377,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[10]).toEqual({
|
||||
expect(metrics[11]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 3,
|
||||
|
@ -435,11 +436,8 @@ describe('createAggregator', () => {
|
|||
return new Promise<void>((resolve) => {
|
||||
taskClaimAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events1.length + events2.length),
|
||||
bufferCount(events1.length + events2.length)
|
||||
take(events1.length + events2.length + 1),
|
||||
bufferCount(events1.length + events2.length + 1)
|
||||
)
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
|
@ -504,6 +502,16 @@ describe('createAggregator', () => {
|
|||
});
|
||||
// reset interval should have fired here
|
||||
expect(metrics[6]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 0,
|
||||
total: 0,
|
||||
total_errors: 0,
|
||||
duration: { counts: [], values: [] },
|
||||
duration_values: [],
|
||||
},
|
||||
});
|
||||
expect(metrics[7]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -513,7 +521,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[7]).toEqual({
|
||||
expect(metrics[8]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -523,7 +531,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[8]).toEqual({
|
||||
expect(metrics[9]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -533,7 +541,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[9]).toEqual({
|
||||
expect(metrics[10]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 2,
|
||||
|
@ -543,7 +551,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[10]).toEqual({
|
||||
expect(metrics[11]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 3,
|
||||
|
@ -605,14 +613,22 @@ describe('createAggregator', () => {
|
|||
return new Promise<void>((resolve) => {
|
||||
taskClaimAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events1.length + events2.length + 1),
|
||||
bufferCount(events1.length + events2.length + 1)
|
||||
take(events1.length + events2.length + 3),
|
||||
bufferCount(events1.length + events2.length + 3)
|
||||
)
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
|
||||
// reset event
|
||||
expect(metrics[0]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 0,
|
||||
total: 0,
|
||||
total_errors: 0,
|
||||
duration: { counts: [], values: [] },
|
||||
duration_values: [],
|
||||
},
|
||||
});
|
||||
expect(metrics[1]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -622,7 +638,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10],
|
||||
},
|
||||
});
|
||||
expect(metrics[1]).toEqual({
|
||||
expect(metrics[2]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 2,
|
||||
|
@ -632,7 +648,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[2]).toEqual({
|
||||
expect(metrics[3]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 3,
|
||||
|
@ -642,7 +658,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[3]).toEqual({
|
||||
expect(metrics[4]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 4,
|
||||
|
@ -652,7 +668,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[4]).toEqual({
|
||||
expect(metrics[5]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 4,
|
||||
|
@ -662,7 +678,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[5]).toEqual({
|
||||
expect(metrics[6]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 5,
|
||||
|
@ -673,7 +689,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
});
|
||||
// reset interval fired here but stats should not clear
|
||||
expect(metrics[6]).toEqual({
|
||||
expect(metrics[7]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 6,
|
||||
|
@ -683,7 +699,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[7]).toEqual({
|
||||
expect(metrics[8]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 6,
|
||||
|
@ -693,7 +709,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[8]).toEqual({
|
||||
expect(metrics[9]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 6,
|
||||
|
@ -703,7 +719,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[9]).toEqual({
|
||||
expect(metrics[10]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 7,
|
||||
|
@ -713,7 +729,7 @@ describe('createAggregator', () => {
|
|||
duration_values: [10, 10, 10, 10, 10, 10, 10],
|
||||
},
|
||||
});
|
||||
expect(metrics[10]).toEqual({
|
||||
expect(metrics[11]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 8,
|
||||
|
@ -724,7 +740,17 @@ describe('createAggregator', () => {
|
|||
},
|
||||
});
|
||||
// reset interval fired here and stats should have cleared
|
||||
expect(metrics[11]).toEqual({
|
||||
expect(metrics[12]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 0,
|
||||
total: 0,
|
||||
total_errors: 0,
|
||||
duration: { counts: [], values: [] },
|
||||
duration_values: [],
|
||||
},
|
||||
});
|
||||
expect(metrics[13]).toEqual({
|
||||
key: 'task_claim',
|
||||
value: {
|
||||
success: 1,
|
||||
|
@ -795,13 +821,7 @@ describe('createAggregator', () => {
|
|||
|
||||
return new Promise<void>((resolve) => {
|
||||
taskRunAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(taskRunEvents.length),
|
||||
bufferCount(taskRunEvents.length)
|
||||
)
|
||||
.pipe(take(taskRunEvents.length), bufferCount(taskRunEvents.length))
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
key: 'task_run',
|
||||
|
@ -1824,11 +1844,8 @@ describe('createAggregator', () => {
|
|||
return new Promise<void>((resolve) => {
|
||||
taskRunAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(taskRunEvents1.length + taskRunEvents2.length),
|
||||
bufferCount(taskRunEvents1.length + taskRunEvents2.length)
|
||||
take(taskRunEvents1.length + taskRunEvents2.length + 1),
|
||||
bufferCount(taskRunEvents1.length + taskRunEvents2.length + 1)
|
||||
)
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
|
@ -2225,6 +2242,55 @@ describe('createAggregator', () => {
|
|||
});
|
||||
// reset event should have been received here
|
||||
expect(metrics[10]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
delay: { counts: [], values: [] },
|
||||
delay_values: [],
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
by_type: {
|
||||
alerting: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
'alerting:example': {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
report: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
telemetry: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[11]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2273,7 +2339,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[11]).toEqual({
|
||||
expect(metrics[12]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2322,7 +2388,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[12]).toEqual({
|
||||
expect(metrics[13]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2371,7 +2437,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[13]).toEqual({
|
||||
expect(metrics[14]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2420,7 +2486,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[14]).toEqual({
|
||||
expect(metrics[15]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2469,7 +2535,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[15]).toEqual({
|
||||
expect(metrics[16]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2518,7 +2584,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[16]).toEqual({
|
||||
expect(metrics[17]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2567,7 +2633,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[17]).toEqual({
|
||||
expect(metrics[18]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2616,7 +2682,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[18]).toEqual({
|
||||
expect(metrics[19]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2665,7 +2731,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[19]).toEqual({
|
||||
expect(metrics[20]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -2789,11 +2855,8 @@ describe('createAggregator', () => {
|
|||
return new Promise<void>((resolve) => {
|
||||
taskRunAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(taskRunEvents1.length + taskRunEvents2.length),
|
||||
bufferCount(taskRunEvents1.length + taskRunEvents2.length)
|
||||
take(taskRunEvents1.length + taskRunEvents2.length + 1),
|
||||
bufferCount(taskRunEvents1.length + taskRunEvents2.length + 1)
|
||||
)
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskRunMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
|
@ -3190,6 +3253,55 @@ describe('createAggregator', () => {
|
|||
});
|
||||
// reset event should have been received here
|
||||
expect(metrics[10]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
delay: { counts: [], values: [] },
|
||||
delay_values: [],
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
by_type: {
|
||||
alerting: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
'alerting:example': {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
report: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
telemetry: {
|
||||
success: 0,
|
||||
not_timed_out: 0,
|
||||
total: 0,
|
||||
framework_errors: 0,
|
||||
user_errors: 0,
|
||||
total_errors: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[11]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3238,7 +3350,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[11]).toEqual({
|
||||
expect(metrics[12]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3287,7 +3399,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[12]).toEqual({
|
||||
expect(metrics[13]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3336,7 +3448,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[13]).toEqual({
|
||||
expect(metrics[14]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3385,7 +3497,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[14]).toEqual({
|
||||
expect(metrics[15]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3434,7 +3546,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[15]).toEqual({
|
||||
expect(metrics[16]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3483,7 +3595,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[16]).toEqual({
|
||||
expect(metrics[17]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3532,7 +3644,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[17]).toEqual({
|
||||
expect(metrics[18]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3581,7 +3693,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[18]).toEqual({
|
||||
expect(metrics[19]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3630,7 +3742,7 @@ describe('createAggregator', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
expect(metrics[19]).toEqual({
|
||||
expect(metrics[20]).toEqual({
|
||||
key: 'task_run',
|
||||
value: {
|
||||
overall: {
|
||||
|
@ -3771,13 +3883,7 @@ describe('createAggregator', () => {
|
|||
|
||||
return new Promise<void>((resolve) => {
|
||||
taskOverdueAggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events.length),
|
||||
bufferCount(events.length)
|
||||
)
|
||||
.pipe(take(events.length), bufferCount(events.length))
|
||||
.subscribe((metrics: Array<AggregatedStat<TaskOverdueMetric>>) => {
|
||||
expect(metrics[0]).toEqual({
|
||||
key: 'task_overdue',
|
||||
|
@ -3933,17 +4039,9 @@ describe('createAggregator', () => {
|
|||
});
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
aggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events.length),
|
||||
bufferCount(events.length)
|
||||
)
|
||||
.subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
aggregator.pipe(take(events.length), bufferCount(events.length)).subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
|
||||
for (const event of events) {
|
||||
events$.next(event);
|
||||
|
@ -3984,17 +4082,9 @@ describe('createAggregator', () => {
|
|||
});
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
aggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events.length),
|
||||
bufferCount(events.length)
|
||||
)
|
||||
.subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
aggregator.pipe(take(events.length), bufferCount(events.length)).subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
|
||||
for (const event of events) {
|
||||
events$.next(event);
|
||||
|
@ -4040,17 +4130,9 @@ describe('createAggregator', () => {
|
|||
});
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
aggregator
|
||||
.pipe(
|
||||
// skip initial metric which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
take(events.length),
|
||||
bufferCount(events.length)
|
||||
)
|
||||
.subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
aggregator.pipe(take(events.length + 1), bufferCount(events.length + 1)).subscribe(() => {
|
||||
resolve();
|
||||
});
|
||||
|
||||
for (const event of events) {
|
||||
events$.next(event);
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs';
|
||||
import { filter, interval, map, merge, Observable } from 'rxjs';
|
||||
import { JsonValue } from '@kbn/utility-types';
|
||||
import { Logger } from '@kbn/core/server';
|
||||
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
|
||||
|
@ -32,11 +32,12 @@ export function createAggregator<T extends JsonValue>({
|
|||
eventFilter,
|
||||
metricsAggregator,
|
||||
}: CreateMetricsAggregatorOpts<T>): AggregatedStatProvider<T> {
|
||||
let taskResetEvent$: Observable<T> | undefined;
|
||||
if (reset$) {
|
||||
let lastResetTime: Date = new Date();
|
||||
// Resets the aggregators either when the reset interval has passed or
|
||||
// a reset$ event is received
|
||||
merge(
|
||||
taskResetEvent$ = merge(
|
||||
interval(config.metrics_reset_interval).pipe(
|
||||
map(() => {
|
||||
if (intervalHasPassedSince(lastResetTime, config.metrics_reset_interval)) {
|
||||
|
@ -62,11 +63,13 @@ export function createAggregator<T extends JsonValue>({
|
|||
return true;
|
||||
})
|
||||
)
|
||||
).subscribe((shouldReset: boolean) => {
|
||||
if (shouldReset) {
|
||||
).pipe(
|
||||
filter((shouldReset: boolean) => shouldReset),
|
||||
map(() => {
|
||||
metricsAggregator.reset();
|
||||
}
|
||||
});
|
||||
return metricsAggregator.collect();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
const taskEvents$: Observable<T> = events$.pipe(
|
||||
|
@ -77,8 +80,13 @@ export function createAggregator<T extends JsonValue>({
|
|||
})
|
||||
);
|
||||
|
||||
return combineLatest([taskEvents$.pipe(startWith(metricsAggregator.initialMetric()))]).pipe(
|
||||
map(([value]: [T]) => {
|
||||
const observablesToMerge: Array<Observable<T>> = [taskEvents$];
|
||||
if (taskResetEvent$) {
|
||||
observablesToMerge.push(taskResetEvent$);
|
||||
}
|
||||
|
||||
return merge(...observablesToMerge).pipe(
|
||||
map((value: T) => {
|
||||
return {
|
||||
key,
|
||||
value,
|
||||
|
|
|
@ -133,8 +133,8 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
expect(metrics?.task_claim).not.to.be(null);
|
||||
expect(metrics?.task_claim?.value).not.to.be(null);
|
||||
|
||||
expect(metrics?.task_claim?.value.success).to.equal(1);
|
||||
expect(metrics?.task_claim?.value.total).to.equal(1);
|
||||
expect(metrics?.task_claim?.value.success).to.equal(0);
|
||||
expect(metrics?.task_claim?.value.total).to.equal(0);
|
||||
|
||||
previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
|
||||
|
||||
|
@ -264,7 +264,10 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
.expect(200);
|
||||
|
||||
const metrics = (
|
||||
await getMetrics(true, (m) => m?.metrics?.task_run?.value.overall.framework_errors! === 1)
|
||||
await getMetrics(
|
||||
false,
|
||||
(m) => m?.metrics?.task_run?.value.overall.framework_errors! === 1
|
||||
)
|
||||
).metrics;
|
||||
|
||||
const total = metrics?.task_run?.value.overall.total || 0;
|
||||
|
@ -302,13 +305,13 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
.expect(200);
|
||||
|
||||
const metrics = (
|
||||
await getMetrics(true, (m) => m?.metrics?.task_run?.value.overall.user_errors! === 1)
|
||||
await getMetrics(false, (m) => m?.metrics?.task_run?.value.overall.user_errors! === 1)
|
||||
).metrics;
|
||||
|
||||
const total = metrics?.task_run?.value.overall.total || 0;
|
||||
const success = metrics?.task_run?.value.overall.success || 0;
|
||||
|
||||
expect(total - success).to.be(1);
|
||||
expect(total - success).to.be(2);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue