[Task Manager] Log at different levels based on the state (#101751)

* Log at different levels based on the state

* Fix types and add tests

* Remove unnecessary code

* Add more descriptive message

* Partially fix failing tests

* Move into separate function

* Get rid of customStatus in favor of moving the logging logic to a separate, mockable function

* Remove debug logging

* Do not log as an error if the stats are empty

* PR feedback

* Add docker whitelist

* alpha order

* English is hard

* Removing extra newline

* PR feedback around ignoring capacity estimation

* Move json utils
Chris Roberson 2021-06-16 15:20:28 -04:00 committed by GitHub
parent adc95c1023
commit ab2a80f4b0
17 changed files with 583 additions and 87 deletions

@@ -28,6 +28,9 @@ Task Manager runs background tasks by polling for work on an interval. You can
| `xpack.task_manager.max_workers`
| The maximum number of tasks that this Kibana instance will run simultaneously. Defaults to 10.
Starting in 8.0, it will not be possible to set the value greater than 100.
| `xpack.task_manager.monitored_stats_warn_delayed_task_start_in_seconds`
| The number of seconds we allow a task start to be delayed before a warning is logged to the Kibana server log. Defaults to 60.
|===
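
For reference, the new `monitored_stats_warn_delayed_task_start_in_seconds` setting documented above is tuned in kibana.yml like any other Task Manager option; a minimal sketch (the value 120 is illustrative only, the default remains 60):

  xpack.task_manager.monitored_stats_warn_delayed_task_start_in_seconds: 120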
[float]

@@ -322,6 +322,7 @@ kibana_vars=(
xpack.task_manager.monitored_aggregated_stats_refresh_rate
xpack.task_manager.monitored_stats_required_freshness
xpack.task_manager.monitored_stats_running_average_window
xpack.task_manager.monitored_stats_warn_delayed_task_start_in_seconds
xpack.task_manager.monitored_task_execution_thresholds
xpack.task_manager.poll_interval
xpack.task_manager.request_capacity
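
Because the setting is now on the Docker whitelist above, it can also be supplied through the container environment; a hedged example, assuming the standard Kibana Docker image convention of upper-casing a setting name and replacing dots with underscores (the image tag is a placeholder):

  docker run -e XPACK_TASK_MANAGER_MONITORED_STATS_WARN_DELAYED_TASK_START_IN_SECONDS=120 docker.elastic.co/kibana/kibana:<version>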

@@ -20,6 +20,7 @@ describe('config validation', () => {
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_required_freshness": 4000,
"monitored_stats_running_average_window": 50,
"monitored_stats_warn_delayed_task_start_in_seconds": 60,
"monitored_task_execution_thresholds": Object {
"custom": Object {},
"default": Object {
@@ -68,6 +69,7 @@ describe('config validation', () => {
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_required_freshness": 4000,
"monitored_stats_running_average_window": 50,
"monitored_stats_warn_delayed_task_start_in_seconds": 60,
"monitored_task_execution_thresholds": Object {
"custom": Object {},
"default": Object {
@@ -103,6 +105,7 @@ describe('config validation', () => {
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_required_freshness": 4000,
"monitored_stats_running_average_window": 50,
"monitored_stats_warn_delayed_task_start_in_seconds": 60,
"monitored_task_execution_thresholds": Object {
"custom": Object {
"alerting:always-fires": Object {

@@ -18,6 +18,7 @@ export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
// Refresh aggregated monitored stats at a default rate of once a minute
export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000;
export const DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW = 50;
export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60;
export const taskExecutionFailureThresholdSchema = schema.object(
{
@@ -109,6 +110,10 @@ export const configSchema = schema.object(
defaultValue: {},
}),
}),
/* The number of seconds we allow a task start to be delayed before a warning is logged to the server log */
monitored_stats_warn_delayed_task_start_in_seconds: schema.number({
defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS,
}),
},
{
validate: (config) => {

@@ -37,6 +37,7 @@ describe('managed configuration', () => {
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,

@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
const createCalculateHealthStatusMock = () => {
return jest.fn();
};
export const calculateHealthStatusMock = {
create: createCalculateHealthStatusMock,
};

@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isString } from 'lodash';
import { JsonValue } from '@kbn/common-utils';
import { HealthStatus, RawMonitoringStats } from '../monitoring';
import { TaskManagerConfig } from '../config';
export function calculateHealthStatus(
summarizedStats: RawMonitoringStats,
config: TaskManagerConfig
): HealthStatus {
const now = Date.now();
// if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
// consider the system unhealthy
const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
// if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy
const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5;
/**
* If the monitored stats aren't fresh, return a red status
*/
const healthStatus =
hasStatus(summarizedStats.stats, HealthStatus.Error) ||
hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) ||
hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)
? HealthStatus.Error
: hasStatus(summarizedStats.stats, HealthStatus.Warning)
? HealthStatus.Warning
: HealthStatus.OK;
return healthStatus;
}
function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean {
return Object.values(stats)
.map((stat) => stat?.status === status)
.includes(true);
}
/**
* If certain "hot" stats are not fresh, then the _health api should return a Red status
* @param monitoringStats The monitored stats
* @param now The time to compare against
* @param requiredFreshness How fresh should these stats be
*/
function hasExpiredHotTimestamps(
monitoringStats: RawMonitoringStats,
now: number,
requiredFreshness: number
): boolean {
const diff =
now -
getOldestTimestamp(
monitoringStats.last_update,
monitoringStats.stats.runtime?.value.polling.last_successful_poll
);
return diff > requiredFreshness;
}
function hasExpiredColdTimestamps(
monitoringStats: RawMonitoringStats,
now: number,
requiredFreshness: number
): boolean {
return now - getOldestTimestamp(monitoringStats.stats.workload?.timestamp) > requiredFreshness;
}
function getOldestTimestamp(...timestamps: Array<JsonValue | undefined>): number {
const validTimestamps = timestamps
.map((timestamp) => (isString(timestamp) ? Date.parse(timestamp) : NaN))
.filter((timestamp) => !isNaN(timestamp));
return validTimestamps.length ? Math.min(...validTimestamps) : 0;
}

@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
const createLogHealthMetricsMock = () => {
return jest.fn();
};
export const logHealthMetricsMock = {
create: createLogHealthMetricsMock,
};

@@ -0,0 +1,262 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { merge } from 'lodash';
import { loggingSystemMock } from 'src/core/server/mocks';
import { configSchema, TaskManagerConfig } from '../config';
import { HealthStatus } from '../monitoring';
import { TaskPersistence } from '../monitoring/task_run_statistics';
import { MonitoredHealth } from '../routes/health';
import { logHealthMetrics } from './log_health_metrics';
import { Logger } from '../../../../../src/core/server';
jest.mock('./calculate_health_status', () => ({
calculateHealthStatus: jest.fn(),
}));
describe('logHealthMetrics', () => {
afterEach(() => {
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
(calculateHealthStatus as jest.Mock<HealthStatus>).mockReset();
});
it('should log as debug if status is OK', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = getMockMonitoredHealth();
logHealthMetrics(health, logger, config);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
);
expect(firstDebug).toMatchObject(health);
});
it('should log as warn if status is Warn', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = getMockMonitoredHealth();
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(
() => HealthStatus.Warning
);
logHealthMetrics(health, logger, config);
const logMessage = JSON.parse(
((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).replace(
'Latest Monitored Stats: ',
''
)
);
expect(logMessage).toMatchObject(health);
});
it('should log as error if status is Error', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = getMockMonitoredHealth();
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
logHealthMetrics(health, logger, config);
const logMessage = JSON.parse(
((logger as jest.Mocked<Logger>).error.mock.calls[0][0] as string).replace(
'Latest Monitored Stats: ',
''
)
);
expect(logMessage).toMatchObject(health);
});
it('should log as warn if drift exceeds the threshold', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = getMockMonitoredHealth({
stats: {
runtime: {
value: {
drift: {
p99: 60000,
},
},
},
},
});
logHealthMetrics(health, logger, config);
expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
`Detected delay task start of 60s (which exceeds configured value of 60s)`
);
const secondMessage = JSON.parse(
((logger as jest.Mocked<Logger>).warn.mock.calls[1][0] as string).replace(
`Latest Monitored Stats: `,
''
)
);
expect(secondMessage).toMatchObject(health);
});
it('should log as debug if there are no stats', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = {
id: '1',
status: HealthStatus.OK,
timestamp: new Date().toISOString(),
last_update: new Date().toISOString(),
stats: {},
};
logHealthMetrics(health, logger, config);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
);
expect(firstDebug).toMatchObject(health);
});
it('should ignore capacity estimation status', () => {
const logger = loggingSystemMock.create().get();
const config = getTaskManagerConfig({
monitored_stats_warn_delayed_task_start_in_seconds: 60,
});
const health = getMockMonitoredHealth({
stats: {
capacity_estimation: {
status: HealthStatus.Warning,
},
},
});
logHealthMetrics(health, logger, config);
const { calculateHealthStatus } = jest.requireMock('./calculate_health_status');
expect(calculateHealthStatus).toBeCalledTimes(1);
expect(calculateHealthStatus.mock.calls[0][0].stats.capacity_estimation).toBeUndefined();
});
});
function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
const stub: MonitoredHealth = {
id: '1',
status: HealthStatus.OK,
timestamp: new Date().toISOString(),
last_update: new Date().toISOString(),
stats: {
configuration: {
timestamp: new Date().toISOString(),
status: HealthStatus.OK,
value: {
max_workers: 10,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
},
},
workload: {
timestamp: new Date().toISOString(),
status: HealthStatus.OK,
value: {
count: 4,
task_types: {
actions_telemetry: { count: 2, status: { idle: 2 } },
alerting_telemetry: { count: 1, status: { idle: 1 } },
session_cleanup: { count: 1, status: { idle: 1 } },
},
schedule: [],
overdue: 0,
overdue_non_recurring: 0,
estimatedScheduleDensity: [],
non_recurring: 20,
owner_ids: 2,
estimated_schedule_density: [],
capacity_requirments: {
per_minute: 150,
per_hour: 360,
per_day: 820,
},
},
},
runtime: {
timestamp: new Date().toISOString(),
status: HealthStatus.OK,
value: {
drift: {
p50: 1000,
p90: 2000,
p95: 2500,
p99: 3000,
},
drift_by_type: {},
load: {
p50: 1000,
p90: 2000,
p95: 2500,
p99: 3000,
},
execution: {
duration: {},
duration_by_persistence: {},
persistence: {
[TaskPersistence.Recurring]: 10,
[TaskPersistence.NonRecurring]: 10,
[TaskPersistence.Ephemeral]: 10,
},
result_frequency_percent_as_number: {},
},
polling: {
last_successful_poll: new Date().toISOString(),
duration: [500, 400, 3000],
claim_conflicts: [0, 100, 75],
claim_mismatches: [0, 100, 75],
result_frequency_percent_as_number: [
'NoTasksClaimed',
'NoTasksClaimed',
'NoTasksClaimed',
],
},
},
},
},
};
return (merge(stub, overrides) as unknown) as MonitoredHealth;
}
function getTaskManagerConfig(overrides: Partial<TaskManagerConfig> = {}) {
return configSchema.validate(
overrides.monitored_stats_required_freshness
? {
// use `monitored_stats_required_freshness` as poll interval otherwise we might
// fail validation as it must be greater than the poll interval
poll_interval: overrides.monitored_stats_required_freshness,
...overrides,
}
: overrides
);
}

@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isEmpty } from 'lodash';
import { Logger } from '../../../../../src/core/server';
import { HealthStatus } from '../monitoring';
import { TaskManagerConfig } from '../config';
import { MonitoredHealth } from '../routes/health';
import { calculateHealthStatus } from './calculate_health_status';
export function logHealthMetrics(
monitoredHealth: MonitoredHealth,
logger: Logger,
config: TaskManagerConfig
) {
const healthWithoutCapacity: MonitoredHealth = {
...monitoredHealth,
stats: {
...monitoredHealth.stats,
capacity_estimation: undefined,
},
};
const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config);
let logAsWarn = statusWithoutCapacity === HealthStatus.Warning;
const logAsError =
statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats);
const driftInSeconds = (monitoredHealth.stats.runtime?.value.drift.p99 ?? 0) / 1000;
if (driftInSeconds >= config.monitored_stats_warn_delayed_task_start_in_seconds) {
logger.warn(
`Detected delay task start of ${driftInSeconds}s (which exceeds configured value of ${config.monitored_stats_warn_delayed_task_start_in_seconds}s)`
);
logAsWarn = true;
}
if (logAsError) {
logger.error(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`);
} else if (logAsWarn) {
logger.warn(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`);
} else {
logger.debug(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`);
}
}

@@ -23,6 +23,7 @@ describe('Configuration Statistics Aggregator', () => {
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {

@@ -27,6 +27,7 @@ describe('createMonitoringStatsStream', () => {
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {

@@ -51,7 +51,6 @@ interface MonitoredStat<T> {
timestamp: string;
value: T;
}
export type RawMonitoredStat<T extends JsonObject> = MonitoredStat<T> & {
status: HealthStatus;
};

@@ -25,6 +25,7 @@ describe('TaskManagerPlugin', () => {
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
@@ -55,6 +56,7 @@ describe('TaskManagerPlugin', () => {
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {

@@ -45,6 +45,7 @@ describe('TaskPollingLifecycle', () => {
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_warn_delayed_task_start_in_seconds: 60,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {

@@ -14,10 +14,19 @@ import { healthRoute } from './health';
import { mockHandlerArguments } from './_mock_handler_arguments';
import { sleep } from '../test_utils';
import { loggingSystemMock } from '../../../../../src/core/server/mocks';
import { Logger } from '../../../../../src/core/server';
import { MonitoringStats, RawMonitoringStats, summarizeMonitoringStats } from '../monitoring';
import {
HealthStatus,
MonitoringStats,
RawMonitoringStats,
summarizeMonitoringStats,
} from '../monitoring';
import { ServiceStatusLevels } from 'src/core/server';
import { configSchema, TaskManagerConfig } from '../config';
import { calculateHealthStatusMock } from '../lib/calculate_health_status.mock';
jest.mock('../lib/log_health_metrics', () => ({
logHealthMetrics: jest.fn(),
}));
describe('healthRoute', () => {
beforeEach(() => {
@@ -38,6 +47,9 @@ describe('healthRoute', () => {
it('logs the Task Manager stats at a fixed interval', async () => {
const router = httpServiceMock.createRouter();
const logger = loggingSystemMock.create().get();
const calculateHealthStatus = calculateHealthStatusMock.create();
calculateHealthStatus.mockImplementation(() => HealthStatus.OK);
const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
const mockStat = mockHealthStats();
await sleep(10);
@@ -55,6 +67,7 @@ id,
id,
getTaskManagerConfig({
monitored_stats_required_freshness: 1000,
monitored_stats_warn_delayed_task_start_in_seconds: 100,
monitored_aggregated_stats_refresh_rate: 60000,
})
);
@@ -65,35 +78,137 @@ describe('healthRoute', () => {
await sleep(600);
stats$.next(nextMockStat);
const firstDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '')
);
expect(firstDebug).toMatchObject({
expect(logHealthMetrics).toBeCalledTimes(2);
expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(summarizeMonitoringStats(mockStat, getTaskManagerConfig({}))),
});
const secondDebug = JSON.parse(
(logger as jest.Mocked<Logger>).debug.mock.calls[1][0].replace('Latest Monitored Stats: ', '')
);
expect(secondDebug).not.toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(skippedMockStat, getTaskManagerConfig({}))
),
});
expect(secondDebug).toMatchObject({
expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(summarizeMonitoringStats(nextMockStat, getTaskManagerConfig({}))),
});
});
expect(logger.debug).toHaveBeenCalledTimes(2);
it(`logs at a warn level if the status is warning`, async () => {
const router = httpServiceMock.createRouter();
const logger = loggingSystemMock.create().get();
const calculateHealthStatus = calculateHealthStatusMock.create();
calculateHealthStatus.mockImplementation(() => HealthStatus.Warning);
const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
const warnRuntimeStat = mockHealthStats();
const warnConfigurationStat = mockHealthStats();
const warnWorkloadStat = mockHealthStats();
const stats$ = new Subject<MonitoringStats>();
const id = uuid.v4();
healthRoute(
router,
stats$,
logger,
id,
getTaskManagerConfig({
monitored_stats_required_freshness: 1000,
monitored_stats_warn_delayed_task_start_in_seconds: 120,
monitored_aggregated_stats_refresh_rate: 60000,
})
);
stats$.next(warnRuntimeStat);
await sleep(1001);
stats$.next(warnConfigurationStat);
await sleep(1001);
stats$.next(warnWorkloadStat);
expect(logHealthMetrics).toBeCalledTimes(3);
expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(warnRuntimeStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(warnConfigurationStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(warnWorkloadStat, getTaskManagerConfig({}))
),
});
});
it(`logs at an error level if the status is error`, async () => {
const router = httpServiceMock.createRouter();
const logger = loggingSystemMock.create().get();
const calculateHealthStatus = calculateHealthStatusMock.create();
calculateHealthStatus.mockImplementation(() => HealthStatus.Error);
const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
const errorRuntimeStat = mockHealthStats();
const errorConfigurationStat = mockHealthStats();
const errorWorkloadStat = mockHealthStats();
const stats$ = new Subject<MonitoringStats>();
const id = uuid.v4();
healthRoute(
router,
stats$,
logger,
id,
getTaskManagerConfig({
monitored_stats_required_freshness: 1000,
monitored_stats_warn_delayed_task_start_in_seconds: 120,
monitored_aggregated_stats_refresh_rate: 60000,
})
);
stats$.next(errorRuntimeStat);
await sleep(1001);
stats$.next(errorConfigurationStat);
await sleep(1001);
stats$.next(errorWorkloadStat);
expect(logHealthMetrics).toBeCalledTimes(3);
expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(errorRuntimeStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(errorConfigurationStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(errorWorkloadStat, getTaskManagerConfig({}))
),
});
});
it('returns a error status if the overall stats have not been updated within the required hot freshness', async () => {

@@ -15,8 +15,6 @@ import {
import { Observable, Subject } from 'rxjs';
import { tap, map } from 'rxjs/operators';
import { throttleTime } from 'rxjs/operators';
import { isString } from 'lodash';
import { JsonValue } from '@kbn/common-utils';
import { Logger, ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import {
MonitoringStats,
@@ -25,8 +23,14 @@ import {
RawMonitoringStats,
} from '../monitoring';
import { TaskManagerConfig } from '../config';
import { logHealthMetrics } from '../lib/log_health_metrics';
import { calculateHealthStatus } from '../lib/calculate_health_status';
type MonitoredHealth = RawMonitoringStats & { id: string; status: HealthStatus; timestamp: string };
export type MonitoredHealth = RawMonitoringStats & {
id: string;
status: HealthStatus;
timestamp: string;
};
const LEVEL_SUMMARY = {
[ServiceStatusLevels.available.toString()]: 'Task Manager is healthy',
@@ -54,26 +58,12 @@ export function healthRoute(
// consider the system unhealthy
const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
// if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy
const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5;
function calculateStatus(monitoredStats: MonitoringStats): MonitoredHealth {
function getHealthStatus(monitoredStats: MonitoringStats) {
const summarizedStats = summarizeMonitoringStats(monitoredStats, config);
const status = calculateHealthStatus(summarizedStats, config);
const now = Date.now();
const timestamp = new Date(now).toISOString();
const summarizedStats = summarizeMonitoringStats(monitoredStats, config);
/**
* If the monitored stats aren't fresh, return a red status
*/
const healthStatus =
hasStatus(summarizedStats.stats, HealthStatus.Error) ||
hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) ||
hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)
? HealthStatus.Error
: hasStatus(summarizedStats.stats, HealthStatus.Warning)
? HealthStatus.Warning
: HealthStatus.OK;
return { id: taskManagerId, timestamp, status: healthStatus, ...summarizedStats };
return { id: taskManagerId, timestamp, status, ...summarizedStats };
}
const serviceStatus$: Subject<TaskManagerServiceStatus> = new Subject<TaskManagerServiceStatus>();
@@ -90,11 +80,11 @@ }),
}),
// Only calculate the summarized stats (calculates all running averages and evaluates state)
// when needed by throttling down to the requiredHotStatsFreshness
map((stats) => withServiceStatus(calculateStatus(stats)))
map((stats) => withServiceStatus(getHealthStatus(stats)))
)
.subscribe(([monitoredHealth, serviceStatus]) => {
serviceStatus$.next(serviceStatus);
logger.debug(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`);
logHealthMetrics(monitoredHealth, logger, config);
});
router.get(
@@ -109,7 +99,7 @@ ): Promise<IKibanaResponse> {
): Promise<IKibanaResponse> {
return res.ok({
body: lastMonitoredStats
? calculateStatus(lastMonitoredStats)
? getHealthStatus(lastMonitoredStats)
: { id: taskManagerId, timestamp: new Date().toISOString(), status: HealthStatus.Error },
});
}
@@ -134,45 +124,3 @@ }
},
];
}
/**
* If certain "hot" stats are not fresh, then the _health api will should return a Red status
* @param monitoringStats The monitored stats
* @param now The time to compare against
* @param requiredFreshness How fresh should these stats be
*/
function hasExpiredHotTimestamps(
monitoringStats: RawMonitoringStats,
now: number,
requiredFreshness: number
): boolean {
return (
now -
getOldestTimestamp(
monitoringStats.last_update,
monitoringStats.stats.runtime?.value.polling.last_successful_poll
) >
requiredFreshness
);
}
function hasExpiredColdTimestamps(
monitoringStats: RawMonitoringStats,
now: number,
requiredFreshness: number
): boolean {
return now - getOldestTimestamp(monitoringStats.stats.workload?.timestamp) > requiredFreshness;
}
function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean {
return Object.values(stats)
.map((stat) => stat?.status === status)
.includes(true);
}
function getOldestTimestamp(...timestamps: Array<JsonValue | undefined>): number {
const validTimestamps = timestamps
.map((timestamp) => (isString(timestamp) ? Date.parse(timestamp) : NaN))
.filter((timestamp) => !isNaN(timestamp));
return validTimestamps.length ? Math.min(...validTimestamps) : 0;
}