[Response Ops][Task Manager] Expose SLI metrics in HTTP API - Take 2 (#163652)

## Summary

Redo of https://github.com/elastic/kibana/pull/162178, but without the
`native-hdr-histogram` library, which caused issues in the serverless
build. In the future we may want to pursue a custom build of that
native library, but for our current purposes a simple bucketed
histogram should suffice. The only change from the original PR is in
commit dde5245ded, where we create a `SimpleHistogram` class that
buckets task claim durations into `100ms` buckets.

Please refer to the original PR for a more detailed description of this
HTTP API.
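
For illustration, here is a minimal sketch of how the new `SimpleHistogram` is
used (the import path and recorded values are assumed; the 30s range and 100ms
bucket size mirror the task claim aggregator defaults introduced in this PR):

```ts
import { SimpleHistogram } from './simple_histogram'; // path assumed

// 30s range split into 100ms buckets.
const histogram = new SimpleHistogram(30000, 100);
histogram.record(125); // lands in the [100, 200) bucket
histogram.record(340); // lands in the [300, 400) bucket

// get(true) truncates trailing empty buckets; each entry reports the bucket's
// count and its upper bound.
console.log(histogram.get(true));
// [
//   { count: 0, value: 100 },
//   { count: 1, value: 200 },
//   { count: 0, value: 300 },
//   { count: 1, value: 400 },
// ]
```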

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Author: Ying Mao, 2023-08-11 13:54:31 -04:00 (committed by GitHub)
parent 57c17c4927
commit 3762df1a22
41 changed files with 2739 additions and 95 deletions


@ -23,6 +23,7 @@ describe('config validation', () => {
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
"enabled": false,
@ -81,6 +82,7 @@ describe('config validation', () => {
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
"enabled": false,
@ -137,6 +139,7 @@ describe('config validation', () => {
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
"enabled": false,


@ -20,6 +20,8 @@ export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000;
export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50;
export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60;
export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
@ -52,18 +54,76 @@ const eventLoopDelaySchema = schema.object({
});
const requeueInvalidTasksConfig = schema.object({
enabled: schema.boolean({ defaultValue: false }),
delay: schema.number({ defaultValue: 3000, min: 0 }),
enabled: schema.boolean({ defaultValue: false }),
max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }),
});
export const configSchema = schema.object(
{
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
defaultValue: 10,
min: 1,
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
event_loop_delay: eventLoopDelaySchema,
/* The maximum number of times a task will be attempted before being abandoned as failed */
max_attempts: schema.number({
defaultValue: 3,
min: 1,
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
/* The interval at which monotonically increasing metrics counters will reset */
metrics_reset_interval: schema.number({
defaultValue: DEFAULT_METRICS_RESET_INTERVAL,
min: 10 * 1000, // minimum 10 seconds
}),
/* The rate at which we refresh monitored stats that require aggregation queries against ES. */
monitored_aggregated_stats_refresh_rate: schema.number({
defaultValue: DEFAULT_MONITORING_REFRESH_RATE,
/* don't run monitored stat aggregations any faster than once every 5 seconds */
min: 5000,
}),
monitored_stats_health_verbose_log: schema.object({
enabled: schema.boolean({ defaultValue: false }),
level: schema.oneOf([schema.literal('debug'), schema.literal('info')], {
defaultValue: 'debug',
}),
/* The amount of seconds we allow a task to delay before printing a warning server log */
warn_delayed_task_start_in_seconds: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS,
}),
}),
/* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */
monitored_stats_required_freshness: schema.number({
defaultValue: (config?: unknown) =>
((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000,
min: 100,
}),
/* The size of the running average window for monitored stats. */
monitored_stats_running_average_window: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW,
max: 100,
min: 10,
}),
/* Task Execution result warn & error thresholds. */
monitored_task_execution_thresholds: schema.object({
custom: schema.recordOf(schema.string(), taskExecutionFailureThresholdSchema, {
defaultValue: {},
}),
default: taskExecutionFailureThresholdSchema,
}),
/* How often, in milliseconds, the task manager will look for more work. */
poll_interval: schema.number({
defaultValue: DEFAULT_POLL_INTERVAL,
@ -75,11 +135,11 @@ export const configSchema = schema.object(
defaultValue: 1000,
min: 1,
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
requeue_invalid_tasks: requeueInvalidTasksConfig,
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
}),
/* The threshold percentage for workers experiencing version conflicts for shifting the polling interval. */
version_conflict_threshold: schema.number({
@ -87,64 +147,11 @@ export const configSchema = schema.object(
min: 50,
max: 100,
}),
/* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */
monitored_stats_required_freshness: schema.number({
defaultValue: (config?: unknown) =>
((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000,
min: 100,
}),
/* The rate at which we refresh monitored stats that require aggregation queries against ES. */
monitored_aggregated_stats_refresh_rate: schema.number({
defaultValue: DEFAULT_MONITORING_REFRESH_RATE,
/* don't run monitored stat aggregations any faster than once every 5 seconds */
min: 5000,
}),
/* The size of the running average window for monitored stats. */
monitored_stats_running_average_window: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW,
max: 100,
min: 10,
}),
/* Task Execution result warn & error thresholds. */
monitored_task_execution_thresholds: schema.object({
default: taskExecutionFailureThresholdSchema,
custom: schema.recordOf(schema.string(), taskExecutionFailureThresholdSchema, {
defaultValue: {},
}),
}),
monitored_stats_health_verbose_log: schema.object({
enabled: schema.boolean({ defaultValue: false }),
level: schema.oneOf([schema.literal('debug'), schema.literal('info')], {
defaultValue: 'debug',
}),
/* The amount of seconds we allow a task to delay before printing a warning server log */
warn_delayed_task_start_in_seconds: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS,
}),
}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
defaultValue: 10,
min: 1,
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
event_loop_delay: eventLoopDelaySchema,
worker_utilization_running_average_window: schema.number({
defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW,
max: 100,
min: 1,
}),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
}),
requeue_invalid_tasks: requeueInvalidTasksConfig,
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
},
{
validate: (config) => {


@ -84,6 +84,7 @@ describe('EphemeralTaskLifecycle', () => {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
...config,
},
elasticsearchAndSOAvailability$,


@ -79,6 +79,7 @@ describe('managed configuration', () => {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
});
logger = context.logger.get('taskManager');

File diff suppressed because it is too large.


@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs';
import { JsonValue } from '@kbn/utility-types';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { TaskManagerConfig } from '../config';
import { ITaskMetricsAggregator } from './types';
export interface CreateMetricsAggregatorOpts<T> {
key: string;
config: TaskManagerConfig;
resetMetrics$: Observable<boolean>;
taskPollingLifecycle: TaskPollingLifecycle;
taskEventFilter: (taskEvent: TaskLifecycleEvent) => boolean;
metricsAggregator: ITaskMetricsAggregator<T>;
}
export function createAggregator<T extends JsonValue>({
key,
taskPollingLifecycle,
config,
resetMetrics$,
taskEventFilter,
metricsAggregator,
}: CreateMetricsAggregatorOpts<T>): AggregatedStatProvider<T> {
// Resets the aggregators either when the reset interval has passed or
// a resetMetrics$ event is received
merge(
interval(config.metrics_reset_interval).pipe(map(() => true)),
resetMetrics$.pipe(map(() => true))
).subscribe(() => {
metricsAggregator.reset();
});
const taskEvents$: Observable<T> = taskPollingLifecycle.events.pipe(
filter((taskEvent: TaskLifecycleEvent) => taskEventFilter(taskEvent)),
map((taskEvent: TaskLifecycleEvent) => {
metricsAggregator.processTaskLifecycleEvent(taskEvent);
return metricsAggregator.collect();
})
);
return combineLatest([taskEvents$.pipe(startWith(metricsAggregator.initialMetric()))]).pipe(
map(([value]: [T]) => {
return {
key,
value,
} as AggregatedStat<T>;
})
);
}
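
As a standalone sketch of the reset behavior above, in plain RxJS with
illustrative names (not plugin code): the aggregator state is cleared either
when the configured interval elapses or when an external reset signal fires.

```ts
import { interval, map, merge, Subject } from 'rxjs';

// Stand-ins for config.metrics_reset_interval and the route-driven reset signal.
const METRICS_RESET_INTERVAL = 30 * 1000;
const resetMetrics$ = new Subject<boolean>();

let eventCount = 0; // stand-in for the aggregator's internal state

merge(
  interval(METRICS_RESET_INTERVAL).pipe(map(() => true)), // periodic reset
  resetMetrics$.pipe(map(() => true)) // on-demand reset, e.g. from the HTTP route
).subscribe(() => {
  eventCount = 0; // metricsAggregator.reset()
});

// Elsewhere, each processed task lifecycle event would increment eventCount.
resetMetrics$.next(true); // resets immediately, without waiting for the interval
```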


@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable } from 'rxjs';
import { TaskManagerConfig } from '../config';
import { Metrics, createMetricsAggregators, createMetricsStream } from './metrics_stream';
import { TaskPollingLifecycle } from '../polling_lifecycle';
export type { Metrics } from './metrics_stream';
export function metricsStream(
config: TaskManagerConfig,
resetMetrics$: Observable<boolean>,
taskPollingLifecycle?: TaskPollingLifecycle
): Observable<Metrics> {
return createMetricsStream(
createMetricsAggregators({
config,
resetMetrics$,
taskPollingLifecycle,
})
);
}


@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const createIMetricsAggregatorMock = () => {
return jest.fn().mockImplementation(() => {
return {
initialMetric: jest.fn().mockReturnValue({ count: 0 }),
reset: jest.fn(),
collect: jest.fn(),
processTaskLifecycleEvent: jest.fn(),
};
});
};
export const metricsAggregatorMock = {
create: createIMetricsAggregatorMock(),
};


@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject } from 'rxjs';
import { take, bufferCount } from 'rxjs/operators';
import { createMetricsStream } from './metrics_stream';
import { JsonValue } from '@kbn/utility-types';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
beforeEach(() => {
jest.resetAllMocks();
});
describe('createMetricsStream', () => {
it('incrementally updates the metrics returned by the endpoint', async () => {
const aggregatedStats$ = new Subject<AggregatedStat>();
return new Promise<void>((resolve) => {
createMetricsStream(aggregatedStats$)
.pipe(take(3), bufferCount(3))
.subscribe(([initialValue, secondValue, thirdValue]) => {
expect(initialValue).toMatchObject({
last_update: expect.any(String),
metrics: {},
});
expect(secondValue).toMatchObject({
last_update: expect.any(String),
metrics: {
newAggregatedStat: {
timestamp: expect.any(String),
value: {
some: {
complex: {
value: 123,
},
},
},
},
},
});
expect(thirdValue).toMatchObject({
last_update: expect.any(String),
metrics: {
newAggregatedStat: {
timestamp: expect.any(String),
value: {
some: {
updated: {
value: 456,
},
},
},
},
},
});
});
aggregatedStats$.next({
key: 'newAggregatedStat',
value: {
some: {
complex: {
value: 123,
},
},
} as JsonValue,
});
aggregatedStats$.next({
key: 'newAggregatedStat',
value: {
some: {
updated: {
value: 456,
},
},
} as JsonValue,
});
resolve();
});
});
});


@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { merge, of, Observable } from 'rxjs';
import { map, scan } from 'rxjs/operators';
import { set } from '@kbn/safer-lodash-set';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events';
import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator';
import { createAggregator } from './create_aggregator';
import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator';
export interface Metrics {
last_update: string;
metrics: {
task_claim?: Metric<TaskClaimMetric>;
task_run?: Metric<TaskRunMetric>;
};
}
export interface Metric<T> {
timestamp: string;
value: T;
}
interface CreateMetricsAggregatorsOpts {
config: TaskManagerConfig;
resetMetrics$: Observable<boolean>;
taskPollingLifecycle?: TaskPollingLifecycle;
}
export function createMetricsAggregators({
config,
resetMetrics$,
taskPollingLifecycle,
}: CreateMetricsAggregatorsOpts): AggregatedStatProvider {
const aggregators: AggregatedStatProvider[] = [];
if (taskPollingLifecycle) {
aggregators.push(
createAggregator({
key: 'task_claim',
taskPollingLifecycle,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent),
metricsAggregator: new TaskClaimMetricsAggregator(),
}),
createAggregator({
key: 'task_run',
taskPollingLifecycle,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent),
metricsAggregator: new TaskRunMetricsAggregator(),
})
);
}
return merge(...aggregators);
}
export function createMetricsStream(provider$: AggregatedStatProvider): Observable<Metrics> {
const initialMetrics = {
last_update: new Date().toISOString(),
metrics: {},
};
return merge(
// emit the initial metrics
of(initialMetrics),
// emit updated metrics whenever a provider updates a specific key on the stats
provider$.pipe(
map(({ key, value }) => {
return {
value: { timestamp: new Date().toISOString(), value },
key,
};
}),
scan((metrics: Metrics, { key, value }) => {
// incrementally merge stats as they come in
set(metrics.metrics, key, value);
metrics.last_update = new Date().toISOString();
return metrics;
}, initialMetrics)
)
);
}
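
A minimal sketch of the `scan`-based merge used above, again with plain RxJS
and illustrative names: each `{ key, value }` stat is folded into one document,
so every emission carries the latest value for every key seen so far.

```ts
import { Subject } from 'rxjs';
import { map, scan } from 'rxjs/operators';

interface Stat {
  key: string;
  value: unknown;
}

const stats$ = new Subject<Stat>();

stats$
  .pipe(
    // wrap each value with a timestamp, like the metrics stream does
    map(({ key, value }) => ({ key, value: { timestamp: new Date().toISOString(), value } })),
    scan((doc: Record<string, unknown>, { key, value }) => {
      doc[key] = value; // incrementally merge stats as they come in
      return doc;
    }, {})
  )
  .subscribe((doc) => console.log(doc));

stats$.next({ key: 'task_claim', value: { success: 1, total: 1 } });
stats$.next({ key: 'task_run', value: { overall: { success: 2, total: 2 } } });
// The second log contains both the task_claim and task_run entries.
```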


@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SimpleHistogram } from './simple_histogram';
describe('SimpleHistogram', () => {
test('should throw error when bucketSize is greater than range', () => {
expect(() => {
new SimpleHistogram(10, 100);
}).toThrowErrorMatchingInlineSnapshot(`"bucket size cannot be greater than value range"`);
});
test('should correctly initialize when bucketSize evenly divides range', () => {
const histogram = new SimpleHistogram(100, 10);
expect(histogram.get()).toEqual([
{ value: 10, count: 0 },
{ value: 20, count: 0 },
{ value: 30, count: 0 },
{ value: 40, count: 0 },
{ value: 50, count: 0 },
{ value: 60, count: 0 },
{ value: 70, count: 0 },
{ value: 80, count: 0 },
{ value: 90, count: 0 },
{ value: 100, count: 0 },
]);
});
test('should correctly initialize when bucketSize does not evenly divide range', () => {
const histogram = new SimpleHistogram(100, 7);
expect(histogram.get()).toEqual([
{ value: 7, count: 0 },
{ value: 14, count: 0 },
{ value: 21, count: 0 },
{ value: 28, count: 0 },
{ value: 35, count: 0 },
{ value: 42, count: 0 },
{ value: 49, count: 0 },
{ value: 56, count: 0 },
{ value: 63, count: 0 },
{ value: 70, count: 0 },
{ value: 77, count: 0 },
{ value: 84, count: 0 },
{ value: 91, count: 0 },
{ value: 98, count: 0 },
{ value: 105, count: 0 },
]);
});
test('should correctly record values', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(78);
histogram.record(33);
histogram.record(99);
histogram.record(1);
histogram.record(2);
expect(histogram.get()).toEqual([
{ value: 10, count: 2 },
{ value: 20, count: 0 },
{ value: 30, count: 2 },
{ value: 40, count: 2 },
{ value: 50, count: 0 },
{ value: 60, count: 1 },
{ value: 70, count: 0 },
{ value: 80, count: 1 },
{ value: 90, count: 0 },
{ value: 100, count: 1 },
]);
});
test('should ignore values less than 0 and greater than max', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(78);
histogram.record(33);
histogram.record(99);
histogram.record(1);
histogram.record(2);
const hist1 = histogram.get();
histogram.record(-1);
histogram.record(200);
expect(histogram.get()).toEqual(hist1);
});
test('should correctly reset values', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(78);
histogram.record(33);
histogram.record(99);
histogram.record(1);
histogram.record(2);
expect(histogram.get()).toEqual([
{ value: 10, count: 2 },
{ value: 20, count: 0 },
{ value: 30, count: 2 },
{ value: 40, count: 2 },
{ value: 50, count: 0 },
{ value: 60, count: 1 },
{ value: 70, count: 0 },
{ value: 80, count: 1 },
{ value: 90, count: 0 },
{ value: 100, count: 1 },
]);
histogram.reset();
expect(histogram.get()).toEqual([
{ value: 10, count: 0 },
{ value: 20, count: 0 },
{ value: 30, count: 0 },
{ value: 40, count: 0 },
{ value: 50, count: 0 },
{ value: 60, count: 0 },
{ value: 70, count: 0 },
{ value: 80, count: 0 },
{ value: 90, count: 0 },
{ value: 100, count: 0 },
]);
});
test('should correctly truncate zero values', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(33);
histogram.record(1);
histogram.record(2);
expect(histogram.get()).toEqual([
{ value: 10, count: 2 },
{ value: 20, count: 0 },
{ value: 30, count: 2 },
{ value: 40, count: 2 },
{ value: 50, count: 0 },
{ value: 60, count: 1 },
{ value: 70, count: 0 },
{ value: 80, count: 0 },
{ value: 90, count: 0 },
{ value: 100, count: 0 },
]);
expect(histogram.get(true)).toEqual([
{ value: 10, count: 2 },
{ value: 20, count: 0 },
{ value: 30, count: 2 },
{ value: 40, count: 2 },
{ value: 50, count: 0 },
{ value: 60, count: 1 },
]);
});
test('should correctly truncate zero values when all values are zero', () => {
const histogram = new SimpleHistogram(100, 10);
expect(histogram.get(true)).toEqual([]);
});
});


@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { last } from 'lodash';
interface Bucket {
min: number; // inclusive
max: number; // exclusive
count: number;
}
export class SimpleHistogram {
private maxValue: number;
private bucketSize: number;
private histogramBuckets: Bucket[] = [];
constructor(max: number, bucketSize: number) {
if (bucketSize > max) {
throw new Error(`bucket size cannot be greater than value range`);
}
this.maxValue = max;
this.bucketSize = bucketSize;
this.initializeBuckets();
}
public reset() {
for (let i = 0; i < this.histogramBuckets.length; i++) {
this.histogramBuckets[i].count = 0;
}
}
public record(value: number) {
if (value < 0 || value > this.maxValue) {
return;
}
for (let i = 0; i < this.histogramBuckets.length; i++) {
if (value >= this.histogramBuckets[i].min && value < this.histogramBuckets[i].max) {
this.histogramBuckets[i].count++;
break;
}
}
}
public get(truncate: boolean = false) {
let histogramToReturn = this.histogramBuckets;
if (truncate) {
// find the index of the last bucket with a non-zero value
const nonZeroCountsWithIndex = this.histogramBuckets
.map((bucket: Bucket, index: number) => ({ count: bucket.count, index }))
.filter(({ count }) => count > 0);
const lastNonZeroIndex: number =
nonZeroCountsWithIndex.length > 0 ? last(nonZeroCountsWithIndex)?.index ?? -1 : -1;
histogramToReturn =
lastNonZeroIndex >= 0 ? this.histogramBuckets.slice(0, lastNonZeroIndex + 1) : [];
}
return histogramToReturn.map((bucket: Bucket) => ({
count: bucket.count,
value: bucket.max,
}));
}
private initializeBuckets() {
let i = 0;
while (i < this.maxValue) {
this.histogramBuckets.push({
min: i,
max: i + Math.min(this.bucketSize, this.maxValue),
count: 0,
});
i += this.bucketSize;
}
}
}


@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SuccessRateCounter } from './success_rate_counter';
describe('SuccessRateCounter', () => {
let successRateCounter: SuccessRateCounter;
beforeEach(() => {
successRateCounter = new SuccessRateCounter();
});
test('should correctly initialize', () => {
expect(successRateCounter.get()).toEqual({ success: 0, total: 0 });
});
test('should correctly return initialMetrics', () => {
expect(successRateCounter.initialMetric()).toEqual({ success: 0, total: 0 });
});
test('should correctly increment counter when success is true', () => {
successRateCounter.increment(true);
successRateCounter.increment(true);
expect(successRateCounter.get()).toEqual({ success: 2, total: 2 });
});
test('should correctly increment counter when success is false', () => {
successRateCounter.increment(false);
successRateCounter.increment(false);
expect(successRateCounter.get()).toEqual({ success: 0, total: 2 });
});
test('should correctly reset counter', () => {
successRateCounter.increment(true);
successRateCounter.increment(true);
successRateCounter.increment(false);
successRateCounter.increment(false);
successRateCounter.increment(true);
successRateCounter.increment(true);
successRateCounter.increment(false);
expect(successRateCounter.get()).toEqual({ success: 4, total: 7 });
successRateCounter.reset();
expect(successRateCounter.get()).toEqual({ success: 0, total: 0 });
});
});


@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { JsonObject } from '@kbn/utility-types';
export interface SuccessRate extends JsonObject {
success: number;
total: number;
}
export class SuccessRateCounter {
private success = 0;
private total = 0;
public initialMetric(): SuccessRate {
return {
success: 0,
total: 0,
};
}
public get(): SuccessRate {
return {
success: this.success,
total: this.total,
};
}
public increment(success: boolean) {
if (success) {
this.success++;
}
this.total++;
}
public reset() {
this.success = 0;
this.total = 0;
}
}


@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { none } from 'fp-ts/lib/Option';
import { FillPoolResult } from '../lib/fill_pool';
import { asOk, asErr } from '../lib/result_type';
import { PollingError, PollingErrorType } from '../polling';
import { asTaskPollingCycleEvent } from '../task_events';
import { TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator';
export const taskClaimSuccessEvent = asTaskPollingCycleEvent<string>(
asOk({
result: FillPoolResult.PoolFilled,
stats: {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
}),
{
start: 1689698780490,
stop: 1689698780500,
}
);
export const taskClaimFailureEvent = asTaskPollingCycleEvent<string>(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
)
)
);
describe('TaskClaimMetricsAggregator', () => {
let taskClaimMetricsAggregator: TaskClaimMetricsAggregator;
beforeEach(() => {
taskClaimMetricsAggregator = new TaskClaimMetricsAggregator();
});
test('should correctly initialize', () => {
expect(taskClaimMetricsAggregator.collect()).toEqual({
success: 0,
total: 0,
duration: { counts: [], values: [] },
});
});
test('should correctly return initialMetrics', () => {
expect(taskClaimMetricsAggregator.initialMetric()).toEqual({
success: 0,
total: 0,
duration: { counts: [], values: [] },
});
});
test('should correctly process task lifecycle success event', () => {
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
expect(taskClaimMetricsAggregator.collect()).toEqual({
success: 2,
total: 2,
duration: { counts: [2], values: [100] },
});
});
test('should correctly process task lifecycle failure event', () => {
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent);
expect(taskClaimMetricsAggregator.collect()).toEqual({
success: 0,
total: 2,
duration: { counts: [], values: [] },
});
});
test('should correctly reset counter', () => {
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent);
taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent);
expect(taskClaimMetricsAggregator.collect()).toEqual({
success: 4,
total: 7,
duration: { counts: [4], values: [100] },
});
taskClaimMetricsAggregator.reset();
expect(taskClaimMetricsAggregator.collect()).toEqual({
success: 0,
total: 0,
duration: { counts: [], values: [] },
});
});
});


@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRun } from '../task_events';
import { SimpleHistogram } from './simple_histogram';
import { SuccessRate, SuccessRateCounter } from './success_rate_counter';
import { ITaskMetricsAggregator } from './types';
const HDR_HISTOGRAM_MAX = 30000; // 30 seconds
const HDR_HISTOGRAM_BUCKET_SIZE = 100; // 100 millis
export type TaskClaimMetric = SuccessRate & {
duration: {
counts: number[];
values: number[];
};
};
export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator<TaskClaimMetric> {
private claimSuccessRate = new SuccessRateCounter();
private durationHistogram = new SimpleHistogram(HDR_HISTOGRAM_MAX, HDR_HISTOGRAM_BUCKET_SIZE);
public initialMetric(): TaskClaimMetric {
return {
...this.claimSuccessRate.initialMetric(),
duration: { counts: [], values: [] },
};
}
public collect(): TaskClaimMetric {
return {
...this.claimSuccessRate.get(),
duration: this.serializeHistogram(),
};
}
public reset() {
this.claimSuccessRate.reset();
this.durationHistogram.reset();
}
public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) {
const success = isOk((taskEvent as TaskRun).event);
this.claimSuccessRate.increment(success);
if (taskEvent.timing) {
const durationInMs = taskEvent.timing.stop - taskEvent.timing.start;
this.durationHistogram.record(durationInMs);
}
}
private serializeHistogram() {
const counts: number[] = [];
const values: number[] = [];
for (const { count, value } of this.durationHistogram.get(true)) {
counts.push(count);
values.push(value);
}
return { counts, values };
}
}


@ -0,0 +1,208 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import * as uuid from 'uuid';
import { asOk, asErr } from '../lib/result_type';
import { TaskStatus } from '../task';
import { asTaskRunEvent, TaskPersistence } from '../task_events';
import { TaskRunResult } from '../task_running';
import { TaskRunMetricsAggregator } from './task_run_metrics_aggregator';
export const getTaskRunSuccessEvent = (type: string) => {
const id = uuid.v4();
return asTaskRunEvent(
id,
asOk({
task: {
id,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {},
taskType: type,
params: {},
ownerId: null,
},
persistence: TaskPersistence.Recurring,
result: TaskRunResult.Success,
}),
{
start: 1689698780490,
stop: 1689698780500,
}
);
};
export const getTaskRunFailedEvent = (type: string) => {
const id = uuid.v4();
return asTaskRunEvent(
id,
asErr({
error: new Error('task failed to run'),
task: {
id,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {},
taskType: type,
params: {},
ownerId: null,
},
persistence: TaskPersistence.Recurring,
result: TaskRunResult.Failed,
})
);
};
describe('TaskRunMetricsAggregator', () => {
let taskRunMetricsAggregator: TaskRunMetricsAggregator;
beforeEach(() => {
taskRunMetricsAggregator = new TaskRunMetricsAggregator();
});
test('should correctly initialize', () => {
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, total: 0 },
by_type: {},
});
});
test('should correctly return initialMetrics', () => {
expect(taskRunMetricsAggregator.initialMetric()).toEqual({
overall: { success: 0, total: 0 },
by_type: {},
});
});
test('should correctly process task run success event', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 2, total: 2 },
by_type: {
telemetry: { success: 2, total: 2 },
},
});
});
test('should correctly process task run failure event', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, total: 2 },
by_type: {
telemetry: { success: 0, total: 2 },
},
});
});
test('should correctly process different task types', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 3, total: 4 },
by_type: {
report: { success: 2, total: 2 },
telemetry: { success: 1, total: 2 },
},
});
});
test('should correctly group alerting and action task types', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(
getTaskRunSuccessEvent('alerting:.index-threshold')
);
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email'));
taskRunMetricsAggregator.processTaskLifecycleEvent(
getTaskRunSuccessEvent('alerting:.index-threshold')
);
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 11, total: 14 },
by_type: {
actions: { success: 3, total: 3 },
'actions:.email': { success: 1, total: 1 },
'actions:webhook': { success: 2, total: 2 },
alerting: { success: 5, total: 7 },
'alerting:example': { success: 3, total: 5 },
'alerting:.index-threshold': { success: 2, total: 2 },
report: { success: 2, total: 2 },
telemetry: { success: 1, total: 2 },
},
});
});
test('should correctly reset counter', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(
getTaskRunSuccessEvent('alerting:.index-threshold')
);
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email'));
taskRunMetricsAggregator.processTaskLifecycleEvent(
getTaskRunSuccessEvent('alerting:.index-threshold')
);
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 11, total: 14 },
by_type: {
actions: { success: 3, total: 3 },
'actions:.email': { success: 1, total: 1 },
'actions:webhook': { success: 2, total: 2 },
alerting: { success: 5, total: 7 },
'alerting:example': { success: 3, total: 5 },
'alerting:.index-threshold': { success: 2, total: 2 },
report: { success: 2, total: 2 },
telemetry: { success: 1, total: 2 },
},
});
taskRunMetricsAggregator.reset();
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, total: 0 },
by_type: {
actions: { success: 0, total: 0 },
'actions:.email': { success: 0, total: 0 },
'actions:webhook': { success: 0, total: 0 },
alerting: { success: 0, total: 0 },
'alerting:example': { success: 0, total: 0 },
'alerting:.index-threshold': { success: 0, total: 0 },
report: { success: 0, total: 0 },
telemetry: { success: 0, total: 0 },
},
});
});
});


@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { JsonObject } from '@kbn/utility-types';
import { isOk, unwrap } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { ErroredTask, RanTask, TaskRun } from '../task_events';
import { SuccessRate, SuccessRateCounter } from './success_rate_counter';
import { ITaskMetricsAggregator } from './types';
const taskTypeGrouping = new Set<string>(['alerting:', 'actions:']);
export interface TaskRunMetric extends JsonObject {
overall: SuccessRate;
by_type: {
[key: string]: SuccessRate;
};
}
export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunMetric> {
private taskRunSuccessRate = new SuccessRateCounter();
private taskRunCounter: Map<string, SuccessRateCounter> = new Map();
public initialMetric(): TaskRunMetric {
return {
overall: this.taskRunSuccessRate.initialMetric(),
by_type: {},
};
}
public collect(): TaskRunMetric {
return {
overall: this.taskRunSuccessRate.get(),
by_type: this.collectTaskTypeEntries(),
};
}
public reset() {
this.taskRunSuccessRate.reset();
for (const taskType of this.taskRunCounter.keys()) {
this.taskRunCounter.get(taskType)!.reset();
}
}
public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) {
const { task }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event);
const taskType = task.taskType;
const taskTypeSuccessRate: SuccessRateCounter =
this.taskRunCounter.get(taskType) ?? new SuccessRateCounter();
const success = isOk((taskEvent as TaskRun).event);
this.taskRunSuccessRate.increment(success);
taskTypeSuccessRate.increment(success);
this.taskRunCounter.set(taskType, taskTypeSuccessRate);
const taskTypeGroup = this.getTaskTypeGroup(taskType);
if (taskTypeGroup) {
const taskTypeGroupSuccessRate: SuccessRateCounter =
this.taskRunCounter.get(taskTypeGroup) ?? new SuccessRateCounter();
taskTypeGroupSuccessRate.increment(success);
this.taskRunCounter.set(taskTypeGroup, taskTypeGroupSuccessRate);
}
}
private collectTaskTypeEntries() {
const collected: Record<string, SuccessRate> = {};
for (const [key, value] of this.taskRunCounter) {
collected[key] = value.get();
}
return collected;
}
private getTaskTypeGroup(taskType: string): string | undefined {
for (const group of taskTypeGrouping) {
if (taskType.startsWith(group)) {
return group.replaceAll(':', '');
}
}
}
}


@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { TaskLifecycleEvent } from '../polling_lifecycle';
export interface ITaskMetricsAggregator<T> {
initialMetric: () => T;
collect: () => T;
reset: () => void;
processTaskLifecycleEvent: (taskEvent: TaskLifecycleEvent) => void;
}
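
For illustration, a hypothetical aggregator implementing this interface (import
paths assumed); it only counts how many lifecycle events it has observed:

```ts
import { TaskLifecycleEvent } from '../polling_lifecycle'; // path assumed
import { ITaskMetricsAggregator } from './types'; // path assumed

interface EventCountMetric {
  count: number;
}

export class EventCountAggregator implements ITaskMetricsAggregator<EventCountMetric> {
  private count = 0;

  public initialMetric(): EventCountMetric {
    return { count: 0 };
  }

  public collect(): EventCountMetric {
    return { count: this.count };
  }

  public reset() {
    this.count = 0;
  }

  public processTaskLifecycleEvent(_: TaskLifecycleEvent) {
    this.count++;
  }
}
```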


@ -19,7 +19,7 @@ import {
import { asOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRunResult } from '../task_running';
import { AggregatedStat } from './runtime_statistics_aggregator';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { taskPollingLifecycleMock } from '../polling_lifecycle.mock';
import {
BackgroundTaskUtilizationStat,


@ -20,7 +20,7 @@ import {
TaskTiming,
} from '../task_events';
import { MonitoredStat } from './monitoring_stats_stream';
import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { createRunningAveragedStat } from './task_run_calcultors';
import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config';


@ -52,6 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
};
const managedConfig = {


@ -8,7 +8,7 @@
import { combineLatest, of } from 'rxjs';
import { pick, merge } from 'lodash';
import { map, startWith } from 'rxjs/operators';
import { AggregatedStatProvider } from './runtime_statistics_aggregator';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { TaskManagerConfig } from '../config';
import { ManagedConfiguration } from '../lib/create_managed_configuration';


@ -26,7 +26,7 @@ import {
SummarizedEphemeralTaskStat,
EphemeralTaskStat,
} from './ephemeral_task_statistics';
import { AggregatedStat } from './runtime_statistics_aggregator';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { ephemeralTaskLifecycleMock } from '../ephemeral_task_lifecycle.mock';
import { times, takeRight, take as takeLeft } from 'lodash';


@ -9,7 +9,7 @@ import { map, filter, startWith, buffer, share } from 'rxjs/operators';
import { JsonObject } from '@kbn/utility-types';
import { combineLatest, Observable, zip } from 'rxjs';
import { isOk, Ok } from '../lib/result_type';
import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { isTaskRunEvent, isTaskManagerStatEvent } from '../task_events';


@ -8,8 +8,9 @@
import { TaskManagerConfig } from '../config';
import { of, Subject } from 'rxjs';
import { take, bufferCount } from 'rxjs/operators';
import { createMonitoringStatsStream, AggregatedStat } from './monitoring_stats_stream';
import { createMonitoringStatsStream } from './monitoring_stats_stream';
import { JsonValue } from '@kbn/utility-types';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
beforeEach(() => {
jest.resetAllMocks();
@ -56,6 +57,7 @@ describe('createMonitoringStatsStream', () => {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
};
it('returns the initial config used to configure Task Manager', async () => {


@ -37,13 +37,11 @@ import {
import { ConfigStat, createConfigurationAggregator } from './configuration_statistics';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from './runtime_statistics_aggregator';
import { ManagedConfiguration } from '../lib/create_managed_configuration';
import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
import { CapacityEstimationStat, withCapacityEstimate } from './capacity_estimation';
import { AdHocTaskCounter } from '../lib/adhoc_task_counter';
export type { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
export interface MonitoringStats {
last_update: string;


@ -30,7 +30,7 @@ import {
TaskRunStat,
SummarizedTaskRunStat,
} from './task_run_statistics';
import { AggregatedStat } from './runtime_statistics_aggregator';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { FillPoolResult } from '../lib/fill_pool';
import { taskPollingLifecycleMock } from '../polling_lifecycle.mock';
import { configSchema } from '../config';


@ -10,7 +10,7 @@ import { filter, startWith, map } from 'rxjs/operators';
import { JsonObject, JsonValue } from '@kbn/utility-types';
import { isNumber, mapValues } from 'lodash';
import { Logger } from '@kbn/core/server';
import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';
import { AggregatedStatProvider, AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import {
isTaskRunEvent,


@ -12,7 +12,7 @@ import { JsonObject } from '@kbn/utility-types';
import { keyBy, mapValues } from 'lodash';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { AggregationResultOf } from '@kbn/es-types';
import { AggregatedStatProvider } from './runtime_statistics_aggregator';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { parseIntervalAsSecond, asInterval, parseIntervalAsMillisecond } from '../lib/intervals';
import { HealthStatus } from './monitoring_stats_stream';
import { TaskStore } from '../task_store';


@ -77,6 +77,7 @@ const pluginInitializerContextParams = {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
};
describe('TaskManagerPlugin', () => {


@ -27,7 +27,7 @@ import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './tas
import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
import { backgroundTaskUtilizationRoute, healthRoute } from './routes';
import { backgroundTaskUtilizationRoute, healthRoute, metricsRoute } from './routes';
import { createMonitoringStats, MonitoringStats } from './monitoring';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTask, ConcreteTaskInstance } from './task';
@ -35,6 +35,7 @@ import { registerTaskManagerUsageCollector } from './usage';
import { TASK_MANAGER_INDEX } from './constants';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { setupIntervalLogging } from './lib/log_health_metrics';
import { metricsStream, Metrics } from './metrics';
export interface TaskManagerSetupContract {
/**
@ -82,6 +83,8 @@ export class TaskManagerPlugin
private middleware: Middleware = createInitialMiddleware();
private elasticsearchAndSOAvailability$?: Observable<boolean>;
private monitoringStats$ = new Subject<MonitoringStats>();
private metrics$ = new Subject<Metrics>();
private resetMetrics$ = new Subject<boolean>();
private shouldRunBackgroundTasks: boolean;
private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version'];
private adHocTaskCounter: AdHocTaskCounter;
@ -155,6 +158,12 @@ export class TaskManagerPlugin
getClusterClient: () =>
startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
});
metricsRoute({
router,
metrics$: this.metrics$,
resetMetrics$: this.resetMetrics$,
taskManagerId: this.taskManagerId,
});
core.status.derivedStatus$.subscribe((status) =>
this.logger.debug(`status core.status.derivedStatus now set to ${status.level}`)
@ -276,6 +285,10 @@ export class TaskManagerPlugin
this.ephemeralTaskLifecycle
).subscribe((stat) => this.monitoringStats$.next(stat));
metricsStream(this.config!, this.resetMetrics$, this.taskPollingLifecycle).subscribe((metric) =>
this.metrics$.next(metric)
);
const taskScheduling = new TaskScheduling({
logger: this.logger,
taskStore,


@ -82,6 +82,7 @@ describe('TaskPollingLifecycle', () => {
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
},
taskStore: mockTaskStore,
logger: taskManagerLogger,


@ -7,3 +7,4 @@
export { healthRoute } from './health';
export { backgroundTaskUtilizationRoute } from './background_task_utilization';
export { metricsRoute } from './metrics';


@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { of, Subject } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { httpServiceMock } from '@kbn/core/server/mocks';
import { metricsRoute } from './metrics';
import { mockHandlerArguments } from './_mock_handler_arguments';
describe('metricsRoute', () => {
beforeEach(() => {
jest.resetAllMocks();
});
it('registers route', async () => {
const router = httpServiceMock.createRouter();
metricsRoute({
router,
metrics$: of(),
resetMetrics$: new Subject<boolean>(),
taskManagerId: uuidv4(),
});
const [config] = router.get.mock.calls[0];
expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
});
it('emits resetMetric$ event when route is accessed and reset query param is true', async () => {
let resetCalledTimes = 0;
const resetMetrics$ = new Subject<boolean>();
resetMetrics$.subscribe(() => {
resetCalledTimes++;
});
const router = httpServiceMock.createRouter();
metricsRoute({
router,
metrics$: of(),
resetMetrics$,
taskManagerId: uuidv4(),
});
const [config, handler] = router.get.mock.calls[0];
const [context, req, res] = mockHandlerArguments({}, { query: { reset: true } }, ['ok']);
expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
await handler(context, req, res);
expect(resetCalledTimes).toEqual(1);
});
it('does not emit resetMetric$ event when route is accessed and reset query param is false', async () => {
let resetCalledTimes = 0;
const resetMetrics$ = new Subject<boolean>();
resetMetrics$.subscribe(() => {
resetCalledTimes++;
});
const router = httpServiceMock.createRouter();
metricsRoute({
router,
metrics$: of(),
resetMetrics$,
taskManagerId: uuidv4(),
});
const [config, handler] = router.get.mock.calls[0];
const [context, req, res] = mockHandlerArguments({}, { query: { reset: false } }, ['ok']);
expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
await handler(context, req, res);
expect(resetCalledTimes).toEqual(0);
});
});


@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
IRouter,
RequestHandlerContext,
KibanaRequest,
IKibanaResponse,
KibanaResponseFactory,
} from '@kbn/core/server';
import { schema, TypeOf } from '@kbn/config-schema';
import { Observable, Subject } from 'rxjs';
import { Metrics } from '../metrics';
export interface NodeMetrics {
process_uuid: string;
timestamp: string;
last_update: string;
metrics: Metrics['metrics'] | null;
}
export interface MetricsRouteParams {
router: IRouter;
metrics$: Observable<Metrics>;
resetMetrics$: Subject<boolean>;
taskManagerId: string;
}
const QuerySchema = schema.object({
reset: schema.boolean({ defaultValue: true }),
});
export function metricsRoute(params: MetricsRouteParams) {
const { router, metrics$, resetMetrics$, taskManagerId } = params;
let lastMetrics: NodeMetrics | null = null;
metrics$.subscribe((metrics) => {
lastMetrics = { process_uuid: taskManagerId, timestamp: new Date().toISOString(), ...metrics };
});
router.get(
{
path: `/api/task_manager/metrics`,
options: {
access: 'public',
},
// Uncomment when we determine that we can restrict API usage to Global admins based on telemetry
// options: { tags: ['access:taskManager'] },
validate: {
query: QuerySchema,
},
},
async function (
_: RequestHandlerContext,
req: KibanaRequest<unknown, TypeOf<typeof QuerySchema>, unknown>,
res: KibanaResponseFactory
): Promise<IKibanaResponse> {
if (req.query.reset) {
resetMetrics$.next(true);
}
return res.ok({
body: lastMetrics
? lastMetrics
: { process_uuid: taskManagerId, timestamp: new Date().toISOString(), metrics: {} },
});
}
);
}
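
For illustration, querying the new endpoint from a Node 18+ script (global
`fetch`); the URL and headers are assumptions for a local dev setup, and the
response shape follows `NodeMetrics` above.

```ts
async function fetchTaskManagerMetrics() {
  // reset=false reads the current counters without resetting them.
  const res = await fetch('http://localhost:5601/api/task_manager/metrics?reset=false', {
    headers: { 'kbn-xsrf': 'true' /* plus whatever auth your setup requires */ },
  });
  const body = await res.json();
  // body.metrics.task_claim.value -> { success, total, duration: { counts, values } }
  // body.metrics.task_run.value   -> { overall: { success, total }, by_type: { ... } }
  return body;
}
```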


@ -1298,6 +1298,45 @@ describe('TaskManagerRunner', () => {
);
});
test('emits TaskEvent when a recurring task returns a success result with hasError=true', async () => {
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
const onTaskEvent = jest.fn();
const { runner, instance } = await readyToRunStageSetup({
onTaskEvent,
instance: {
id,
schedule: { interval: '1m' },
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt, state: {}, hasError: true };
},
}),
},
},
});
await runner.run();
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
asTaskRunEvent(
id,
asErr({
task: instance,
persistence: TaskPersistence.Recurring,
result: TaskRunResult.Success,
error: new Error(`Alerting task failed to run.`),
})
)
)
);
});
test('emits TaskEvent when a task run throws an error', async () => {
const id = _.random(1, 20).toString();
const error = new Error('Dangit!');


@ -47,6 +47,7 @@ import {
FailedRunResult,
FailedTaskResult,
isFailedRunResult,
RunContext,
SuccessfulRunResult,
TaskDefinition,
TaskStatus,
@ -321,9 +322,9 @@ export class TaskManagerRunner implements TaskRunner {
let taskParamsValidation;
if (this.requeueInvalidTasksConfig.enabled) {
taskParamsValidation = this.validateTaskParams();
taskParamsValidation = this.validateTaskParams(modifiedContext);
if (!taskParamsValidation.error) {
taskParamsValidation = await this.validateIndirectTaskParams();
taskParamsValidation = await this.validateIndirectTaskParams(modifiedContext);
}
}
@ -359,9 +360,9 @@ export class TaskManagerRunner implements TaskRunner {
}
}
private validateTaskParams() {
private validateTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, params, id, numSkippedRuns = 0 } = this.instance.task;
const { state, taskType, params, id, numSkippedRuns = 0 } = taskInstance;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
try {
@ -383,9 +384,9 @@ export class TaskManagerRunner implements TaskRunner {
return { ...(error ? { error } : {}), state };
}
private async validateIndirectTaskParams() {
private async validateIndirectTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, id, numSkippedRuns = 0 } = this.instance.task;
const { state, taskType, id, numSkippedRuns = 0 } = taskInstance;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
const indirectParamsSchema = this.definition.indirectParamsSchema;
@ -735,23 +736,30 @@ export class TaskManagerRunner implements TaskRunner {
await eitherAsync(
result,
async ({ runAt, schedule }: SuccessfulRunResult) => {
this.onTaskEvent(
asTaskRunEvent(
this.id,
asOk({
task,
persistence:
schedule || task.schedule
? TaskPersistence.Recurring
: TaskPersistence.NonRecurring,
result: await (runAt || schedule || task.schedule
? this.processResultForRecurringTask(result)
: this.processResultWhenDone()),
}),
taskTiming
)
);
async ({ runAt, schedule, hasError }: SuccessfulRunResult) => {
const processedResult = {
task,
persistence:
schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring,
result: await (runAt || schedule || task.schedule
? this.processResultForRecurringTask(result)
: this.processResultWhenDone()),
};
// Alerting task runner returns SuccessfulRunResult with hasError=true
// when the alerting task fails, so we check for this condition in order
// to emit the correct task run event for metrics collection
const taskRunEvent = hasError
? asTaskRunEvent(
this.id,
asErr({
...processedResult,
error: new Error(`Alerting task failed to run.`),
}),
taskTiming
)
: asTaskRunEvent(this.id, asOk(processedResult), taskTiming);
this.onTaskEvent(taskRunEvent);
},
async ({ error }: FailedRunResult) => {
this.onTaskEvent(
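
As an illustration of the branch above, a minimal sketch (not in this commit) of a task definition whose `run()` returns the successful-result shape flagged with `hasError: true`, which the runner now reports as a failed run in the task run metrics:

```ts
// Hypothetical task definition sketch. The result keeps the SuccessfulRunResult
// shape (state handling and rescheduling proceed as usual), but hasError=true
// causes the runner to emit an error task run event for metrics collection.
const demoTaskDefinition = {
  title: 'Demo task',
  createTaskRunner: () => ({
    async run() {
      // Stand-in for whatever work the task actually performs.
      const doWork = async () => {
        throw new Error('simulated failure');
      };
      try {
        await doWork();
        return { state: {} };
      } catch (err) {
        // Flag the failure without turning it into a thrown error.
        return { state: {}, hasError: true };
      }
    },
  }),
};
```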

View file

@ -10,6 +10,7 @@ import { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ loadTestFile }: FtrProviderContext) {
describe('task_manager', function taskManagerSuite() {
loadTestFile(require.resolve('./background_task_utilization_route'));
loadTestFile(require.resolve('./metrics_route'));
loadTestFile(require.resolve('./health_route'));
loadTestFile(require.resolve('./task_management'));
loadTestFile(require.resolve('./task_management_scheduled_at'));

View file

@ -0,0 +1,227 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import url from 'url';
import supertest from 'supertest';
import { NodeMetrics } from '@kbn/task-manager-plugin/server/routes/metrics';
import { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ getService }: FtrProviderContext) {
const config = getService('config');
const retry = getService('retry');
const request = supertest(url.format(config.get('servers.kibana')));
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
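// When reset=true the query param is omitted and the route applies its default
// (resetting) behavior; reset=false explicitly asks the route to keep its counters.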
function getMetricsRequest(reset: boolean = false) {
return request
.get(`/api/task_manager/metrics${reset ? '' : '?reset=false'}`)
.set('kbn-xsrf', 'foo')
.expect(200)
.then((response) => response.body);
}
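// Polls the metrics endpoint until the supplied predicate matches; each failed
// attempt waits 500ms before throwing so the retry service tries again.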
function getMetrics(
reset: boolean = false,
callback: (metrics: NodeMetrics) => boolean
): Promise<NodeMetrics> {
return retry.try(async () => {
const metrics = await getMetricsRequest(reset);
if (metrics.metrics && callback(metrics)) {
return metrics;
}
await delay(500);
throw new Error('Expected metrics not received');
});
}
describe('task manager metrics', () => {
describe('task claim', () => {
it('should increment task claim success/total counters', async () => {
// counters are reset every 30 seconds, so wait until the start of a
// fresh counter cycle to make sure values are incrementing
const initialMetrics = (
await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1)
).metrics;
expect(initialMetrics).not.to.be(null);
expect(initialMetrics?.task_claim).not.to.be(null);
expect(initialMetrics?.task_claim?.value).not.to.be(null);
let previousTaskClaimSuccess = initialMetrics?.task_claim?.value.success!;
let previousTaskClaimTotal = initialMetrics?.task_claim?.value.total!;
let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!;
for (let i = 0; i < 5; ++i) {
const metrics = (
await getMetrics(
false,
(m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp
)
).metrics;
expect(metrics).not.to.be(null);
expect(metrics?.task_claim).not.to.be(null);
expect(metrics?.task_claim?.value).not.to.be(null);
expect(metrics?.task_claim?.value.success).to.be.greaterThan(previousTaskClaimSuccess);
expect(metrics?.task_claim?.value.total).to.be.greaterThan(previousTaskClaimTotal);
previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
previousTaskClaimSuccess = metrics?.task_claim?.value.success!;
previousTaskClaimTotal = metrics?.task_claim?.value.total!;
// check that duration histogram exists
expect(metrics?.task_claim?.value.duration).not.to.be(null);
expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true);
expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true);
}
});
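
For reference, an illustrative example (invented values, field names taken from the assertions above) of the `task_claim` metric shape this test exercises:

```ts
// Illustrative only; not real output from the endpoint.
const exampleTaskClaimMetric = {
  timestamp: '2023-08-11T17:54:00.000Z',
  value: {
    success: 3,
    total: 4,
    // counts and values are parallel arrays describing the claim duration histogram
    duration: {
      counts: [3, 1],
      values: [100, 200],
    },
  },
};
```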
it('should reset task claim success/total counters at an interval', async () => {
const initialCounterValue = 7;
const initialMetrics = (
await getMetrics(
false,
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
)
).metrics;
expect(initialMetrics).not.to.be(null);
expect(initialMetrics?.task_claim).not.to.be(null);
expect(initialMetrics?.task_claim?.value).not.to.be(null);
// retry until counter value resets
const resetMetrics = (
await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1)
).metrics;
expect(resetMetrics).not.to.be(null);
expect(resetMetrics?.task_claim).not.to.be(null);
expect(resetMetrics?.task_claim?.value).not.to.be(null);
});
it('should reset task claim success/total counters on request', async () => {
const initialCounterValue = 1;
const initialMetrics = (
await getMetrics(
false,
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
)
).metrics;
expect(initialMetrics).not.to.be(null);
expect(initialMetrics?.task_claim).not.to.be(null);
expect(initialMetrics?.task_claim?.value).not.to.be(null);
let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!;
for (let i = 0; i < 5; ++i) {
const metrics = (
await getMetrics(
true,
(m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp
)
).metrics;
expect(metrics).not.to.be(null);
expect(metrics?.task_claim).not.to.be(null);
expect(metrics?.task_claim?.value).not.to.be(null);
expect(metrics?.task_claim?.value.success).to.equal(1);
expect(metrics?.task_claim?.value.total).to.equal(1);
previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
// check that duration histogram exists
expect(metrics?.task_claim?.value.duration).not.to.be(null);
expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true);
expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true);
}
});
});
describe('task run test', () => {
let ruleId: string | null = null;
before(async () => {
// create a rule; its scheduled task runs drive the task_run counters
const rule = await request
.post(`/api/alerting/rule`)
.set('kbn-xsrf', 'foo')
.send({
enabled: true,
name: 'test rule',
tags: [],
rule_type_id: '.es-query',
consumer: 'alerts',
// set schedule long so we can control when it runs
schedule: { interval: '1d' },
actions: [],
params: {
aggType: 'count',
esQuery: '{\n "query":{\n "match_all" : {}\n }\n}',
excludeHitsFromPreviousRun: false,
groupBy: 'all',
index: ['.kibana-event-log*'],
searchType: 'esQuery',
size: 100,
termSize: 5,
threshold: [0],
thresholdComparator: '>',
timeField: '@timestamp',
timeWindowSize: 5,
timeWindowUnit: 'm',
},
})
.expect(200)
.then((response) => response.body);
ruleId = rule.id;
});
after(async () => {
// delete rule
await request.delete(`/api/alerting/rule/${ruleId}`).set('kbn-xsrf', 'foo').expect(204);
});
it('should increment task run success/total counters', async () => {
const initialMetrics = (
await getMetrics(
false,
(metrics) =>
metrics?.metrics?.task_run?.value.by_type.alerting?.total === 1 &&
metrics?.metrics?.task_run?.value.by_type.alerting?.success === 1
)
).metrics;
expect(initialMetrics).not.to.be(null);
expect(initialMetrics?.task_run).not.to.be(null);
expect(initialMetrics?.task_run?.value).not.to.be(null);
for (let i = 0; i < 1; ++i) {
// run the rule and expect counters to increment
await request
.post('/api/sample_tasks/run_soon')
.set('kbn-xsrf', 'xxx')
.send({ task: { id: ruleId } })
.expect(200);
await getMetrics(
false,
(metrics) =>
metrics?.metrics?.task_run?.value.by_type.alerting?.total === i + 2 &&
metrics?.metrics?.task_run?.value.by_type.alerting?.success === i + 2
);
}
// counter should reset on its own
await getMetrics(
false,
(metrics) =>
metrics?.metrics?.task_run?.value.by_type.alerting?.total === 0 &&
metrics?.metrics?.task_run?.value.by_type.alerting?.success === 0
);
});
});
});
}