[ResponseOps][task manager] log event loop delay for tasks when over configured limit (#126300)

resolves https://github.com/elastic/kibana/issues/124366

Adds new task manager configuration keys.

- `xpack.task_manager.event_loop_delay.monitor` - whether to monitor
  event loop delay or not; added in case this specific monitoring
  causes other issues and we'd want to disable it.  We don't know
  of any cases where we'd need this today

- `xpack.task_manager.event_loop_delay.warn_threshold` - the number
  of milliseconds of event loop delay before logging a warning

This code uses the `perf_hooks.monitorEventLoopDelay()` API[1] to collect
the event loop delay while a task is running.

[1] https://nodejs.org/api/perf_hooks.html#perf_hooksmonitoreventloopdelayoptions

When a significant event loop delay is encountered, it's very likely
that other tasks running at the same time will be affected, and so
will also end up having a long event loop delay value, and warnings
will be logged on those.  Over time, though, tasks which have consistently
long event loop delays will outnumber those unfortunate peer tasks, and
be obvious from the volume in the logs.

To make it a bit easier to find these when viewing Kibana logs in Discover,
tags are added to the logged messages to make it easier to find them.  One
tag is `event-loop-blocked`, second is the task type, and the third is a string 
consisting of the task type and task id.
This commit is contained in:
Patrick Mueller 2022-03-23 10:28:43 -04:00 committed by GitHub
parent d7e17d78eb
commit b028cf97ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 194 additions and 3 deletions

View file

@ -40,6 +40,11 @@ These non-persisted action tasks have a risk that they won't be run at all if th
`xpack.task_manager.ephemeral_tasks.request_capacity`::
Sets the size of the ephemeral queue defined above. Defaults to 10.
`xpack.task_manager.event_loop_delay.monitor`::
Enables event loop delay monitoring, which will log a warning when a task causes an event loop delay which exceeds the `warn_threshold` setting. Defaults to true.
`xpack.task_manager.event_loop_delay.warn_threshold`::
Sets the amount of event loop delay during a task execution which will cause a warning to be logged. Defaults to 5000 milliseconds (5 seconds).
[float]
[[task-manager-health-settings]]

View file

@ -379,6 +379,8 @@ kibana_vars=(
xpack.task_manager.poll_interval
xpack.task_manager.request_capacity
xpack.task_manager.version_conflict_threshold
xpack.task_manager.event_loop_delay.monitor
xpack.task_manager.event_loop_delay.warn_threshold
xpack.uptime.index
)

View file

@ -16,6 +16,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
@ -62,6 +66,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
@ -106,6 +114,10 @@ describe('config validation', () => {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,

View file

@ -41,6 +41,14 @@ export const taskExecutionFailureThresholdSchema = schema.object(
}
);
const eventLoopDelaySchema = schema.object({
monitor: schema.boolean({ defaultValue: true }),
warn_threshold: schema.number({
defaultValue: 5000,
min: 10,
}),
});
export const configSchema = schema.object(
{
/* The maximum number of times a task will be attempted before being abandoned as failed */
@ -118,6 +126,7 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
event_loop_delay: eventLoopDelaySchema,
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
@ -138,3 +147,4 @@ export const configSchema = schema.object(
export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;

View file

@ -69,6 +69,10 @@ describe('EphemeralTaskLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
...config,
},
elasticsearchAndSOAvailability$,

View file

@ -57,6 +57,10 @@ describe.skip('managed configuration', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
logger = context.logger.get('taskManager');

View file

@ -40,6 +40,10 @@ describe('Configuration Statistics Aggregator', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
};
const managedConfig = {

View file

@ -44,6 +44,10 @@ describe('createMonitoringStatsStream', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
};
it('returns the initial config used to configure Task Manager', async () => {

View file

@ -43,6 +43,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
pluginInitializerContext.env.instanceUuid = '';
@ -84,6 +88,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
@ -154,6 +162,10 @@ describe('TaskManagerPlugin', () => {
unsafe: {
exclude_task_types: ['*'],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
const logger = pluginInitializerContext.logger.get();

View file

@ -67,6 +67,10 @@ describe('TaskPollingLifecycle', () => {
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,

View file

@ -91,6 +91,7 @@ export class TaskPollingLifecycle {
private middleware: Middleware;
private usageCounter?: UsageCounter;
private config: TaskManagerConfig;
/**
* Initializes the task manager, preventing any further addition of middleware,
@ -117,6 +118,7 @@ export class TaskPollingLifecycle {
this.store = taskStore;
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;
const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);
@ -240,6 +242,7 @@ export class TaskPollingLifecycle {
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
});
};

View file

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { startTaskTimer, startTaskTimerWithEventLoopMonitoring } from './task_events';
const DelayIterations = 4;
const DelayMillis = 250;
const DelayTotal = DelayIterations * DelayMillis;
async function nonBlockingDelay(millis: number) {
await new Promise((resolve) => setTimeout(resolve, millis));
}
async function blockingDelay(millis: number) {
// get task in async queue
await nonBlockingDelay(0);
const end = Date.now() + millis;
// eslint-disable-next-line no-empty
while (Date.now() < end) {}
}
async function nonBlockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await nonBlockingDelay(DelayMillis);
}
}
async function blockingTask() {
for (let i = 0; i < DelayIterations; i++) {
await blockingDelay(DelayMillis);
}
}
describe('task_events', () => {
test('startTaskTimer', async () => {
const stopTaskTimer = startTaskTimer();
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(undefined);
});
describe('startTaskTimerWithEventLoopMonitoring', () => {
test('non-blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_threshold: 5000,
});
await nonBlockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBeLessThan(DelayMillis);
});
test('blocking', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: true,
warn_threshold: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).not.toBeLessThan(DelayMillis);
});
test('not monitoring', async () => {
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring({
monitor: false,
warn_threshold: 5000,
});
await blockingTask();
const result = stopTaskTimer();
expect(result.stop - result.start).not.toBeLessThan(DelayTotal);
expect(result.eventLoopBlockMs).toBe(0);
});
});
});

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import { monitorEventLoopDelay } from 'perf_hooks';
import { Option } from 'fp-ts/lib/Option';
import { ConcreteTaskInstance } from './task';
@ -14,6 +16,7 @@ import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';
export enum TaskPersistence {
Recurring = 'recurring',
@ -40,6 +43,7 @@ export enum TaskClaimErrorType {
export interface TaskTiming {
start: number;
stop: number;
eventLoopBlockMs?: number;
}
export type WithTaskTiming<T> = T & { timing: TaskTiming };
@ -48,6 +52,22 @@ export function startTaskTimer(): () => TaskTiming {
return () => ({ start, stop: Date.now() });
}
export function startTaskTimerWithEventLoopMonitoring(
eventLoopDelayConfig: EventLoopDelayConfig
): () => TaskTiming {
const stopTaskTimer = startTaskTimer();
const eldHistogram = eventLoopDelayConfig.monitor ? monitorEventLoopDelay() : null;
eldHistogram?.enable();
return () => {
const { start, stop } = stopTaskTimer();
eldHistogram?.disable();
const eldMax = eldHistogram?.max ?? 0;
const eventLoopBlockMs = Math.round(eldMax / 1000 / 1000); // original in nanoseconds
return { start, stop, eventLoopBlockMs };
};
}
export interface TaskEvent<OkResult, ErrorResult, ID = string> {
id?: ID;
timing?: TaskTiming;

View file

@ -1528,7 +1528,11 @@ describe('TaskManagerRunner', () => {
function withAnyTiming(taskRun: TaskRun) {
return {
...taskRun,
timing: { start: expect.any(Number), stop: expect.any(Number) },
timing: {
start: expect.any(Number),
stop: expect.any(Number),
eventLoopBlockMs: expect.any(Number),
},
};
}
@ -1590,6 +1594,10 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_threshold: 5000,
},
});
if (stage === TaskRunningStage.READY_TO_RUN) {

View file

@ -38,7 +38,7 @@ import {
TaskMarkRunning,
asTaskRunEvent,
asTaskMarkRunningEvent,
startTaskTimer,
startTaskTimerWithEventLoopMonitoring,
TaskTiming,
TaskPersistence,
} from '../task_events';
@ -56,6 +56,7 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';
const defaultBackoffPerFailure = 5 * 60 * 1000;
export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
@ -105,6 +106,7 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
export enum TaskRunResult {
@ -152,6 +154,7 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
/**
* Creates an instance of TaskManagerRunner.
@ -174,6 +177,7 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopDelayConfig,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
@ -186,6 +190,7 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuid.v4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
}
/**
@ -292,7 +297,7 @@ export class TaskManagerRunner implements TaskRunner {
taskInstance: this.instance.task,
});
const stopTaskTimer = startTaskTimer();
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);
try {
this.task = this.definition.createTaskRunner(modifiedContext);
@ -617,6 +622,18 @@ export class TaskManagerRunner implements TaskRunner {
);
}
);
const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
tags: [this.taskType, taskLabel, 'event-loop-blocked'],
}
);
}
return result;
}