[Response Ops] Conditionally run background tasks based on node.roles configuration (#134388)

* Initial commit to optionally disable task polling

* Stops polling when flag is false. Removes runtime from health API. Updates health check to not use runtime when not polling.

* Fixing types

* Updating tests

* Updating task manager plugin start to use node roles and adding tests

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2022-07-05 18:06:09 -04:00 committed by GitHub
parent 575aed237b
commit 6a738e3175
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 331 additions and 224 deletions

View file

@ -14,6 +14,7 @@ import { TaskManagerConfig } from '../config';
export function calculateHealthStatus(
summarizedStats: RawMonitoringStats,
config: TaskManagerConfig,
shouldRunTasks: boolean,
logger: Logger
): HealthStatus {
const now = Date.now();
@ -30,9 +31,12 @@ export function calculateHealthStatus(
return HealthStatus.Error;
}
if (hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness)) {
logger.debug('setting HealthStatus.Error because of expired hot timestamps');
return HealthStatus.Error;
// Hot timestamps look at runtime stats which are not available when tasks are not running
if (shouldRunTasks) {
if (hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness)) {
logger.debug('setting HealthStatus.Error because of expired hot timestamps');
return HealthStatus.Error;
}
}
if (hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)) {

View file

@ -39,16 +39,16 @@ describe('logHealthMetrics', () => {
// We must change from OK to Warning
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.OK);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(
() => HealthStatus.Warning
);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
// We must change from OK to Error
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.OK);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const debugCalls = (logger as jest.Mocked<Logger>).debug.mock.calls;
const performanceMessage = /^Task Manager detected a degradation in performance/;
@ -76,9 +76,9 @@ describe('logHealthMetrics', () => {
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(
() => HealthStatus.Warning
);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.OK);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
expect((logger as jest.Mocked<Logger>).warn).not.toHaveBeenCalled();
});
@ -96,9 +96,9 @@ describe('logHealthMetrics', () => {
// We must change from Error to OK
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.OK);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
expect((logger as jest.Mocked<Logger>).warn).not.toHaveBeenCalled();
});
@ -112,7 +112,7 @@ describe('logHealthMetrics', () => {
});
const health = getMockMonitoredHealth();
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
@ -130,7 +130,7 @@ describe('logHealthMetrics', () => {
});
const health = getMockMonitoredHealth();
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
@ -152,7 +152,7 @@ describe('logHealthMetrics', () => {
() => HealthStatus.Warning
);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const logMessage = JSON.parse(
((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).replace(
@ -175,7 +175,7 @@ describe('logHealthMetrics', () => {
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const logMessage = JSON.parse(
((logger as jest.Mocked<Logger>).error.mock.calls[0][0] as string).replace(
@ -214,7 +214,7 @@ describe('logHealthMetrics', () => {
},
});
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
`Detected delay task start of 60s for task(s) \"taskType:test\" (which exceeds configured value of 60s)`
@ -257,7 +257,7 @@ describe('logHealthMetrics', () => {
},
});
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
`Detected delay task start of 60s for task(s) \"taskType:test, taskType:test2\" (which exceeds configured value of 60s)`
@ -288,7 +288,43 @@ describe('logHealthMetrics', () => {
stats: {},
};
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
);
expect(firstDebug).toMatchObject(health);
});
it('should log as debug if shouldRunTasks is false', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_health_verbose_log: {
enabled: true,
warn_delayed_task_start_in_seconds: 60,
},
});
const health = getMockMonitoredHealth({
stats: {
runtime: {
value: {
drift_by_type: {
'taskType:test': {
p99: 60000,
},
'taskType:test2': {
p99: 60000,
},
},
drift: {
p99: 60000,
},
},
},
},
});
logHealthMetrics(health, logger, config, false);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
@ -312,7 +348,7 @@ describe('logHealthMetrics', () => {
},
});
logHealthMetrics(health, logger, config);
logHealthMetrics(health, logger, config, true);
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
expect(calculateHealthStatus).toBeCalledTimes(1);

View file

@ -27,7 +27,8 @@ export function resetLastLogLevel() {
export function logHealthMetrics(
monitoredHealth: MonitoredHealth,
logger: Logger,
config: TaskManagerConfig
config: TaskManagerConfig,
shouldRunTasks: boolean
) {
let logLevel: LogLevel = LogLevel.Debug;
const enabled = config.monitored_stats_health_verbose_log.enabled;
@ -38,7 +39,12 @@ export function logHealthMetrics(
capacity_estimation: undefined,
},
};
const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config, logger);
const statusWithoutCapacity = calculateHealthStatus(
healthWithoutCapacity,
config,
shouldRunTasks,
logger
);
if (statusWithoutCapacity === HealthStatus.Warning) {
logLevel = LogLevel.Warn;
} else if (statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats)) {
@ -51,7 +57,9 @@ export function logHealthMetrics(
const docLink = `https://www.elastic.co/guide/en/kibana/${docsBranch}/task-manager-health-monitoring.html`;
const detectedProblemMessage = `Task Manager detected a degradation in performance. This is usually temporary, and Kibana can recover automatically. If the problem persists, check the docs for troubleshooting information: ${docLink} .`;
if (enabled) {
// Drift looks at runtime stats which are not available when task manager is not running tasks
if (enabled && shouldRunTasks) {
const driftInSeconds = (monitoredHealth.stats.runtime?.value.drift.p99 ?? 0) / 1000;
if (
driftInSeconds >= config.monitored_stats_health_verbose_log.warn_delayed_task_start_in_seconds

View file

@ -27,23 +27,23 @@ export {
} from './monitoring_stats_stream';
export function createMonitoringStats(
taskPollingLifecycle: TaskPollingLifecycle,
ephemeralTaskLifecycle: EphemeralTaskLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
logger: Logger,
taskPollingLifecycle?: TaskPollingLifecycle,
ephemeralTaskLifecycle?: EphemeralTaskLifecycle
): Observable<MonitoringStats> {
return createMonitoringStatsStream(
createAggregators(
taskPollingLifecycle,
ephemeralTaskLifecycle,
taskStore,
elasticsearchAndSOAvailability$,
config,
managedConfig,
logger
logger,
taskPollingLifecycle,
ephemeralTaskLifecycle
),
config
);

View file

@ -75,17 +75,17 @@ export interface RawMonitoringStats {
}
export function createAggregators(
taskPollingLifecycle: TaskPollingLifecycle,
ephemeralTaskLifecycle: EphemeralTaskLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
logger: Logger,
taskPollingLifecycle?: TaskPollingLifecycle,
ephemeralTaskLifecycle?: EphemeralTaskLifecycle
): AggregatedStatProvider {
const aggregators: AggregatedStatProvider[] = [
createConfigurationAggregator(config, managedConfig),
createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window),
createWorkloadAggregator(
taskStore,
elasticsearchAndSOAvailability$,
@ -94,7 +94,12 @@ export function createAggregators(
logger
),
];
if (ephemeralTaskLifecycle.enabled) {
if (taskPollingLifecycle) {
aggregators.push(
createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window)
);
}
if (ephemeralTaskLifecycle && ephemeralTaskLifecycle.enabled) {
aggregators.push(
createEphemeralTaskAggregator(
ephemeralTaskLifecycle,

View file

@ -11,43 +11,79 @@ import { TaskManagerConfig } from './config';
import { Subject } from 'rxjs';
import { bufferCount, take } from 'rxjs/operators';
import { CoreStatus, ServiceStatusLevels } from '@kbn/core/server';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskPollingLifecycle } from './polling_lifecycle';
import type { TaskPollingLifecycle as TaskPollingLifecycleClass } from './polling_lifecycle';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import type { EphemeralTaskLifecycle as EphemeralTaskLifecycleClass } from './ephemeral_task_lifecycle';
// Instance handed out by the mocked TaskPollingLifecycle constructor below. Declared with `let`
// so the beforeEach hook can replace it with a fresh mock for every test.
// NOTE(review): the `mock` name prefix looks deliberate - jest hoists jest.mock() calls and is
// documented to allow factory references only to variables named `mock*`; confirm before renaming.
let mockTaskPollingLifecycle = taskPollingLifecycleMock.create({});
// Swap the real './polling_lifecycle' module for a jest.fn constructor. The variable is read when
// the constructor is invoked, not when the factory runs, so later reassignments are honoured.
jest.mock('./polling_lifecycle', () => {
return {
TaskPollingLifecycle: jest.fn().mockImplementation(() => {
return mockTaskPollingLifecycle;
}),
};
});
// Same arrangement for the ephemeral task lifecycle: a replaceable mock instance plus a mocked
// constructor that returns whatever instance is current at call time.
let mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({});
jest.mock('./ephemeral_task_lifecycle', () => {
return {
EphemeralTaskLifecycle: jest.fn().mockImplementation(() => {
return mockEphemeralTaskLifecycle;
}),
};
});
const coreStart = coreMock.createStart();
// Baseline task manager configuration reused by the tests in this file. It is passed as-is to
// coreMock.createPluginInitializerContext, and tests that need a different value spread this
// object and override the relevant key (the `exclude_task_types` test replaces `unsafe`).
const pluginInitializerContextParams = {
max_workers: 10,
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
// Ephemeral tasks are switched off in the baseline configuration.
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
// No task types are excluded by default.
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
};
describe('TaskManagerPlugin', () => {
beforeEach(() => {
  // View the mocked lifecycle classes as jest mocks so their recorded calls can be reset.
  const pollingLifecycleCtor = TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>;
  const ephemeralLifecycleCtor = EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>;

  // Fresh polling lifecycle instance, and no constructor calls carried over from earlier tests.
  mockTaskPollingLifecycle = taskPollingLifecycleMock.create({});
  pollingLifecycleCtor.mockClear();

  // Likewise for the ephemeral task lifecycle.
  mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({});
  ephemeralLifecycleCtor.mockClear();
});
describe('setup', () => {
test('throws if no valid UUID is available', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
max_workers: 10,
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);
pluginInitializerContext.env.instanceUuid = '';
@ -60,39 +96,9 @@ describe('TaskManagerPlugin', () => {
});
test('throws if setup methods are called after start', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
max_workers: 10,
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
@ -135,37 +141,10 @@ describe('TaskManagerPlugin', () => {
test('it logs a warning when the unsafe `exclude_task_types` config is used', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
max_workers: 10,
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
...pluginInitializerContextParams,
unsafe: {
exclude_task_types: ['*'],
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
});
const logger = pluginInitializerContext.logger.get();
@ -178,6 +157,38 @@ describe('TaskManagerPlugin', () => {
});
});
describe('start', () => {
  /**
   * Builds a plugin from the shared baseline config with `node.roles.backgroundTasks` set to the
   * given value, then runs it through setup and start so the mocked lifecycle constructors
   * record whether they were invoked.
   */
  const startPluginWithBackgroundTasks = (backgroundTasks: boolean) => {
    const initContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
      pluginInitializerContextParams
    );
    initContext.node.roles.backgroundTasks = backgroundTasks;

    const plugin = new TaskManagerPlugin(initContext);
    plugin.setup(coreMock.createSetup(), { usageCollection: undefined });
    plugin.start(coreStart);
  };

  test('should initialize task polling lifecycle if node.roles.backgroundTasks is true', async () => {
    startPluginWithBackgroundTasks(true);

    // Both lifecycles are constructed exactly once when the node runs background tasks.
    const pollingLifecycleCtor = TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>;
    const ephemeralLifecycleCtor = EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>;
    expect(pollingLifecycleCtor).toHaveBeenCalledTimes(1);
    expect(ephemeralLifecycleCtor).toHaveBeenCalledTimes(1);
  });

  test('should not initialize task polling lifecycle if node.roles.backgroundTasks is false', async () => {
    startPluginWithBackgroundTasks(false);

    // Neither lifecycle is constructed when the node does not run background tasks.
    const pollingLifecycleCtor = TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>;
    const ephemeralLifecycleCtor = EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>;
    expect(pollingLifecycleCtor).not.toHaveBeenCalled();
    expect(ephemeralLifecycleCtor).not.toHaveBeenCalled();
  });
});
describe('getElasticsearchAndSOAvailability', () => {
test('returns true when both services are available', async () => {
const core$ = new Subject<CoreStatus>();

View file

@ -28,11 +28,10 @@ import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
import { healthRoute } from './routes';
import { createMonitoringStats, MonitoringStats } from './monitoring';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTask } from './task';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { registerTaskManagerUsageCollector } from './usage';
import { TASK_MANAGER_INDEX } from './constants';
export interface TaskManagerSetupContract {
/**
* @deprecated
@ -67,6 +66,7 @@ export class TaskManagerPlugin
private middleware: Middleware = createInitialMiddleware();
private elasticsearchAndSOAvailability$?: Observable<boolean>;
private monitoringStats$ = new Subject<MonitoringStats>();
private shouldRunBackgroundTasks: boolean;
private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version'];
constructor(private readonly initContext: PluginInitializerContext) {
@ -75,6 +75,7 @@ export class TaskManagerPlugin
this.config = initContext.config.get<TaskManagerConfig>();
this.definitions = new TaskTypeDictionary(this.logger);
this.kibanaVersion = initContext.env.packageInfo.version;
this.shouldRunBackgroundTasks = initContext.node.roles.backgroundTasks;
}
public setup(
@ -114,6 +115,7 @@ export class TaskManagerPlugin
kibanaIndexName: core.savedObjects.getKibanaIndex(),
getClusterClient: () =>
startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
shouldRunTasks: this.shouldRunBackgroundTasks,
});
core.status.derivedStatus$.subscribe((status) =>
@ -186,45 +188,47 @@ export class TaskManagerPlugin
startingPollInterval: this.config!.poll_interval,
});
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
unusedTypes: REMOVED_TYPES,
logger: this.logger,
executionContext,
taskStore,
usageCounter: this.usageCounter,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
...managedConfiguration,
});
// Only poll for tasks if configured to run tasks
if (this.shouldRunBackgroundTasks) {
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
unusedTypes: REMOVED_TYPES,
logger: this.logger,
executionContext,
taskStore,
usageCounter: this.usageCounter,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
...managedConfiguration,
});
this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
executionContext,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
pool: this.taskPollingLifecycle.pool,
lifecycleEvent: this.taskPollingLifecycle.events,
});
this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
executionContext,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
pool: this.taskPollingLifecycle.pool,
lifecycleEvent: this.taskPollingLifecycle.events,
});
}
createMonitoringStats(
this.taskPollingLifecycle,
this.ephemeralTaskLifecycle,
taskStore,
this.elasticsearchAndSOAvailability$!,
this.config!,
managedConfiguration,
this.logger
this.logger,
this.taskPollingLifecycle,
this.ephemeralTaskLifecycle
).subscribe((stat) => this.monitoringStats$.next(stat));
const taskScheduling = new TaskScheduling({
logger: this.logger,
taskStore,
middleware: this.middleware,
taskPollingLifecycle: this.taskPollingLifecycle,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
definitions: this.definitions,
taskManagerId: taskStore.taskManagerId,
@ -240,7 +244,8 @@ export class TaskManagerPlugin
runSoon: (...args) => taskScheduling.runSoon(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled,
supportsEphemeralTasks: () =>
this.config.ephemeral_tasks.enabled && this.shouldRunBackgroundTasks,
};
}

View file

@ -64,6 +64,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
const [config] = router.get.mock.calls[0];
@ -86,6 +87,7 @@ describe('healthRoute', () => {
kibanaIndexName: 'foo',
getClusterClient: () => Promise.resolve(mockClusterClient),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
const [, handler] = router.get.mock.calls[0];
@ -126,6 +128,7 @@ describe('healthRoute', () => {
kibanaIndexName: 'foo',
getClusterClient: () => Promise.resolve(mockClusterClient),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
const [, handler] = router.get.mock.calls[0];
@ -171,6 +174,7 @@ describe('healthRoute', () => {
kibanaVersion: '8.0',
kibanaIndexName: 'foo',
getClusterClient: () => Promise.resolve(mockClusterClient),
shouldRunTasks: true,
});
const [, handler] = router.get.mock.calls[0];
@ -212,6 +216,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
stats$.next(mockStat);
@ -270,6 +275,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
stats$.next(warnRuntimeStat);
@ -346,6 +352,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
stats$.next(errorRuntimeStat);
@ -409,6 +416,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
const serviceStatus = getLatest(serviceStatus$);
@ -490,6 +498,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
await sleep(0);
@ -563,6 +572,7 @@ describe('healthRoute', () => {
kibanaIndexName: '.kibana',
getClusterClient: () => Promise.resolve(elasticsearchServiceMock.createClusterClient()),
usageCounter: mockUsageCounter,
shouldRunTasks: true,
});
await sleep(0);

View file

@ -57,6 +57,7 @@ export interface HealthRouteParams {
config: TaskManagerConfig;
kibanaVersion: string;
kibanaIndexName: string;
shouldRunTasks: boolean;
getClusterClient: () => Promise<IClusterClient>;
usageCounter?: UsageCounter;
}
@ -75,6 +76,7 @@ export function healthRoute(params: HealthRouteParams): {
kibanaIndexName,
getClusterClient,
usageCounter,
shouldRunTasks,
} = params;
// if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
@ -83,7 +85,7 @@ export function healthRoute(params: HealthRouteParams): {
function getHealthStatus(monitoredStats: MonitoringStats) {
const summarizedStats = summarizeMonitoringStats(logger, monitoredStats, config);
const status = calculateHealthStatus(summarizedStats, config, logger);
const status = calculateHealthStatus(summarizedStats, config, shouldRunTasks, logger);
const now = Date.now();
const timestamp = new Date(now).toISOString();
return { id: taskManagerId, timestamp, status, ...summarizedStats };
@ -109,7 +111,7 @@ export function healthRoute(params: HealthRouteParams): {
.subscribe(([monitoredHealth, serviceStatus]) => {
serviceStatus$.next(serviceStatus);
monitoredHealth$.next(monitoredHealth);
logHealthMetrics(monitoredHealth, logger, config);
logHealthMetrics(monitoredHealth, logger, config, shouldRunTasks);
});
router.get(

View file

@ -10,7 +10,6 @@ import moment from 'moment';
import { asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskScheduling } from './task_scheduling';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskStatus } from './task';
@ -35,11 +34,9 @@ jest.mock('elastic-apm-node', () => ({
describe('TaskScheduling', () => {
const mockTaskStore = taskStoreMock.create({});
const mockTaskManager = taskPollingLifecycleMock.create({});
const definitions = new TaskTypeDictionary(mockLogger());
const taskSchedulingOpts = {
taskStore: mockTaskStore,
taskPollingLifecycle: mockTaskManager,
logger: mockLogger(),
middleware: createInitialMiddleware(),
definitions,
@ -476,6 +473,28 @@ describe('TaskScheduling', () => {
`[Error: Ephemeral Task of type foo was rejected]`
);
});
// Verifies that ephemeralRunNow rejects when TaskScheduling is built without an ephemeral task
// lifecycle (the plugin only creates one when the node is configured to run background tasks).
test('rejects ephemeral task if ephemeralTaskLifecycle is not defined', async () => {
  const ephemeralTask = mockTask({
    state: {
      foo: 'bar',
    },
  });
  const middleware = createInitialMiddleware();
  middleware.beforeSave = jest.fn().mockImplementation(async () => {
    return { taskInstance: ephemeralTask };
  });
  const taskScheduling = new TaskScheduling({
    ...taskSchedulingOpts,
    middleware,
    ephemeralTaskLifecycle: undefined,
  });

  const result = taskScheduling.ephemeralRunNow(ephemeralTask);
  // The `.rejects` matcher is asynchronous. Without `await` the test function returns before
  // the assertion settles, so a wrong (or missing) rejection would not fail this test.
  await expect(result).rejects.toMatchInlineSnapshot(
    `[Error: Ephemeral Task of type foo was rejected because ephemeral tasks are not supported]`
  );
});
});
});

View file

@ -12,7 +12,7 @@ import { getOrElse, isSome, map as mapOptional, Option } from 'fp-ts/lib/Option'
import uuid from 'uuid';
import { chunk, pick } from 'lodash';
import { merge, Subject } from 'rxjs';
import { Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { mustBeAllOf } from './queries/query_clauses';
@ -42,7 +42,7 @@ import {
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { TaskTypeDictionary } from './task_type_dictionary';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
@ -52,8 +52,7 @@ const VERSION_CONFLICT_STATUS = 409;
export interface TaskSchedulingOpts {
logger: Logger;
taskStore: TaskStore;
taskPollingLifecycle: TaskPollingLifecycle;
ephemeralTaskLifecycle: EphemeralTaskLifecycle;
ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
middleware: Middleware;
definitions: TaskTypeDictionary;
taskManagerId: string;
@ -84,8 +83,7 @@ export interface RunNowResult {
export class TaskScheduling {
private store: TaskStore;
private taskPollingLifecycle: TaskPollingLifecycle;
private ephemeralTaskLifecycle: EphemeralTaskLifecycle;
private ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
private logger: Logger;
private middleware: Middleware;
private definitions: TaskTypeDictionary;
@ -99,7 +97,6 @@ export class TaskScheduling {
constructor(opts: TaskSchedulingOpts) {
this.logger = opts.logger;
this.middleware = opts.middleware;
this.taskPollingLifecycle = opts.taskPollingLifecycle;
this.ephemeralTaskLifecycle = opts.ephemeralTaskLifecycle;
this.store = opts.taskStore;
this.definitions = opts.definitions;
@ -239,6 +236,12 @@ export class TaskScheduling {
task: EphemeralTask,
options?: Record<string, unknown>
): Promise<RunNowResult> {
if (!this.ephemeralTaskLifecycle) {
throw new EphemeralTaskRejectedDueToCapacityError(
`Ephemeral Task of type ${task.taskType} was rejected because ephemeral tasks are not supported`,
task
);
}
const id = uuid.v4();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
@ -261,7 +264,7 @@ export class TaskScheduling {
.catch((err: Error) => {
reject(err);
});
const attemptToRunResult = this.ephemeralTaskLifecycle.attemptToRun({
const attemptToRunResult = this.ephemeralTaskLifecycle!.attemptToRun({
id,
scheduledAt: new Date(),
runAt: new Date(),
@ -307,64 +310,68 @@ export class TaskScheduling {
private awaitTaskRunResult(taskId: string, cancel?: Promise<void>): Promise<RunNowResult> {
return new Promise((resolve, reject) => {
if (!this.ephemeralTaskLifecycle) {
reject(
new Error(
`Failed to run task "${taskId}" because ephemeral tasks are not supported. Rescheduled the task to ensure it is picked up as soon as possible.`
)
);
}
// listen for all events related to the current task
const subscription = merge(
this.taskPollingLifecycle.events,
this.ephemeralTaskLifecycle.events
)
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: ClaimTaskErr) => {
const subscription = this.ephemeralTaskLifecycle!.events.pipe(
filter(({ id }: TaskLifecycleEvent) => id === taskId)
).subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: ClaimTaskErr) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (
isSome(error.task) &&
error.errorType === TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY
) {
const task = error.task.value;
const definition = this.definitions.get(task.taskType);
return reject(
new Error(
`Failed to run task "${taskId}" as we would exceed the max concurrency of "${
definition?.title ?? task.taskType
}" which is ${
definition?.maxConcurrency
}. Rescheduled the task to ensure it is picked up as soon as possible.`
)
);
} else {
return reject(await this.identifyTaskFailureReason(taskId, error.task));
}
}, taskEvent.event);
} else {
either<OkResultOf<TaskLifecycleEvent>, ErrResultOf<TaskLifecycleEvent>>(
taskEvent.event,
(taskInstance: OkResultOf<TaskLifecycleEvent>) => {
// resolve if the task has run successfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve(pick((taskInstance as RanTask).task, ['id', 'state']));
}
},
async (errorResult: ErrResultOf<TaskLifecycleEvent>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (
isSome(error.task) &&
error.errorType === TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY
) {
const task = error.task.value;
const definition = this.definitions.get(task.taskType);
return reject(
new Error(
`Failed to run task "${taskId}" as we would exceed the max concurrency of "${
definition?.title ?? task.taskType
}" which is ${
definition?.maxConcurrency
}. Rescheduled the task to ensure it is picked up as soon as possible.`
)
);
} else {
return reject(await this.identifyTaskFailureReason(taskId, error.task));
}
}, taskEvent.event);
} else {
either<OkResultOf<TaskLifecycleEvent>, ErrResultOf<TaskLifecycleEvent>>(
taskEvent.event,
(taskInstance: OkResultOf<TaskLifecycleEvent>) => {
// resolve if the task has run successfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve(pick((taskInstance as RanTask).task, ['id', 'state']));
}
},
async (errorResult: ErrResultOf<TaskLifecycleEvent>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(
new Error(
`Failed to run task "${taskId}": ${
isTaskRunRequestEvent(taskEvent)
? `Task Manager is at capacity, please try again later`
: isTaskRunEvent(taskEvent)
? `${(errorResult as ErroredTask).error}`
: `${errorResult}`
}`
)
);
}
);
}
});
return reject(
new Error(
`Failed to run task "${taskId}": ${
isTaskRunRequestEvent(taskEvent)
? `Task Manager is at capacity, please try again later`
: isTaskRunEvent(taskEvent)
? `${(errorResult as ErroredTask).error}`
: `${errorResult}`
}`
)
);
}
);
}
});
if (cancel) {
cancel.then(() => {