[Response Ops][Alerting] SLIs Phase 2 - Histogram for task overdue by (#166643)

Towards https://github.com/elastic/response-ops-team/issues/130

## Summary

This implements the third of the three SLIs described in
https://github.com/elastic/response-ops-team/issues/130 - a histogram
tracking how many seconds tasks are overdue, bucketed in 10-second
intervals up to 30 minutes. This implementation differs slightly from
what was originally described for this SLI, which was a count of
overdue tasks. We changed it to a histogram of seconds overdue because
that provides a better metric against which to set an SLO.
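
As a rough sketch of how the bucketing works (based on the `SimpleHistogram` changes in this diff; the import path and the exact serialized output below are assumptions for illustration):

```ts
// Overdue seconds are recorded into a fixed-bucket histogram. The bucket size and
// max mirror the constants added in this PR: 10-second buckets, capped at 30 minutes.
import { SimpleHistogram } from './lib'; // assumed path, for illustration only

const HDR_HISTOGRAM_MAX = 1800; // 30 minutes, in seconds
const HDR_HISTOGRAM_BUCKET_SIZE = 10; // 10-second buckets

const overdueBy = new SimpleHistogram(HDR_HISTOGRAM_MAX, HDR_HISTOGRAM_BUCKET_SIZE);

// Each Elasticsearch histogram bucket (key = seconds overdue, doc_count = number of
// overdue tasks) is recorded using doc_count as the increment:
overdueBy.record(0, 3); // 3 tasks overdue by 0-10 seconds
overdueBy.record(40, 1); // 1 task overdue by 40-50 seconds

// Serializing should then produce something like:
// { counts: [3, 0, 0, 0, 1], values: [10, 20, 30, 40, 50] }
```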

## To Verify

Run Kibana and create some alerting rules. Navigate to
https://localhost:5601/api/task_manager/metrics?reset=false and you
should see the new metric under `task_overdue.value`. There should be
an overall task overdue histogram and a histogram for each task type,
with additional groupings for the alerting and action task types.
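
For reference, the `task_overdue` section of the response should look roughly like the following (counts and bucket values are illustrative and depend on how overdue your tasks actually are; other fields of the metrics payload are omitted):

```json
{
  "task_overdue": {
    "value": {
      "overall": {
        "overdue_by": { "counts": [3, 0, 1], "values": [10, 20, 30] }
      },
      "by_type": {
        "alerting": { "overdue_by": { "counts": [1, 0, 1], "values": [10, 20, 30] } },
        "alerting:example": { "overdue_by": { "counts": [1, 0, 1], "values": [10, 20, 30] } },
        "actions": { "overdue_by": { "counts": [2], "values": [10] } },
        "actions:webhook": { "overdue_by": { "counts": [2], "values": [10] } }
      }
    }
  }
}
```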

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Ying Mao 2023-09-20 09:42:45 -04:00 committed by GitHub
parent 27056ca66c
commit d3a289a607
26 changed files with 1448 additions and 186 deletions

View file

@ -6,12 +6,16 @@
*/
import sinon from 'sinon';
import { Subject, Observable } from 'rxjs';
import { Subject } from 'rxjs';
import { take, bufferCount, skip } from 'rxjs/operators';
import { isTaskManagerStatEvent, isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events';
import {
isTaskManagerMetricEvent,
isTaskManagerStatEvent,
isTaskPollingCycleEvent,
isTaskRunEvent,
} from '../task_events';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { taskPollingLifecycleMock } from '../polling_lifecycle.mock';
import { TaskManagerConfig } from '../config';
import { createAggregator } from './create_aggregator';
import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator';
@ -24,6 +28,8 @@ import {
import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator';
import * as TaskClaimMetricsAggregatorModule from './task_claim_metrics_aggregator';
import { metricsAggregatorMock } from './metrics_aggregator.mock';
import { getTaskManagerMetricEvent } from './task_overdue_metrics_aggregator.test';
import { TaskOverdueMetric, TaskOverdueMetricsAggregator } from './task_overdue_metrics_aggregator';
const mockMetricsAggregator = metricsAggregatorMock.create();
const config: TaskManagerConfig = {
@ -76,7 +82,7 @@ describe('createAggregator', () => {
describe('with TaskClaimMetricsAggregator', () => {
test('returns a cumulative count of successful polling cycles and total polling cycles', async () => {
const pollingCycleEvents = [
const events = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -90,16 +96,13 @@ describe('createAggregator', () => {
taskClaimSuccessEvent,
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskClaimAggregator = createAggregator({
key: 'task_claim',
taskPollingLifecycle,
events$,
config,
resetMetrics$: new Subject<boolean>(),
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent),
reset$: new Subject<boolean>(),
eventFilter: (event: TaskLifecycleEvent) => isTaskPollingCycleEvent(event),
metricsAggregator: new TaskClaimMetricsAggregator(),
});
@ -109,8 +112,8 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents.length),
bufferCount(pollingCycleEvents.length)
take(events.length),
bufferCount(events.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
@ -160,15 +163,15 @@ describe('createAggregator', () => {
resolve();
});
for (const event of pollingCycleEvents) {
for (const event of events) {
events$.next(event);
}
});
});
test('resets count when resetMetric$ event is received', async () => {
const resetMetrics$ = new Subject<boolean>();
const pollingCycleEvents1 = [
test('resets count when reset$ event is received', async () => {
const reset$ = new Subject<boolean>();
const events1 = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -177,7 +180,7 @@ describe('createAggregator', () => {
taskClaimSuccessEvent,
];
const pollingCycleEvents2 = [
const events2 = [
taskClaimSuccessEvent,
taskClaimFailureEvent,
taskClaimFailureEvent,
@ -185,16 +188,13 @@ describe('createAggregator', () => {
taskClaimSuccessEvent,
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskClaimAggregator = createAggregator({
key: 'task_claim',
taskPollingLifecycle,
events$,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent),
reset$,
eventFilter: (event: TaskLifecycleEvent) => isTaskPollingCycleEvent(event),
metricsAggregator: new TaskClaimMetricsAggregator(),
});
@ -204,8 +204,8 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents1.length + pollingCycleEvents2.length),
bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length)
take(events1.length + events2.length),
bufferCount(events1.length + events2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
@ -256,11 +256,11 @@ describe('createAggregator', () => {
resolve();
});
for (const event of pollingCycleEvents1) {
for (const event of events1) {
events$.next(event);
}
resetMetrics$.next(true);
for (const event of pollingCycleEvents2) {
reset$.next(true);
for (const event of events2) {
events$.next(event);
}
});
@ -269,7 +269,7 @@ describe('createAggregator', () => {
test('resets count when configured metrics reset interval expires', async () => {
const clock = sinon.useFakeTimers();
clock.tick(0);
const pollingCycleEvents1 = [
const events1 = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -278,7 +278,7 @@ describe('createAggregator', () => {
taskClaimSuccessEvent,
];
const pollingCycleEvents2 = [
const events2 = [
taskClaimSuccessEvent,
taskClaimFailureEvent,
taskClaimFailureEvent,
@ -286,19 +286,16 @@ describe('createAggregator', () => {
taskClaimSuccessEvent,
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskClaimAggregator = createAggregator({
key: 'task_claim',
taskPollingLifecycle,
events$,
config: {
...config,
metrics_reset_interval: 10,
},
resetMetrics$: new Subject<boolean>(),
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent),
reset$: new Subject<boolean>(),
eventFilter: (event: TaskLifecycleEvent) => isTaskPollingCycleEvent(event),
metricsAggregator: new TaskClaimMetricsAggregator(),
});
@ -308,8 +305,8 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents1.length + pollingCycleEvents2.length),
bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length)
take(events1.length + events2.length),
bufferCount(events1.length + events2.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
expect(metrics[0]).toEqual({
@ -360,11 +357,11 @@ describe('createAggregator', () => {
resolve();
});
for (const event of pollingCycleEvents1) {
for (const event of events1) {
events$.next(event);
}
clock.tick(20);
for (const event of pollingCycleEvents2) {
for (const event of events2) {
events$.next(event);
}
@ -398,17 +395,14 @@ describe('createAggregator', () => {
getTaskRunFailedEvent('actions:webhook'),
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskRunAggregator = createAggregator({
key: 'task_run',
taskPollingLifecycle,
events$,
config,
resetMetrics$: new Subject<boolean>(),
taskEventFilter: (taskEvent: TaskLifecycleEvent) =>
isTaskRunEvent(taskEvent) || isTaskManagerStatEvent(taskEvent),
reset$: new Subject<boolean>(),
eventFilter: (event: TaskLifecycleEvent) =>
isTaskRunEvent(event) || isTaskManagerStatEvent(event),
metricsAggregator: new TaskRunMetricsAggregator(),
});
@ -769,7 +763,7 @@ describe('createAggregator', () => {
});
test('resets count when resetMetric$ event is received', async () => {
const resetMetrics$ = new Subject<boolean>();
const reset$ = new Subject<boolean>();
const taskRunEvents1 = [
getTaskManagerStatEvent(3.234),
getTaskRunSuccessEvent('alerting:example'),
@ -796,17 +790,14 @@ describe('createAggregator', () => {
getTaskRunFailedEvent('actions:webhook'),
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskRunAggregator = createAggregator({
key: 'task_run',
taskPollingLifecycle,
events$,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) =>
isTaskRunEvent(taskEvent) || isTaskManagerStatEvent(taskEvent),
reset$,
eventFilter: (event: TaskLifecycleEvent) =>
isTaskRunEvent(event) || isTaskManagerStatEvent(event),
metricsAggregator: new TaskRunMetricsAggregator(),
});
@ -1155,7 +1146,7 @@ describe('createAggregator', () => {
for (const event of taskRunEvents1) {
events$.next(event);
}
resetMetrics$.next(true);
reset$.next(true);
for (const event of taskRunEvents2) {
events$.next(event);
}
@ -1191,20 +1182,17 @@ describe('createAggregator', () => {
getTaskRunFailedEvent('actions:webhook'),
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const taskRunAggregator = createAggregator({
key: 'task_run',
taskPollingLifecycle,
events$,
config: {
...config,
metrics_reset_interval: 10,
},
resetMetrics$: new Subject<boolean>(),
taskEventFilter: (taskEvent: TaskLifecycleEvent) =>
isTaskRunEvent(taskEvent) || isTaskManagerStatEvent(taskEvent),
reset$: new Subject<boolean>(),
eventFilter: (event: TaskLifecycleEvent) =>
isTaskRunEvent(event) || isTaskManagerStatEvent(event),
metricsAggregator: new TaskRunMetricsAggregator(),
});
@ -1563,8 +1551,185 @@ describe('createAggregator', () => {
});
});
test('should filter task lifecycle events using specified taskEventFilter', () => {
const pollingCycleEvents = [
describe('with TaskOverdueMetricsAggregator', () => {
test('returns latest values for task overdue by time', async () => {
const events = [
getTaskManagerMetricEvent({
numOverdueTasks: {
'alerting:example': [{ key: 40, doc_count: 1 }],
'alerting:.index-threshold': [
{ key: 20, doc_count: 2 },
{ key: 120, doc_count: 1 },
],
'actions:webhook': [{ key: 0, doc_count: 2 }],
'actions:.email': [{ key: 0, doc_count: 1 }],
total: [
{ key: 0, doc_count: 3 },
{ key: 20, doc_count: 2 },
{ key: 40, doc_count: 1 },
{ key: 120, doc_count: 1 },
],
},
}),
getTaskManagerMetricEvent({
numOverdueTasks: {
total: [],
},
}),
getTaskManagerMetricEvent({
numOverdueTasks: {
telemetry: [
{ key: 0, doc_count: 1 },
{ key: 20, doc_count: 1 },
],
reporting: [{ key: 0, doc_count: 1 }],
'actions:webhook': [
{ key: 0, doc_count: 3 },
{ key: 30, doc_count: 2 },
{ key: 50, doc_count: 1 },
],
'actions:.email': [{ key: 0, doc_count: 11 }],
total: [
{ key: 0, doc_count: 16 },
{ key: 20, doc_count: 1 },
{ key: 30, doc_count: 2 },
{ key: 50, doc_count: 1 },
],
},
}),
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskOverdueAggregator = createAggregator({
key: 'task_overdue',
events$,
config,
reset$: new Subject<boolean>(),
eventFilter: (event: TaskLifecycleEvent) => isTaskManagerMetricEvent(event),
metricsAggregator: new TaskOverdueMetricsAggregator(),
});
return new Promise<void>((resolve) => {
taskOverdueAggregator
.pipe(
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(events.length),
bufferCount(events.length)
)
.subscribe((metrics: Array<AggregatedStat<TaskOverdueMetric>>) => {
expect(metrics[0]).toEqual({
key: 'task_overdue',
value: {
overall: {
overdue_by: {
counts: [3, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
by_type: {
'alerting:example': {
overdue_by: {
counts: [0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50],
},
},
'alerting:__index-threshold': {
overdue_by: {
counts: [0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
alerting: {
overdue_by: {
counts: [0, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
'actions:webhook': {
overdue_by: {
counts: [2],
values: [10],
},
},
'actions:__email': {
overdue_by: {
counts: [1],
values: [10],
},
},
actions: {
overdue_by: {
counts: [3],
values: [10],
},
},
},
},
});
expect(metrics[1]).toEqual({
key: 'task_overdue',
value: {
overall: { overdue_by: { counts: [], values: [] } },
by_type: {},
},
});
expect(metrics[2]).toEqual({
key: 'task_overdue',
value: {
overall: {
overdue_by: {
counts: [16, 0, 1, 2, 0, 1],
values: [10, 20, 30, 40, 50, 60],
},
},
by_type: {
telemetry: {
overdue_by: {
counts: [1, 0, 1],
values: [10, 20, 30],
},
},
reporting: {
overdue_by: {
counts: [1],
values: [10],
},
},
'actions:webhook': {
overdue_by: {
counts: [3, 0, 0, 2, 0, 1],
values: [10, 20, 30, 40, 50, 60],
},
},
'actions:__email': {
overdue_by: {
counts: [11],
values: [10],
},
},
actions: {
overdue_by: {
counts: [14, 0, 0, 2, 0, 1],
values: [10, 20, 30, 40, 50, 60],
},
},
},
},
});
resolve();
});
for (const event of events) {
events$.next(event);
}
});
});
});
test('should filter task lifecycle events using specified eventFilter', () => {
const events = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -1577,17 +1742,15 @@ describe('createAggregator', () => {
taskClaimFailureEvent,
taskClaimSuccessEvent,
];
const taskEventFilter = jest.fn().mockReturnValue(true);
const eventFilter = jest.fn().mockReturnValue(true);
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const aggregator = createAggregator({
key: 'test',
taskPollingLifecycle,
events$,
config,
resetMetrics$: new Subject<boolean>(),
taskEventFilter,
reset$: new Subject<boolean>(),
eventFilter,
metricsAggregator: new TaskClaimMetricsAggregator(),
});
@ -1597,27 +1760,27 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents.length),
bufferCount(pollingCycleEvents.length)
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of pollingCycleEvents) {
for (const event of events) {
events$.next(event);
}
expect(taskEventFilter).toHaveBeenCalledTimes(pollingCycleEvents.length);
expect(eventFilter).toHaveBeenCalledTimes(events.length);
});
});
test('should call metricAggregator to process task lifecycle events', () => {
test('should call metricAggregator to process events', () => {
const spy = jest
.spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator')
.mockImplementation(() => mockMetricsAggregator);
const pollingCycleEvents = [
const events = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -1630,18 +1793,16 @@ describe('createAggregator', () => {
taskClaimFailureEvent,
taskClaimSuccessEvent,
];
const taskEventFilter = jest.fn().mockReturnValue(true);
const eventFilter = jest.fn().mockReturnValue(true);
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const aggregator = createAggregator({
key: 'test',
taskPollingLifecycle,
events$,
config,
resetMetrics$: new Subject<boolean>(),
taskEventFilter,
metricsAggregator: mockMetricsAggregator,
reset$: new Subject<boolean>(),
eventFilter,
metricsAggregator: new TaskClaimMetricsAggregator(),
});
return new Promise<void>((resolve) => {
@ -1650,22 +1811,20 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents.length),
bufferCount(pollingCycleEvents.length)
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of pollingCycleEvents) {
for (const event of events) {
events$.next(event);
}
expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1);
expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes(
pollingCycleEvents.length
);
expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length);
expect(mockMetricsAggregator.processEvent).toHaveBeenCalledTimes(events.length);
expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(events.length);
expect(mockMetricsAggregator.reset).not.toHaveBeenCalled();
spy.mockRestore();
});
@ -1676,8 +1835,8 @@ describe('createAggregator', () => {
.spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator')
.mockImplementation(() => mockMetricsAggregator);
const resetMetrics$ = new Subject<boolean>();
const pollingCycleEvents = [
const reset$ = new Subject<boolean>();
const events = [
taskClaimSuccessEvent,
taskClaimSuccessEvent,
taskClaimSuccessEvent,
@ -1690,17 +1849,15 @@ describe('createAggregator', () => {
taskClaimFailureEvent,
taskClaimSuccessEvent,
];
const taskEventFilter = jest.fn().mockReturnValue(true);
const eventFilter = jest.fn().mockReturnValue(true);
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});
const aggregator = createAggregator({
key: 'test',
taskPollingLifecycle,
events$,
config,
resetMetrics$,
taskEventFilter,
reset$,
eventFilter,
metricsAggregator: mockMetricsAggregator,
});
@ -1710,30 +1867,28 @@ describe('createAggregator', () => {
// skip initial metric which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
take(pollingCycleEvents.length),
bufferCount(pollingCycleEvents.length)
take(events.length),
bufferCount(events.length)
)
.subscribe(() => {
resolve();
});
for (const event of pollingCycleEvents) {
for (const event of events) {
events$.next(event);
}
for (let i = 0; i < 5; i++) {
events$.next(pollingCycleEvents[i]);
events$.next(events[i]);
}
resetMetrics$.next(true);
for (let i = 0; i < pollingCycleEvents.length; i++) {
events$.next(pollingCycleEvents[i]);
reset$.next(true);
for (let i = 0; i < events.length; i++) {
events$.next(events[i]);
}
expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1);
expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes(
pollingCycleEvents.length
);
expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length);
expect(mockMetricsAggregator.processEvent).toHaveBeenCalledTimes(events.length);
expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(events.length);
expect(mockMetricsAggregator.reset).toHaveBeenCalledTimes(1);
spy.mockRestore();
});

View file

@ -7,41 +7,43 @@
import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs';
import { JsonValue } from '@kbn/utility-types';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { TaskManagerConfig } from '../config';
import { ITaskMetricsAggregator } from './types';
import { TaskLifecycleEvent } from '../polling_lifecycle';
export interface CreateMetricsAggregatorOpts<T> {
key: string;
config: TaskManagerConfig;
resetMetrics$: Observable<boolean>;
taskPollingLifecycle: TaskPollingLifecycle;
taskEventFilter: (taskEvent: TaskLifecycleEvent) => boolean;
reset$?: Observable<boolean>;
events$: Observable<TaskLifecycleEvent>;
eventFilter: (event: TaskLifecycleEvent) => boolean;
metricsAggregator: ITaskMetricsAggregator<T>;
}
export function createAggregator<T extends JsonValue>({
key,
taskPollingLifecycle,
config,
resetMetrics$,
taskEventFilter,
reset$,
events$,
eventFilter,
metricsAggregator,
}: CreateMetricsAggregatorOpts<T>): AggregatedStatProvider<T> {
// Resets the aggregators either when the reset interval has passed or
// a resetMetrics$ event is received
merge(
interval(config.metrics_reset_interval).pipe(map(() => true)),
resetMetrics$.pipe(map(() => true))
).subscribe(() => {
metricsAggregator.reset();
});
if (reset$) {
// Resets the aggregators either when the reset interval has passed or
// a reset$ event is received
merge(
interval(config.metrics_reset_interval).pipe(map(() => true)),
reset$.pipe(map(() => true))
).subscribe(() => {
metricsAggregator.reset();
});
}
const taskEvents$: Observable<T> = taskPollingLifecycle.events.pipe(
filter((taskEvent: TaskLifecycleEvent) => taskEventFilter(taskEvent)),
map((taskEvent: TaskLifecycleEvent) => {
metricsAggregator.processTaskLifecycleEvent(taskEvent);
const taskEvents$: Observable<T> = events$.pipe(
filter((event: TaskLifecycleEvent) => eventFilter(event)),
map((event: TaskLifecycleEvent) => {
metricsAggregator.processTaskLifecycleEvent(event);
return metricsAggregator.collect();
})
);

View file

@ -9,18 +9,28 @@ import { Observable } from 'rxjs';
import { TaskManagerConfig } from '../config';
import { Metrics, createMetricsAggregators, createMetricsStream } from './metrics_stream';
import { TaskPollingLifecycle } from '../polling_lifecycle';
import { TaskManagerMetricsCollector } from './task_metrics_collector';
export type { Metrics } from './metrics_stream';
export function metricsStream(
config: TaskManagerConfig,
resetMetrics$: Observable<boolean>,
taskPollingLifecycle?: TaskPollingLifecycle
): Observable<Metrics> {
interface MetricsStreamOpts {
config: TaskManagerConfig;
reset$: Observable<boolean>; // emits when counter metrics should be reset
taskPollingLifecycle?: TaskPollingLifecycle; // subscribe to task lifecycle events
taskManagerMetricsCollector?: TaskManagerMetricsCollector; // subscribe to collected task manager metrics
}
export function metricsStream({
config,
reset$,
taskPollingLifecycle,
taskManagerMetricsCollector,
}: MetricsStreamOpts): Observable<Metrics> {
return createMetricsStream(
createMetricsAggregators({
config,
resetMetrics$,
reset$,
taskPollingLifecycle,
taskManagerMetricsCollector,
})
);
}

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getTaskTypeGroup } from './get_task_type_group';
describe('getTaskTypeGroup', () => {
test('should correctly group based on task type prefix', () => {
expect(getTaskTypeGroup('alerting:abc')).toEqual('alerting');
expect(getTaskTypeGroup('actions:def')).toEqual('actions');
});
test('should return undefined if no match', () => {
expect(getTaskTypeGroup('alerting-abc')).toBeUndefined();
expect(getTaskTypeGroup('fooalertingbar')).toBeUndefined();
});
});

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
const taskTypeGrouping = new Set<string>(['alerting:', 'actions:']);
export function getTaskTypeGroup(taskType: string): string | undefined {
for (const group of taskTypeGrouping) {
if (taskType.startsWith(group)) {
return group.replace(':', '');
}
}
}

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { Counter } from './counter';
export { getTaskTypeGroup } from './get_task_type_group';
export { MetricCounterService } from './metric_counter_service';
export { type SerializedHistogram, SimpleHistogram } from './simple_histogram';
export { unflattenObject } from './unflatten_object';

View file

@ -6,17 +6,8 @@
*/
import { JsonObject } from '@kbn/utility-types';
import { set } from '@kbn/safer-lodash-set';
import { Counter } from './counter';
interface GenericObject {
[key: string]: unknown;
}
export const unflattenObject = <T extends object = GenericObject>(object: object): T =>
Object.entries(object).reduce((acc, [key, value]) => {
set(acc, key, value);
return acc;
}, {} as T);
import { unflattenObject } from './unflatten_object';
export class MetricCounterService<T extends JsonObject> {
private readonly counters = new Map<string, Counter>();

View file

@ -53,6 +53,8 @@ describe('SimpleHistogram', () => {
test('should correctly record values', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(0);
histogram.record(10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
@ -64,8 +66,8 @@ describe('SimpleHistogram', () => {
histogram.record(2);
expect(histogram.get()).toEqual([
{ value: 10, count: 2 },
{ value: 20, count: 0 },
{ value: 10, count: 3 },
{ value: 20, count: 1 },
{ value: 30, count: 2 },
{ value: 40, count: 2 },
{ value: 50, count: 0 },
@ -77,6 +79,33 @@ describe('SimpleHistogram', () => {
]);
});
test('should correctly record values with specific increment', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(0);
histogram.record(23, 2);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(78);
histogram.record(33, 4);
histogram.record(99, 5);
histogram.record(1);
histogram.record(2);
expect(histogram.get()).toEqual([
{ value: 10, count: 3 },
{ value: 20, count: 0 },
{ value: 30, count: 3 },
{ value: 40, count: 5 },
{ value: 50, count: 0 },
{ value: 60, count: 1 },
{ value: 70, count: 0 },
{ value: 80, count: 1 },
{ value: 90, count: 0 },
{ value: 100, count: 5 },
]);
});
test('should ignore values less than 0 and greater than max', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);

View file

@ -40,14 +40,14 @@ export class SimpleHistogram {
}
}
public record(value: number) {
public record(value: number, increment: number = 1) {
if (value < 0 || value > this.maxValue) {
return;
}
for (let i = 0; i < this.histogramBuckets.length; i++) {
if (value >= this.histogramBuckets[i].min && value < this.histogramBuckets[i].max) {
this.histogramBuckets[i].count++;
this.histogramBuckets[i].count += increment;
break;
}

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { unflattenObject } from './unflatten_object';
describe('unflattenObject', () => {
test('should unflatten an object', () => {
const obj = {
a: true,
'b.baz[0].a': false,
'b.baz[0].b': 'foo',
'b.baz[1]': 'bar',
'b.baz[2]': true,
'b.foo': 'bar',
'b.baz[3][0]': 1,
'b.baz[3][1]': 2,
'c.b.foo': 'cheese',
};
expect(unflattenObject(obj)).toEqual({
a: true,
b: {
foo: 'bar',
baz: [
{
a: false,
b: 'foo',
},
'bar',
true,
[1, 2],
],
},
c: {
b: {
foo: 'cheese',
},
},
});
});
});

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { set } from '@kbn/safer-lodash-set';
interface GenericObject {
[key: string]: unknown;
}
export const unflattenObject = <T extends object = GenericObject>(object: object): T =>
Object.entries(object).reduce((acc, [key, value]) => {
set(acc, key, value);
return acc;
}, {} as T);

View file

@ -11,15 +11,23 @@ import { set } from '@kbn/safer-lodash-set';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { isTaskManagerStatEvent, isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events';
import {
isTaskManagerStatEvent,
isTaskManagerMetricEvent,
isTaskPollingCycleEvent,
isTaskRunEvent,
} from '../task_events';
import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator';
import { createAggregator } from './create_aggregator';
import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator';
import { TaskOverdueMetric, TaskOverdueMetricsAggregator } from './task_overdue_metrics_aggregator';
import { TaskManagerMetricsCollector } from './task_metrics_collector';
export interface Metrics {
last_update: string;
metrics: {
task_claim?: Metric<TaskClaimMetric>;
task_run?: Metric<TaskRunMetric>;
task_overdue?: Metric<TaskOverdueMetric>;
};
}
@ -30,36 +38,51 @@ export interface Metric<T> {
interface CreateMetricsAggregatorsOpts {
config: TaskManagerConfig;
resetMetrics$: Observable<boolean>;
reset$: Observable<boolean>;
taskPollingLifecycle?: TaskPollingLifecycle;
taskManagerMetricsCollector?: TaskManagerMetricsCollector;
}
export function createMetricsAggregators({
config,
resetMetrics$,
reset$,
taskPollingLifecycle,
taskManagerMetricsCollector,
}: CreateMetricsAggregatorsOpts): AggregatedStatProvider {
const aggregators: AggregatedStatProvider[] = [];
if (taskPollingLifecycle) {
aggregators.push(
createAggregator({
key: 'task_claim',
taskPollingLifecycle,
events$: taskPollingLifecycle.events,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent),
reset$,
eventFilter: (event: TaskLifecycleEvent) => isTaskPollingCycleEvent(event),
metricsAggregator: new TaskClaimMetricsAggregator(),
}),
createAggregator({
key: 'task_run',
taskPollingLifecycle,
events$: taskPollingLifecycle.events,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) =>
isTaskRunEvent(taskEvent) || isTaskManagerStatEvent(taskEvent),
reset$,
eventFilter: (event: TaskLifecycleEvent) =>
isTaskRunEvent(event) || isTaskManagerStatEvent(event),
metricsAggregator: new TaskRunMetricsAggregator(),
})
);
}
if (taskManagerMetricsCollector) {
aggregators.push(
createAggregator({
key: 'task_overdue',
events$: taskManagerMetricsCollector.events,
config,
eventFilter: (event: TaskLifecycleEvent) => isTaskManagerMetricEvent(event),
metricsAggregator: new TaskOverdueMetricsAggregator(),
})
);
}
return merge(...aggregators);
}

View file

@ -9,9 +9,8 @@ import { JsonObject } from '@kbn/utility-types';
import { isOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRun } from '../task_events';
import { SerializedHistogram, SimpleHistogram } from './simple_histogram';
import { type SerializedHistogram, SimpleHistogram, MetricCounterService } from './lib';
import { ITaskMetricsAggregator } from './types';
import { MetricCounterService } from './counter/metric_counter_service';
const HDR_HISTOGRAM_MAX = 30000; // 30 seconds
const HDR_HISTOGRAM_BUCKET_SIZE = 100; // 100 millis

View file

@ -0,0 +1,393 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import sinon from 'sinon';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { taskStoreMock } from '../task_store.mock';
import { TaskManagerMetricsCollector } from './task_metrics_collector';
describe('TaskManagerMetricsCollector', () => {
let clock: sinon.SinonFakeTimers;
const mockTaskStore = taskStoreMock.create({});
const logger = loggingSystemMock.create().get();
beforeEach(() => {
clock = sinon.useFakeTimers({ toFake: ['Date', 'setTimeout', 'clearTimeout'] });
});
afterEach(() => {
jest.clearAllMocks();
clock.restore();
});
test('intializes the metrics collector with the provided interval and emits events at each interval', async () => {
mockTaskStore.aggregate.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 4, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {
overallOverdueByHistogram: {
buckets: [
{ key: 0, doc_count: 1 },
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
byTaskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{
key: 'taskType1',
doc_count: 3,
overdueByHistogram: {
buckets: [
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
},
{
key: 'taskType2',
doc_count: 1,
overdueByHistogram: {
buckets: [{ key: 0, doc_count: 1 }],
},
},
],
},
},
});
mockTaskStore.aggregate.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 0, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {
overallOverdueByHistogram: {
buckets: [],
},
byTaskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [],
},
},
});
mockTaskStore.aggregate.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 1, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {
overallOverdueByHistogram: {
buckets: [{ key: 0, doc_count: 1 }],
},
byTaskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{
key: 'taskType3',
doc_count: 1,
overdueByHistogram: {
buckets: [{ key: 0, doc_count: 1 }],
},
},
],
},
},
});
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
const taskManagerMetricsCollector = new TaskManagerMetricsCollector({
logger,
pollInterval,
store: mockTaskStore,
});
const handler = jest.fn();
taskManagerMetricsCollector.events.subscribe(handler);
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(1);
expect(mockTaskStore.aggregate).toHaveBeenCalledWith({
aggs: {
overallOverdueByHistogram: {
histogram: {
field: 'overdueBySeconds',
min_doc_count: 1,
interval: 10,
},
},
byTaskType: {
terms: { field: 'task.taskType', size: 500 },
aggs: {
overdueByHistogram: {
histogram: {
field: 'overdueBySeconds',
interval: 10,
},
},
},
},
},
runtime_mappings: {
overdueBySeconds: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit((new Date().getTime() - runAt.value.getMillis()) / 1000);
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit((new Date().getTime() - retryAt.value.getMillis()) / 1000);
} else {
emit(0);
}
}
`,
},
},
},
query: {
bool: {
filter: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
],
},
},
size: 0,
});
await new Promise((resolve) => setImmediate(resolve));
clock.tick(halfInterval);
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
taskType1: [
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
taskType2: [{ key: 0, doc_count: 1 }],
total: [
{ key: 0, doc_count: 1 },
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
clock.tick(halfInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
total: [],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval + 10);
await new Promise((resolve) => setImmediate(resolve));
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(3);
expect(handler).toHaveBeenCalledTimes(3);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
taskType3: [{ key: 0, doc_count: 1 }],
total: [{ key: 0, doc_count: 1 }],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
});
test('emits empty metric when querying for data fails and continues to query on interval', async () => {
mockTaskStore.aggregate.mockImplementationOnce(async () => {
throw new Error('failed to query');
});
mockTaskStore.aggregate.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 4, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {
overallOverdueByHistogram: {
buckets: [
{ key: 0, doc_count: 1 },
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
byTaskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{
key: 'taskType1',
doc_count: 3,
overdueByHistogram: {
buckets: [
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
},
{
key: 'taskType2',
doc_count: 1,
overdueByHistogram: {
buckets: [{ key: 0, doc_count: 1 }],
},
},
],
},
},
});
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
const taskManagerMetricsCollector = new TaskManagerMetricsCollector({
logger,
pollInterval,
store: mockTaskStore,
});
const handler = jest.fn();
taskManagerMetricsCollector.events.subscribe(handler);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(halfInterval);
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenCalledWith(
`Error querying for task manager metrics - failed to query`
);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
total: [],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
clock.tick(halfInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
taskType1: [
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
taskType2: [{ key: 0, doc_count: 1 }],
total: [
{ key: 0, doc_count: 1 },
{ key: 10, doc_count: 1 },
{ key: 60, doc_count: 2 },
],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
});
test('handles malformed result', async () => {
mockTaskStore.aggregate.mockResolvedValueOnce({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 4, relation: 'eq' }, max_score: null, hits: [] },
// no aggregation in result
});
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
const taskManagerMetricsCollector = new TaskManagerMetricsCollector({
logger,
pollInterval,
store: mockTaskStore,
});
const handler = jest.fn();
taskManagerMetricsCollector.events.subscribe(handler);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(halfInterval);
expect(mockTaskStore.aggregate).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith({
event: {
tag: 'ok',
value: {
numOverdueTasks: {
total: [],
},
},
},
type: 'TASK_MANAGER_METRIC',
});
});
});

View file

@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import {
AggregationsStringTermsBucket,
AggregationsStringTermsBucketKeys,
AggregationsTermsAggregateBase,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Observable, Subject } from 'rxjs';
import { TaskStore } from '../task_store';
import {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
} from '../queries/mark_available_tasks_as_claimed';
import { ITaskEventEmitter, TaskLifecycleEvent } from '../polling_lifecycle';
import { asTaskManagerMetricEvent } from '../task_events';
import { asOk } from '../lib/result_type';
const DEFAULT_POLL_INTERVAL = 5000; // query every 5 seconds
interface ConstructorOpts {
logger: Logger;
store: TaskStore;
pollInterval?: number;
}
export interface TaskManagerMetrics {
numOverdueTasks: {
total: AggregationsStringTermsBucketKeys[];
[key: string]: AggregationsStringTermsBucketKeys[];
};
}
type OverdueTaskAggBucket = AggregationsStringTermsBucketKeys & {
overdueByHistogram: AggregationsStringTermsBucket;
};
export class TaskManagerMetricsCollector implements ITaskEventEmitter<TaskLifecycleEvent> {
private store: TaskStore;
private logger: Logger;
private readonly pollInterval: number;
private running: boolean = false;
// emit collected metrics
private metrics$ = new Subject<TaskLifecycleEvent>();
constructor({ logger, store, pollInterval }: ConstructorOpts) {
this.store = store;
this.logger = logger;
this.pollInterval = pollInterval ?? DEFAULT_POLL_INTERVAL;
this.start();
}
public get events(): Observable<TaskLifecycleEvent> {
return this.metrics$;
}
private start() {
if (!this.running) {
this.running = true;
this.runCollectionCycle();
}
}
private async runCollectionCycle() {
const start = Date.now();
try {
const results = await this.store.aggregate({
size: 0,
query: {
bool: {
filter: [
{
bool: {
should: [IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt],
},
},
],
},
},
runtime_mappings: {
overdueBySeconds: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit((new Date().getTime() - runAt.value.getMillis()) / 1000);
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit((new Date().getTime() - retryAt.value.getMillis()) / 1000);
} else {
emit(0);
}
}
`,
},
},
},
aggs: {
overallOverdueByHistogram: {
histogram: {
field: 'overdueBySeconds',
min_doc_count: 1,
interval: 10,
},
},
byTaskType: {
terms: {
field: 'task.taskType',
size: 500,
},
aggs: {
overdueByHistogram: {
histogram: {
field: 'overdueBySeconds',
interval: 10,
},
},
},
},
},
});
const aggregations =
(results?.aggregations as {
overallOverdueByHistogram: AggregationsTermsAggregateBase<AggregationsStringTermsBucketKeys>;
byTaskType: AggregationsTermsAggregateBase<OverdueTaskAggBucket>;
}) ?? {};
const byTaskType = ((aggregations.byTaskType.buckets as OverdueTaskAggBucket[]) ?? []).reduce(
(acc: Record<string, number>, bucket: OverdueTaskAggBucket) => {
acc[bucket.key] = bucket?.overdueByHistogram?.buckets ?? [];
return acc;
},
{}
);
const metrics = {
numOverdueTasks: {
total:
(aggregations?.overallOverdueByHistogram
?.buckets as AggregationsStringTermsBucketKeys[]) ?? [],
...byTaskType,
},
};
this.metrics$.next(asTaskManagerMetricEvent(asOk(metrics)));
} catch (e) {
this.logger.debug(`Error querying for task manager metrics - ${e.message}`);
// emit empty metrics so we don't have stale metrics
this.metrics$.next(asTaskManagerMetricEvent(asOk({ numOverdueTasks: { total: [] } })));
}
if (this.running) {
// Set the next runCycle call
setTimeout(
this.runCollectionCycle.bind(this),
Math.max(this.pollInterval - (Date.now() - start), 0)
);
}
}
}

View file

@ -0,0 +1,243 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { asOk } from '../lib/result_type';
import { asTaskManagerMetricEvent } from '../task_events';
import { TaskManagerMetrics } from './task_metrics_collector';
import { TaskOverdueMetricsAggregator } from './task_overdue_metrics_aggregator';
export const getTaskManagerMetricEvent = (value: TaskManagerMetrics) => {
return asTaskManagerMetricEvent(asOk(value));
};
describe('TaskOverdueMetricsAggregator', () => {
let taskOverdueMetricsAggregator: TaskOverdueMetricsAggregator;
beforeEach(() => {
taskOverdueMetricsAggregator = new TaskOverdueMetricsAggregator();
});
test('should correctly initialize', () => {
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: { overdue_by: { counts: [], values: [] } },
by_type: {},
});
});
test('should correctly return initialMetrics', () => {
expect(taskOverdueMetricsAggregator.initialMetric()).toEqual({
overall: { overdue_by: { counts: [], values: [] } },
by_type: {},
});
});
test('should correctly process task manager metric event', () => {
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
telemetry: [
{ key: 0, doc_count: 1 },
{ key: 20, doc_count: 1 },
],
total: [
{ key: 0, doc_count: 1 },
{ key: 20, doc_count: 1 },
],
},
})
);
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: { overdue_by: { counts: [1, 0, 1], values: [10, 20, 30] } },
by_type: {
telemetry: { overdue_by: { counts: [1, 0, 1], values: [10, 20, 30] } },
},
});
});
test('should correctly process empty task manager metric event', () => {
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
total: [],
},
})
);
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: { overdue_by: { counts: [], values: [] } },
by_type: {},
});
});
test('should correctly return latest metric event', () => {
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
telemetry: [
{ key: 0, doc_count: 1 },
{ key: 20, doc_count: 1 },
],
total: [
{ key: 0, doc_count: 1 },
{ key: 20, doc_count: 1 },
],
},
})
);
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
telemetry: [{ key: 40, doc_count: 1 }],
total: [{ key: 40, doc_count: 1 }],
},
})
);
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: { overdue_by: { counts: [0, 0, 0, 0, 1], values: [10, 20, 30, 40, 50] } },
by_type: {
telemetry: { overdue_by: { counts: [0, 0, 0, 0, 1], values: [10, 20, 30, 40, 50] } },
},
});
});
test('should correctly group alerting and action task types', () => {
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
'alerting:example': [{ key: 40, doc_count: 1 }],
'alerting:.index-threshold': [
{ key: 20, doc_count: 2 },
{ key: 120, doc_count: 1 },
],
'actions:webhook': [{ key: 0, doc_count: 2 }],
'actions:.email': [{ key: 0, doc_count: 1 }],
total: [
{ key: 0, doc_count: 3 },
{ key: 20, doc_count: 2 },
{ key: 40, doc_count: 1 },
{ key: 120, doc_count: 1 },
],
},
})
);
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: {
overdue_by: {
counts: [3, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
by_type: {
'alerting:example': {
overdue_by: {
counts: [0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50],
},
},
'alerting:__index-threshold': {
overdue_by: {
counts: [0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
alerting: {
overdue_by: {
counts: [0, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
'actions:webhook': {
overdue_by: {
counts: [2],
values: [10],
},
},
'actions:__email': {
overdue_by: {
counts: [1],
values: [10],
},
},
actions: {
overdue_by: {
counts: [3],
values: [10],
},
},
},
});
});
test('should correctly ignore reset events', () => {
taskOverdueMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerMetricEvent({
numOverdueTasks: {
'alerting:example': [{ key: 40, doc_count: 1 }],
'alerting:.index-threshold': [
{ key: 20, doc_count: 2 },
{ key: 120, doc_count: 1 },
],
'actions:webhook': [{ key: 0, doc_count: 2 }],
'actions:.email': [{ key: 0, doc_count: 1 }],
total: [
{ key: 0, doc_count: 3 },
{ key: 20, doc_count: 2 },
{ key: 40, doc_count: 1 },
{ key: 120, doc_count: 1 },
],
},
})
);
taskOverdueMetricsAggregator.reset();
expect(taskOverdueMetricsAggregator.collect()).toEqual({
overall: {
overdue_by: {
counts: [3, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
by_type: {
'alerting:example': {
overdue_by: {
counts: [0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50],
},
},
'alerting:__index-threshold': {
overdue_by: {
counts: [0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
alerting: {
overdue_by: {
counts: [0, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130],
},
},
'actions:webhook': {
overdue_by: {
counts: [2],
values: [10],
},
},
'actions:__email': {
overdue_by: {
counts: [1],
values: [10],
},
},
actions: {
overdue_by: {
counts: [3],
values: [10],
},
},
},
});
});
});

View file

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { JsonObject } from '@kbn/utility-types';
import { keys, mapValues } from 'lodash';
import { isOk, unwrap } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskManagerMetric } from '../task_events';
import {
unflattenObject,
getTaskTypeGroup,
type SerializedHistogram,
SimpleHistogram,
} from './lib';
import { TaskManagerMetrics } from './task_metrics_collector';
import { ITaskMetricsAggregator } from './types';
const HDR_HISTOGRAM_MAX = 1800; // 30 minutes
const HDR_HISTOGRAM_BUCKET_SIZE = 10; // 10 seconds
const OVERDUE_BY_KEY = 'overdue_by';
enum TaskOverdueMetricKeys {
OVERALL = 'overall',
BY_TYPE = 'by_type',
}
interface TaskOverdueHistogram extends JsonObject {
[OVERDUE_BY_KEY]: SerializedHistogram;
}
export interface TaskOverdueMetric extends JsonObject {
[TaskOverdueMetricKeys.OVERALL]: TaskOverdueHistogram;
[TaskOverdueMetricKeys.BY_TYPE]: {
[key: string]: TaskOverdueHistogram;
};
}
export class TaskOverdueMetricsAggregator implements ITaskMetricsAggregator<TaskOverdueMetric> {
private histograms: Record<string, SimpleHistogram> = {};
public initialMetric(): TaskOverdueMetric {
return {
by_type: {},
overall: { overdue_by: { counts: [], values: [] } },
};
}
public collect(): TaskOverdueMetric {
if (keys(this.histograms).length === 0) {
return {
by_type: {},
overall: { overdue_by: { counts: [], values: [] } },
};
}
return unflattenObject(mapValues(this.histograms, (hist) => hist.serialize()));
}
public reset() {
// no-op because this metric is not a counter
}
public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) {
this.histograms = {};
let metric: TaskManagerMetrics;
if (isOk((taskEvent as TaskManagerMetric).event)) {
metric = unwrap((taskEvent as TaskManagerMetric).event) as TaskManagerMetrics;
for (const key of Object.keys(metric.numOverdueTasks)) {
const hist = new SimpleHistogram(HDR_HISTOGRAM_MAX, HDR_HISTOGRAM_BUCKET_SIZE);
(metric.numOverdueTasks[key] ?? []).forEach((bucket) => {
const overdueInSec = parseInt(bucket.key, 10);
hist.record(overdueInSec, bucket.doc_count);
if (key === 'total') {
this.histograms[`${TaskOverdueMetricKeys.OVERALL}.${OVERDUE_BY_KEY}`] = hist;
} else {
const taskType = key.replaceAll('.', '__');
const taskTypeGroup = getTaskTypeGroup(taskType);
this.histograms[`${TaskOverdueMetricKeys.BY_TYPE}.${taskType}.${OVERDUE_BY_KEY}`] =
hist;
if (taskTypeGroup) {
const groupHist =
this.histograms[
`${TaskOverdueMetricKeys.BY_TYPE}.${taskTypeGroup}.${OVERDUE_BY_KEY}`
] ?? new SimpleHistogram(HDR_HISTOGRAM_MAX, HDR_HISTOGRAM_BUCKET_SIZE);
groupHist.record(overdueInSec, bucket.doc_count);
this.histograms[
`${TaskOverdueMetricKeys.BY_TYPE}.${taskTypeGroup}.${OVERDUE_BY_KEY}`
] = groupHist;
}
}
});
}
}
}
}

View file

@ -17,12 +17,14 @@ import {
isTaskRunEvent,
TaskManagerStat,
} from '../task_events';
import { MetricCounterService } from './counter/metric_counter_service';
import { SerializedHistogram, SimpleHistogram } from './simple_histogram';
import {
getTaskTypeGroup,
MetricCounterService,
SimpleHistogram,
type SerializedHistogram,
} from './lib';
import { ITaskMetricsAggregator } from './types';
const taskTypeGrouping = new Set<string>(['alerting:', 'actions:']);
const HDR_HISTOGRAM_MAX = 1800; // 30 minutes
const HDR_HISTOGRAM_BUCKET_SIZE = 10; // 10 seconds
@ -92,7 +94,7 @@ export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunM
const { task, isExpired }: RanTask | ErroredTask = unwrap(taskEvent.event);
const success = isOk((taskEvent as TaskRun).event);
const taskType = task.taskType.replaceAll('.', '__');
const taskTypeGroup = this.getTaskTypeGroup(taskType);
const taskTypeGroup = getTaskTypeGroup(taskType);
// increment the total counters
this.incrementCounters(TaskRunKeys.TOTAL, taskType, taskTypeGroup);
@ -122,12 +124,4 @@ export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunM
this.counter.increment(key, `${TaskRunMetricKeys.BY_TYPE}.${group}`);
}
}
private getTaskTypeGroup(taskType: string): string | undefined {
for (const group of taskTypeGrouping) {
if (taskType.startsWith(group)) {
return group.replaceAll(':', '');
}
}
}
}

View file

@ -11,5 +11,5 @@ export interface ITaskMetricsAggregator<T> {
initialMetric: () => T;
collect: () => T;
reset: () => void;
processTaskLifecycleEvent: (taskEvent: TaskLifecycleEvent) => void;
processTaskLifecycleEvent: (event: TaskLifecycleEvent) => void;
}

View file

@ -36,6 +36,7 @@ import { TASK_MANAGER_INDEX } from './constants';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { setupIntervalLogging } from './lib/log_health_metrics';
import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
export interface TaskManagerSetupContract {
/**
@ -88,6 +89,7 @@ export class TaskManagerPlugin
private shouldRunBackgroundTasks: boolean;
private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version'];
private adHocTaskCounter: AdHocTaskCounter;
private taskManagerMetricsCollector?: TaskManagerMetricsCollector;
private nodeRoles: PluginInitializerContext['node']['roles'];
constructor(private readonly initContext: PluginInitializerContext) {
@ -249,6 +251,10 @@ export class TaskManagerPlugin
// Only poll for tasks if configured to run tasks
if (this.shouldRunBackgroundTasks) {
this.taskManagerMetricsCollector = new TaskManagerMetricsCollector({
logger: this.logger,
store: taskStore,
});
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
@ -285,9 +291,12 @@ export class TaskManagerPlugin
this.ephemeralTaskLifecycle
).subscribe((stat) => this.monitoringStats$.next(stat));
metricsStream(this.config!, this.resetMetrics$, this.taskPollingLifecycle).subscribe((metric) =>
this.metrics$.next(metric)
);
metricsStream({
config: this.config!,
reset$: this.resetMetrics$,
taskPollingLifecycle: this.taskPollingLifecycle,
taskManagerMetricsCollector: this.taskManagerMetricsCollector,
}).subscribe((metric) => this.metrics$.next(metric));
const taskScheduling = new TaskScheduling({
logger: this.logger,

View file

@ -27,6 +27,7 @@ import {
TaskManagerStat,
asTaskManagerStatEvent,
EphemeralTaskRejectedDueToCapacity,
TaskManagerMetric,
} from './task_events';
import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool';
import { Middleware } from './lib/middleware';
@ -42,6 +43,10 @@ import { TaskTypeDictionary } from './task_type_dictionary';
import { delayOnClaimConflicts } from './polling';
import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming';
export interface ITaskEventEmitter<T> {
get events(): Observable<T>;
}
export type TaskPollingLifecycleOpts = {
logger: Logger;
definitions: TaskTypeDictionary;
@ -61,12 +66,13 @@ export type TaskLifecycleEvent =
| TaskRunRequest
| TaskPollingCycle
| TaskManagerStat
| TaskManagerMetric
| EphemeralTaskRejectedDueToCapacity;
/**
* The public interface into the task manager system.
*/
export class TaskPollingLifecycle {
export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEvent> {
private definitions: TaskTypeDictionary;
private store: TaskStore;

View file

@ -15,6 +15,7 @@ import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';
import { TaskManagerMetrics } from './metrics/task_metrics_collector';
export enum TaskPersistence {
Recurring = 'recurring',
@ -28,6 +29,7 @@ export enum TaskEventType {
TASK_RUN = 'TASK_RUN',
TASK_RUN_REQUEST = 'TASK_RUN_REQUEST',
TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE',
TASK_MANAGER_METRIC = 'TASK_MANAGER_METRIC',
TASK_MANAGER_STAT = 'TASK_MANAGER_STAT',
EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY = 'EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY',
}
@ -82,6 +84,7 @@ export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
export type EphemeralTaskRejectedDueToCapacity = TaskEvent<EphemeralTaskInstanceRequest, Error>;
export type TaskPollingCycle<T = string> = TaskEvent<ClaimAndFillPoolResult, PollingError<T>>;
export type TaskManagerMetric = TaskEvent<TaskManagerMetrics, Error>;
export type TaskManagerStats =
| 'load'
@ -175,6 +178,15 @@ export function asTaskManagerStatEvent(
};
}
export function asTaskManagerMetricEvent(
event: Result<TaskManagerMetrics, never>
): TaskManagerMetric {
return {
type: TaskEventType.TASK_MANAGER_METRIC,
event,
};
}
export function asEphemeralTaskRejectedDueToCapacityEvent(
id: string,
event: Result<EphemeralTaskInstanceRequest, Error>,
@ -219,6 +231,11 @@ export function isTaskManagerWorkerUtilizationStatEvent(
): taskEvent is TaskManagerStat {
return taskEvent.type === TaskEventType.TASK_MANAGER_STAT && taskEvent.id === 'workerUtilization';
}
export function isTaskManagerMetricEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is TaskManagerStat {
return taskEvent.type === TaskEventType.TASK_MANAGER_METRIC;
}
export function isEphemeralTaskRejectedDueToCapacityEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is EphemeralTaskRejectedDueToCapacity {

View file

@ -28,13 +28,15 @@ export default function ({ getService }: FtrProviderContext) {
function getMetrics(
reset: boolean = false,
callback: (metrics: NodeMetrics) => boolean
callback?: (metrics: NodeMetrics) => boolean
): Promise<NodeMetrics> {
return retry.try(async () => {
const metrics = await getMetricsRequest(reset);
if (metrics.metrics && callback(metrics)) {
return metrics;
if (metrics.metrics) {
if ((callback && callback(metrics)) || !callback) {
return metrics;
}
}
await delay(500);
@ -142,7 +144,7 @@ export default function ({ getService }: FtrProviderContext) {
});
});
describe('task run test', () => {
describe('task run', () => {
let ruleId: string | null = null;
before(async () => {
// create a rule that fires actions
@ -233,5 +235,18 @@ export default function ({ getService }: FtrProviderContext) {
);
});
});
describe('task overdue', () => {
it('histograms should exist', async () => {
const metrics = (await getMetrics(false)).metrics;
expect(metrics).not.to.be(null);
expect(metrics?.task_overdue).not.to.be(null);
expect(metrics?.task_overdue?.value).not.to.be(null);
expect(metrics?.task_overdue?.value.overall).not.to.be(null);
expect(metrics?.task_overdue?.value.overall.overdue_by).not.to.be(null);
expect(Array.isArray(metrics?.task_overdue?.value.overall.overdue_by.counts)).to.be(true);
expect(Array.isArray(metrics?.task_overdue?.value.overall.overdue_by.values)).to.be(true);
});
});
});
}