[Response Ops] Calculating overdue metrics using aggregation instead of fetch (#135824)

* Switching to aggregation for overdue tasks with runtime mapping

* Adding try/catch to gracefully handle search errors

* Adding task manager library function for generating required query

* Adding functional tests

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2022-07-12 15:02:35 -04:00 committed by GitHub
parent 1956eb67c6
commit b286519a3d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 650 additions and 162 deletions

View file

@ -18,7 +18,7 @@ jest.setSystemTime(new Date('2020-03-09').getTime());
describe('registerClusterCollector()', () => {
const monitoringCollection = monitoringCollectionMock.createSetup();
const coreSetup = coreMock.createSetup() as unknown as CoreSetup<ActionsPluginsStart, unknown>;
const taskManagerFetch = jest.fn();
const taskManagerAggregate = jest.fn();
beforeEach(() => {
(coreSetup.getStartServices as jest.Mock).mockImplementation(async () => {
@ -26,7 +26,7 @@ describe('registerClusterCollector()', () => {
undefined,
{
taskManager: {
fetch: taskManagerFetch,
aggregate: taskManagerAggregate,
},
},
];
@ -44,22 +44,19 @@ describe('registerClusterCollector()', () => {
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_actions');
const nowInMs = +new Date();
const docs = [
{
runAt: nowInMs - 1000,
},
{
retryAt: nowInMs - 1000,
},
];
taskManagerFetch.mockImplementation(async () => ({ docs }));
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 63, relation: 'eq' }, max_score: null, hits: [] },
aggregations: { overdueByPercentiles: { values: { '50.0': 125369, '99.0': 229561.87 } } },
}));
const result = (await metrics.cluster_actions.fetch()) as ClusterActionsMetric;
expect(result.overdue.count).toBe(docs.length);
expect(result.overdue.delay.p50).toBe(1000);
expect(result.overdue.delay.p99).toBe(1000);
expect(taskManagerFetch).toHaveBeenCalledWith({
expect(result.overdue.count).toEqual(63);
expect(result.overdue.delay.p50).toEqual(125369);
expect(result.overdue.delay.p99).toEqual(229561.87);
expect(taskManagerAggregate).toHaveBeenCalledWith({
query: {
bool: {
must: [
@ -126,10 +123,38 @@ describe('registerClusterCollector()', () => {
],
},
},
runtime_mappings: {
overdueBy: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit(new Date().getTime() - runAt.value.getMillis());
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit(new Date().getTime() - retryAt.value.getMillis());
} else {
emit(0);
}
}
`,
},
},
},
aggs: {
overdueByPercentiles: {
percentiles: {
field: 'overdueBy',
percents: [50, 99],
},
},
},
});
});
it('should calculate accurate p50 and p99', async () => {
it('should handle null results', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
@ -140,19 +165,61 @@ describe('registerClusterCollector()', () => {
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_actions');
const nowInMs = +new Date();
const docs = [
{ runAt: nowInMs - 1000 },
{ runAt: nowInMs - 2000 },
{ runAt: nowInMs - 3000 },
{ runAt: nowInMs - 4000 },
{ runAt: nowInMs - 40000 },
];
taskManagerFetch.mockImplementation(async () => ({ docs }));
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: null, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
}));
const result = (await metrics.cluster_actions.fetch()) as ClusterActionsMetric;
expect(result.overdue.count).toBe(docs.length);
expect(result.overdue.delay.p50).toBe(3000);
expect(result.overdue.delay.p99).toBe(40000);
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
it('should handle null percentile values', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
});
registerClusterCollector({ monitoringCollection, core: coreSetup });
const metricTypes = Object.keys(metrics);
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_actions');
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: null, relation: 'eq' }, max_score: null, hits: [] },
aggregations: { overdueByPercentiles: { values: { '50.0': null, '99.0': null } } },
}));
const result = (await metrics.cluster_actions.fetch()) as ClusterActionsMetric;
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
it('should gracefully handle search errors', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
});
registerClusterCollector({ monitoringCollection, core: coreSetup });
const metricTypes = Object.keys(metrics);
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_actions');
taskManagerAggregate.mockRejectedValue(new Error('Failure'));
const result = (await metrics.cluster_actions.fetch()) as ClusterActionsMetric;
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
});

View file

@ -4,15 +4,15 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import stats from 'stats-lite';
import type {
AggregationsKeyedPercentiles,
AggregationsPercentilesAggregateBase,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { MonitoringCollectionSetup } from '@kbn/monitoring-collection-plugin/server';
import {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
} from '@kbn/task-manager-plugin/server';
import { aggregateTaskOverduePercentilesForType } from '@kbn/task-manager-plugin/server';
import { CoreSetup } from '@kbn/core/server';
import { ActionsPluginsStart } from '../plugin';
import { ClusterActionsMetric } from './types';
import { ClusterActionsMetric, EMPTY_CLUSTER_ACTIONS_METRICS } from './types';
export function registerClusterCollector({
monitoringCollection,
@ -39,52 +39,56 @@ export function registerClusterCollector({
},
},
fetch: async () => {
const [_, pluginStart] = await core.getStartServices();
const nowInMs = +new Date();
const { docs: overdueTasks } = await pluginStart.taskManager.fetch({
query: {
bool: {
must: [
{
term: {
'task.scope': {
value: 'actions',
},
},
},
{
bool: {
should: [IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt],
},
},
],
try {
const [_, pluginStart] = await core.getStartServices();
const results = await pluginStart.taskManager.aggregate(
aggregateTaskOverduePercentilesForType('actions')
);
const totalOverdueTasks =
typeof results.hits.total === 'number' ? results.hits.total : results.hits.total?.value;
const aggregations = results.aggregations as {
overdueByPercentiles: AggregationsPercentilesAggregateBase;
};
const overdueByValues: AggregationsKeyedPercentiles =
(aggregations?.overdueByPercentiles?.values as AggregationsKeyedPercentiles) ?? {};
/**
* Response format
* {
* "aggregations": {
* "overdueByPercentiles": {
* "values": {
* "50.0": 3027400,
* "99.0": 3035402
* }
* }
* }
* }
*/
const metrics: ClusterActionsMetric = {
overdue: {
count: totalOverdueTasks ?? 0,
delay: {
p50: (overdueByValues['50.0'] as number) ?? 0,
p99: (overdueByValues['99.0'] as number) ?? 0,
},
},
},
});
};
const overdueTasksDelay = overdueTasks.map(
(overdueTask) => nowInMs - +new Date(overdueTask.runAt || overdueTask.retryAt)
);
if (isNaN(metrics.overdue.delay.p50)) {
metrics.overdue.delay.p50 = 0;
}
const metrics: ClusterActionsMetric = {
overdue: {
count: overdueTasks.length,
delay: {
p50: stats.percentile(overdueTasksDelay, 0.5),
p99: stats.percentile(overdueTasksDelay, 0.99),
},
},
};
if (isNaN(metrics.overdue.delay.p99)) {
metrics.overdue.delay.p99 = 0;
}
if (isNaN(metrics.overdue.delay.p50)) {
metrics.overdue.delay.p50 = 0;
return metrics;
} catch (err) {
return EMPTY_CLUSTER_ACTIONS_METRICS;
}
if (isNaN(metrics.overdue.delay.p99)) {
metrics.overdue.delay.p99 = 0;
}
return metrics;
},
});
}

View file

@ -6,6 +6,16 @@
*/
import { MetricResult } from '@kbn/monitoring-collection-plugin/server';
export const EMPTY_CLUSTER_ACTIONS_METRICS: ClusterActionsMetric = {
overdue: {
count: 0,
delay: {
p50: 0,
p99: 0,
},
},
};
export type ClusterActionsMetric = MetricResult<{
overdue: {
count: number;

View file

@ -18,7 +18,7 @@ jest.setSystemTime(new Date('2020-03-09').getTime());
describe('registerClusterCollector()', () => {
const monitoringCollection = monitoringCollectionMock.createSetup();
const coreSetup = coreMock.createSetup() as unknown as CoreSetup<AlertingPluginsStart, unknown>;
const taskManagerFetch = jest.fn();
const taskManagerAggregate = jest.fn();
beforeEach(() => {
(coreSetup.getStartServices as jest.Mock).mockImplementation(async () => {
@ -26,7 +26,7 @@ describe('registerClusterCollector()', () => {
undefined,
{
taskManager: {
fetch: taskManagerFetch,
aggregate: taskManagerAggregate,
},
},
];
@ -44,22 +44,19 @@ describe('registerClusterCollector()', () => {
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_rules');
const nowInMs = +new Date();
const docs = [
{
runAt: nowInMs - 1000,
},
{
retryAt: nowInMs - 1000,
},
];
taskManagerFetch.mockImplementation(async () => ({ docs }));
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 12, relation: 'eq' }, max_score: null, hits: [] },
aggregations: { overdueByPercentiles: { values: { '50.0': 108525, '99.0': 322480 } } },
}));
const result = (await metrics.cluster_rules.fetch()) as ClusterRulesMetric;
expect(result.overdue.count).toBe(docs.length);
expect(result.overdue.delay.p50).toBe(1000);
expect(result.overdue.delay.p99).toBe(1000);
expect(taskManagerFetch).toHaveBeenCalledWith({
expect(result.overdue.count).toEqual(12);
expect(result.overdue.delay.p50).toEqual(108525);
expect(result.overdue.delay.p99).toEqual(322480);
expect(taskManagerAggregate).toHaveBeenCalledWith({
query: {
bool: {
must: [
@ -126,10 +123,38 @@ describe('registerClusterCollector()', () => {
],
},
},
runtime_mappings: {
overdueBy: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit(new Date().getTime() - runAt.value.getMillis());
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit(new Date().getTime() - retryAt.value.getMillis());
} else {
emit(0);
}
}
`,
},
},
},
aggs: {
overdueByPercentiles: {
percentiles: {
field: 'overdueBy',
percents: [50, 99],
},
},
},
});
});
it('should calculate accurate p50 and p99', async () => {
it('should handle null results', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
@ -140,19 +165,61 @@ describe('registerClusterCollector()', () => {
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_rules');
const nowInMs = +new Date();
const docs = [
{ runAt: nowInMs - 1000 },
{ runAt: nowInMs - 2000 },
{ runAt: nowInMs - 3000 },
{ runAt: nowInMs - 4000 },
{ runAt: nowInMs - 40000 },
];
taskManagerFetch.mockImplementation(async () => ({ docs }));
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: null, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
}));
const result = (await metrics.cluster_rules.fetch()) as ClusterRulesMetric;
expect(result.overdue.count).toBe(docs.length);
expect(result.overdue.delay.p50).toBe(3000);
expect(result.overdue.delay.p99).toBe(40000);
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
it('should handle null percentile values', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
});
registerClusterCollector({ monitoringCollection, core: coreSetup });
const metricTypes = Object.keys(metrics);
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_rules');
taskManagerAggregate.mockImplementation(async () => ({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: null, relation: 'eq' }, max_score: null, hits: [] },
aggregations: { overdueByPercentiles: { values: { '50.0': null, '99.0': null } } },
}));
const result = (await metrics.cluster_rules.fetch()) as ClusterRulesMetric;
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
it('should gracefully handle search errors', async () => {
const metrics: Record<string, Metric<unknown>> = {};
monitoringCollection.registerMetric.mockImplementation((metric) => {
metrics[metric.type] = metric;
});
registerClusterCollector({ monitoringCollection, core: coreSetup });
const metricTypes = Object.keys(metrics);
expect(metricTypes.length).toBe(1);
expect(metricTypes[0]).toBe('cluster_rules');
taskManagerAggregate.mockRejectedValue(new Error('Failure'));
const result = (await metrics.cluster_rules.fetch()) as ClusterRulesMetric;
expect(result.overdue.count).toEqual(0);
expect(result.overdue.delay.p50).toEqual(0);
expect(result.overdue.delay.p99).toEqual(0);
});
});

View file

@ -4,15 +4,15 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import stats from 'stats-lite';
import type {
AggregationsKeyedPercentiles,
AggregationsPercentilesAggregateBase,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { MonitoringCollectionSetup } from '@kbn/monitoring-collection-plugin/server';
import {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
} from '@kbn/task-manager-plugin/server';
import { aggregateTaskOverduePercentilesForType } from '@kbn/task-manager-plugin/server';
import { CoreSetup } from '@kbn/core/server';
import { AlertingPluginsStart } from '../plugin';
import { ClusterRulesMetric } from './types';
import { ClusterRulesMetric, EMPTY_CLUSTER_RULES_METRICS } from './types';
export function registerClusterCollector({
monitoringCollection,
@ -39,52 +39,56 @@ export function registerClusterCollector({
},
},
fetch: async () => {
const [_, pluginStart] = await core.getStartServices();
const now = +new Date();
const { docs: overdueTasks } = await pluginStart.taskManager.fetch({
query: {
bool: {
must: [
{
term: {
'task.scope': {
value: 'alerting',
},
},
},
{
bool: {
should: [IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt],
},
},
],
try {
const [_, pluginStart] = await core.getStartServices();
const results = await pluginStart.taskManager.aggregate(
aggregateTaskOverduePercentilesForType('alerting')
);
const totalOverdueTasks =
typeof results.hits.total === 'number' ? results.hits.total : results.hits.total?.value;
const aggregations = results.aggregations as {
overdueByPercentiles: AggregationsPercentilesAggregateBase;
};
const overdueByValues: AggregationsKeyedPercentiles =
(aggregations?.overdueByPercentiles?.values as AggregationsKeyedPercentiles) ?? {};
/**
* Response format
* {
* "aggregations": {
* "overdueByPercentiles": {
* "values": {
* "50.0": 3027400,
* "99.0": 3035402
* }
* }
* }
* }
*/
const metrics: ClusterRulesMetric = {
overdue: {
count: totalOverdueTasks ?? 0,
delay: {
p50: (overdueByValues['50.0'] as number) ?? 0,
p99: (overdueByValues['99.0'] as number) ?? 0,
},
},
},
});
};
const overdueTasksDelay = overdueTasks.map(
(overdueTask) => now - +new Date(overdueTask.runAt || overdueTask.retryAt)
);
if (isNaN(metrics.overdue.delay.p50)) {
metrics.overdue.delay.p50 = 0;
}
const metrics: ClusterRulesMetric = {
overdue: {
count: overdueTasks.length,
delay: {
p50: stats.percentile(overdueTasksDelay, 0.5),
p99: stats.percentile(overdueTasksDelay, 0.99),
},
},
};
if (isNaN(metrics.overdue.delay.p99)) {
metrics.overdue.delay.p99 = 0;
}
if (isNaN(metrics.overdue.delay.p50)) {
metrics.overdue.delay.p50 = 0;
return metrics;
} catch (err) {
return EMPTY_CLUSTER_RULES_METRICS;
}
if (isNaN(metrics.overdue.delay.p99)) {
metrics.overdue.delay.p99 = 0;
}
return metrics;
},
});
}

View file

@ -6,6 +6,16 @@
*/
import { MetricResult } from '@kbn/monitoring-collection-plugin/server';
export const EMPTY_CLUSTER_RULES_METRICS: ClusterRulesMetric = {
overdue: {
count: 0,
delay: {
p50: 0,
p99: 0,
},
},
};
export type ClusterRulesMetric = MetricResult<{
overdue: {
count: number;

View file

@ -36,6 +36,7 @@ export {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
} from './queries/mark_available_tasks_as_claimed';
export { aggregateTaskOverduePercentilesForType } from './queries/aggregate_task_overdue_percentiles_for_type';
export type {
TaskManagerPlugin as TaskManager,

View file

@ -20,6 +20,7 @@ const createStartMock = () => {
const mock: jest.Mocked<TaskManagerStartContract> = {
fetch: jest.fn(),
get: jest.fn(),
aggregate: jest.fn(),
remove: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),

View file

@ -7,6 +7,7 @@
import { combineLatest, Observable, Subject } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { UsageCollectionSetup, UsageCounter } from '@kbn/usage-collection-plugin/server';
import {
PluginInitializerContext,
@ -23,13 +24,13 @@ import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib
import { removeIfExists } from './lib/remove_if_exists';
import { setupSavedObjects } from './saved_objects';
import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary';
import { FetchResult, SearchOpts, TaskStore } from './task_store';
import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
import { healthRoute } from './routes';
import { createMonitoringStats, MonitoringStats } from './monitoring';
import { EphemeralTask } from './task';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTask, ConcreteTaskInstance } from './task';
import { registerTaskManagerUsageCollector } from './usage';
import { TASK_MANAGER_INDEX } from './constants';
export interface TaskManagerSetupContract {
@ -49,7 +50,7 @@ export type TaskManagerStartContract = Pick<
TaskScheduling,
'schedule' | 'runSoon' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
> &
Pick<TaskStore, 'fetch' | 'get' | 'remove'> & {
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove'> & {
removeIfExists: TaskStore['remove'];
} & { supportsEphemeralTasks: () => boolean };
@ -236,6 +237,8 @@ export class TaskManagerPlugin
return {
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
aggregate: (opts: AggregationOpts): Promise<estypes.SearchResponse<ConcreteTaskInstance>> =>
taskStore.aggregate(opts),
get: (id: string) => taskStore.get(id),
remove: (id: string) => taskStore.remove(id),
removeIfExists: (id: string) => removeIfExists(taskStore, id),

View file

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { aggregateTaskOverduePercentilesForType } from './aggregate_task_overdue_percentiles_for_type';
describe('aggregateTaskOverduePercentilesForType', () => {
// Pins the exact search body produced by the builder: any change to the
// query shape, the painless script text, or the percentiles requested will
// fail this snapshot-style equality check.
test('correctly generates query, runtime_field and aggregation for determining overdue percentiles for a given type', () => {
expect(aggregateTaskOverduePercentilesForType('foo')).toEqual({
// Query: tasks in the given scope that are overdue — either idle with an
// expired runAt, or running/claiming with an expired retryAt.
query: {
bool: {
must: [
{
term: {
'task.scope': {
value: 'foo',
},
},
},
{
bool: {
should: [
{
bool: {
must: [
{
term: {
'task.status': 'idle',
},
},
{
range: {
'task.runAt': {
lte: 'now',
},
},
},
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{
term: {
'task.status': 'running',
},
},
{
term: {
'task.status': 'claiming',
},
},
],
},
},
{
range: {
'task.retryAt': {
lte: 'now',
},
},
},
],
},
},
],
},
},
],
},
},
// Runtime field: milliseconds overdue, preferring runAt over retryAt,
// emitting 0 when neither date is present on the document.
runtime_mappings: {
overdueBy: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit(new Date().getTime() - runAt.value.getMillis());
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit(new Date().getTime() - retryAt.value.getMillis());
} else {
emit(0);
}
}
`,
},
},
},
// Aggregation: only the p50 and p99 percentiles of the runtime field.
aggs: {
overdueByPercentiles: {
percentiles: {
field: 'overdueBy',
percents: [50, 99],
},
},
},
});
});
});

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
AggregationsAggregationContainer,
QueryDslQueryContainer,
MappingRuntimeFields,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
} from './mark_available_tasks_as_claimed';
/**
 * Builds the Elasticsearch search body (query + runtime mapping + aggregation)
 * used to compute "overdue by" percentiles for tasks of a given scope.
 *
 * The result is passed to `taskManager.aggregate()` / `TaskStore.aggregate()`,
 * which runs it with `size: 0` so only the hit total and the percentile
 * aggregation come back.
 *
 * @param type - value matched against `task.scope` (e.g. 'actions', 'alerting')
 * @returns `{ query, runtime_mappings, aggs }` for an ES search request
 */
export function aggregateTaskOverduePercentilesForType(type: string): {
aggs: Record<string, AggregationsAggregationContainer>;
query: QueryDslQueryContainer;
runtime_mappings: MappingRuntimeFields;
} {
return {
// Only tasks in the requested scope that are overdue: idle with an
// expired runAt, or running/claiming with an expired retryAt (the two
// shared clauses also used by the task claimer).
query: {
bool: {
must: [
{
term: {
'task.scope': {
value: type,
},
},
},
{
bool: {
should: [IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt],
},
},
],
},
},
// Runtime field computed per document at search time: milliseconds the
// task is overdue, preferring runAt and falling back to retryAt; emits 0
// when neither date exists so the doc still contributes a value.
runtime_mappings: {
overdueBy: {
type: 'long',
script: {
source: `
def runAt = doc['task.runAt'];
if(!runAt.empty) {
emit(new Date().getTime() - runAt.value.getMillis());
} else {
def retryAt = doc['task.retryAt'];
if(!retryAt.empty) {
emit(new Date().getTime() - retryAt.value.getMillis());
} else {
emit(0);
}
}
`,
},
},
},
// p50/p99 percentiles over the runtime field; consumers read the keyed
// values "50.0" and "99.0" from `aggregations.overdueByPercentiles.values`.
aggs: {
overdueByPercentiles: {
percentiles: {
field: 'overdueBy',
percents: [50, 99],
},
},
},
};
}

View file

@ -16,7 +16,7 @@ import {
SerializedConcreteTaskInstance,
} from './task';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { TaskStore, SearchOpts } from './task_store';
import { TaskStore, SearchOpts, AggregationOpts } from './task_store';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import {
SavedObjectsSerializer,
@ -262,6 +262,103 @@ describe('TaskStore', () => {
});
});
describe('aggregate', () => {
let store: TaskStore;
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});
async function testAggregate(
opts: AggregationOpts,
hits: Array<estypes.SearchHit<unknown>> = []
) {
esClient.search.mockResponse({
hits: { hits, total: hits.length },
aggregations: {},
} as estypes.SearchResponse);
const result = await store.aggregate(opts);
expect(esClient.search).toHaveBeenCalledTimes(1);
return {
result,
args: esClient.search.mock.calls[0][0],
};
}
test('empty call filters by type, sets size to 0, passes aggregation to esClient', async () => {
const { args } = await testAggregate({
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
});
expect(args).toMatchObject({
index: 'tasky',
body: {
size: 0,
query: { bool: { filter: [{ term: { type: 'task' } }] } },
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
},
});
});
test('allows custom queries', async () => {
const { args } = await testAggregate({
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
query: {
term: { 'task.taskType': 'bar' },
},
});
expect(args).toMatchObject({
body: {
size: 0,
query: {
bool: {
must: [
{ bool: { filter: [{ term: { type: 'task' } }] } },
{ term: { 'task.taskType': 'bar' } },
],
},
},
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
},
});
});
test('allows runtime mappings', async () => {
const { args } = await testAggregate({
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
runtime_mappings: { testMapping: { type: 'long', script: { source: `` } } },
});
expect(args).toMatchObject({
body: {
size: 0,
query: { bool: { filter: [{ term: { type: 'task' } }] } },
aggs: { testAgg: { terms: { field: 'task.taskType' } } },
runtime_mappings: { testMapping: { type: 'long', script: { source: `` } } },
},
});
});
test('throws error when esClient.search throws error', async () => {
esClient.search.mockRejectedValue(new Error('Failure'));
await expect(store.aggregate({ aggs: {} })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
});
});
describe('update', () => {
let store: TaskStore;
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];

View file

@ -54,6 +54,7 @@ export interface SearchOpts {
export interface AggregationOpts {
aggs: Record<string, estypes.AggregationsAggregationContainer>;
query?: estypes.QueryDslQueryContainer;
runtime_mappings?: estypes.MappingRuntimeFields;
size?: number;
}
@ -332,6 +333,8 @@ export class TaskStore {
public async aggregate<TSearchRequest extends AggregationOpts>({
aggs,
query,
// eslint-disable-next-line @typescript-eslint/naming-convention
runtime_mappings,
size = 0,
}: TSearchRequest): Promise<estypes.SearchResponse<ConcreteTaskInstance>> {
const body = await this.esClient.search<
@ -344,6 +347,7 @@ export class TaskStore {
body: ensureAggregationOnlyReturnsTaskObjects({
query,
aggs,
runtime_mappings,
size,
}),
});

View file

@ -20,6 +20,7 @@ export default function actionsTests({ loadTestFile, getService }: FtrProviderCo
loadTestFile(require.resolve('./get'));
loadTestFile(require.resolve('./connector_types'));
loadTestFile(require.resolve('./update'));
loadTestFile(require.resolve('./monitoring_collection'));
loadTestFile(require.resolve('./execute'));
loadTestFile(require.resolve('./enqueue'));
loadTestFile(require.resolve('./builtin_action_types/email'));

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../common/ftr_provider_context';
const CLUSTER_ACTIONS_MONITORING_COLLECTION_URL = `/api/monitoring_collection/cluster_actions`;
// eslint-disable-next-line import/no-default-export
export default function actionsMonitoringCollectionTests({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
describe('monitoring_collection', () => {
it('should calculate overdue task count and percentiles', async () => {
// No overdue tasks are seeded for this test; we only verify the route
// responds successfully and returns numeric fields in the expected shape.
const response = await supertest.get(CLUSTER_ACTIONS_MONITORING_COLLECTION_URL);
expect(response.status).to.eql(200);
const { overdue } = response.body.cluster_actions;
expect(typeof overdue.count).to.eql('number');
expect(typeof overdue.delay.p50).to.eql('number');
expect(typeof overdue.delay.p99).to.eql('number');
});
});
}

View file

@ -27,7 +27,7 @@ export default function alertingTests({ loadTestFile, getService }: FtrProviderC
loadTestFile(require.resolve('./rule_types'));
loadTestFile(require.resolve('./event_log'));
loadTestFile(require.resolve('./execution_status'));
loadTestFile(require.resolve('./in_memory_metrics'));
loadTestFile(require.resolve('./monitoring_collection'));
loadTestFile(require.resolve('./monitoring'));
loadTestFile(require.resolve('./mute_all'));
loadTestFile(require.resolve('./mute_instance'));

View file

@ -19,13 +19,14 @@ import { FtrProviderContext } from '../../../common/ftr_provider_context';
import { createEsDocuments } from './builtin_alert_types/lib/create_test_data';
const NODE_RULES_MONITORING_COLLECTION_URL = `/api/monitoring_collection/node_rules`;
const CLUSTER_RULES_MONITORING_COLLECTION_URL = `/api/monitoring_collection/cluster_rules`;
const RULE_INTERVAL_SECONDS = 6;
const RULE_INTERVALS_TO_WRITE = 5;
const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000;
const ES_GROUPS_TO_WRITE = 3;
// eslint-disable-next-line import/no-default-export
export default function inMemoryMetricsAlertTests({ getService }: FtrProviderContext) {
export default function alertingMonitoringCollectionTests({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const es = getService('es');
const log = getService('log');
@ -33,7 +34,7 @@ export default function inMemoryMetricsAlertTests({ getService }: FtrProviderCon
const esTestIndexTool = new ESTestIndexTool(es, retry);
const waitForExecutionCount = createWaitForExecutionCount(supertest, Spaces.space1.id);
describe('inMemoryMetrics', () => {
describe('monitoring_collection', () => {
let endDate: string;
const objectRemover = new ObjectRemover(supertest);
@ -48,7 +49,7 @@ export default function inMemoryMetricsAlertTests({ getService }: FtrProviderCon
after(async () => await objectRemover.removeAll());
it('should count executions', async () => {
it('inMemoryMetrics should count executions', async () => {
const createResponse = await supertest
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
.set('kbn-xsrf', 'foo')
@ -67,7 +68,7 @@ export default function inMemoryMetricsAlertTests({ getService }: FtrProviderCon
expect(getResponse.body.node_rules.executions).to.greaterThan(0);
});
it('should count failures', async () => {
it('inMemoryMetrics should count failures', async () => {
const pattern = [false]; // Once we start failing, the rule type doesn't update state so the failures have to be at the end
const createResponse = await supertest
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
@ -95,7 +96,7 @@ export default function inMemoryMetricsAlertTests({ getService }: FtrProviderCon
expect(getResponse.body.node_rules.failures).to.greaterThan(0);
});
it('should count timeouts', async () => {
it('inMemoryMetrics should count timeouts', async () => {
const body = await es.info();
if (!body.version.number.includes('SNAPSHOT')) {
log.debug('Skipping because this build does not have the required shard_delay agg');
@ -145,5 +146,15 @@ export default function inMemoryMetricsAlertTests({ getService }: FtrProviderCon
expect(getResponse.status).to.eql(200);
expect(getResponse.body.node_rules.timeouts).to.greaterThan(0);
});
it('should calculate overdue task count and percentiles', async () => {
// We're not forcing any overdue tasks for this test, just testing that the
// route returns successfully and that the expected fields are there
const getResponse = await supertest.get(CLUSTER_RULES_MONITORING_COLLECTION_URL);
expect(getResponse.status).to.eql(200);
expect(typeof getResponse.body.cluster_rules.overdue.count).to.eql('number');
expect(typeof getResponse.body.cluster_rules.overdue.delay.p50).to.eql('number');
expect(typeof getResponse.body.cluster_rules.overdue.delay.p99).to.eql('number');
});
});
}