mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
[ResponseOps][Alerting] Update Kibana Alerting scaling metrics to be running counts (#145296)
Resolves https://github.com/elastic/kibana/issues/145289 ## Summary This pr removed a field and fixes a bug with the previous metrics and keeps a running count for the life of the kibana. ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios ### To verify - Create a rule and hit the `/internal/task_manager/_background_task_utilization` api - Verify that the counts are increasing Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
40a424b5ea
commit
105a15c5c2
7 changed files with 80 additions and 260 deletions
|
@ -16,7 +16,6 @@ import {
|
|||
secondsFromDate,
|
||||
asInterval,
|
||||
maxIntervalFromDate,
|
||||
parseIntervalAsMinute,
|
||||
} from './intervals';
|
||||
|
||||
let fakeTimer: sinon.SinonFakeTimers;
|
||||
|
@ -66,44 +65,6 @@ describe('taskIntervals', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('parseIntervalAsMinute', () => {
|
||||
test('it accepts intervals in the form `Nm`', () => {
|
||||
expect(() => parseIntervalAsMinute(`${_.random(1, 1000)}m`)).not.toThrow();
|
||||
});
|
||||
|
||||
test('it accepts intervals in the form `Ns`', () => {
|
||||
expect(() => parseIntervalAsMinute(`${_.random(1, 1000)}s`)).not.toThrow();
|
||||
});
|
||||
|
||||
test('it rejects 0 based intervals', () => {
|
||||
expect(() => parseIntervalAsMinute('0m')).toThrow(
|
||||
/Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/
|
||||
);
|
||||
expect(() => parseIntervalAsMinute('0s')).toThrow(
|
||||
/Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/
|
||||
);
|
||||
});
|
||||
|
||||
test('it rejects intervals are not of the form `Nm` or `Ns`', () => {
|
||||
expect(() => parseIntervalAsMinute(`5m 2s`)).toThrow(
|
||||
/Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/
|
||||
);
|
||||
expect(() => parseIntervalAsMinute(`hello`)).toThrow(
|
||||
/Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/
|
||||
);
|
||||
});
|
||||
|
||||
test('returns an interval as m', () => {
|
||||
expect(parseIntervalAsMinute('5s')).toEqual(5 / 60);
|
||||
expect(parseIntervalAsMinute('15s')).toEqual(15 / 60);
|
||||
expect(parseIntervalAsMinute('20m')).toEqual(20);
|
||||
expect(parseIntervalAsMinute('61m')).toEqual(61);
|
||||
expect(parseIntervalAsMinute('90m')).toEqual(90);
|
||||
expect(parseIntervalAsMinute('2h')).toEqual(2 * 60);
|
||||
expect(parseIntervalAsMinute('9d')).toEqual(9 * 60 * 24);
|
||||
});
|
||||
});
|
||||
|
||||
describe('parseIntervalAsMillisecond', () => {
|
||||
test('it accepts intervals in the form `Nm`', () => {
|
||||
expect(() => parseIntervalAsMillisecond(`${_.random(1, 1000)}m`)).not.toThrow();
|
||||
|
|
|
@ -114,10 +114,6 @@ export const parseIntervalAsSecond = memoize((interval: Interval): number => {
|
|||
return Math.round(parseIntervalAsMillisecond(interval) / 1000);
|
||||
});
|
||||
|
||||
export const parseIntervalAsMinute = memoize((interval: Interval): number => {
|
||||
return parseIntervalAsMillisecond(interval) / (1000 * 60);
|
||||
});
|
||||
|
||||
export const parseIntervalAsMillisecond = memoize((interval: Interval): number => {
|
||||
const numericAsStr: string = interval.slice(0, -1);
|
||||
const numeric: number = parseInt(numericAsStr, 10);
|
||||
|
|
|
@ -18,10 +18,7 @@ import { taskPollingLifecycleMock } from '../polling_lifecycle.mock';
|
|||
import {
|
||||
BackgroundTaskUtilizationStat,
|
||||
createBackgroundTaskUtilizationAggregator,
|
||||
SummarizedBackgroundTaskUtilizationStat,
|
||||
summarizeUtilizationStat,
|
||||
} from './background_task_utilization_statistics';
|
||||
import { parseIntervalAsMinute } from '../lib/intervals';
|
||||
import { AdHocTaskCounter } from '../lib/adhoc_task_counter';
|
||||
import { sum } from 'lodash';
|
||||
|
||||
|
@ -49,7 +46,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.adhoc.ran.service_time.actual).toEqual(sum(window));
|
||||
|
@ -68,20 +65,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(serviceTimes.length),
|
||||
bufferCount(serviceTimes.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1));
|
||||
expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2));
|
||||
expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4));
|
||||
expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(1, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(2, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(3, 8));
|
||||
expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -108,7 +105,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.adhoc.ran.service_time.adjusted).toEqual(sum(window));
|
||||
|
@ -127,20 +124,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(serviceTimes.length),
|
||||
bufferCount(serviceTimes.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3));
|
||||
expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3));
|
||||
expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3));
|
||||
expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(1, 6), 3));
|
||||
expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(2, 7), 3));
|
||||
expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(3, 8), 3));
|
||||
expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3));
|
||||
expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3));
|
||||
expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -167,7 +164,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.adhoc.ran.service_time.task_counter).toEqual(window.length);
|
||||
|
@ -181,20 +178,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(tasks.length),
|
||||
bufferCount(tasks.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1));
|
||||
expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2));
|
||||
expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4));
|
||||
expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8));
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -221,7 +218,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.adhoc.created.counter).toEqual(sum(window));
|
||||
|
@ -235,20 +232,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(tasks.length),
|
||||
bufferCount(tasks.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1));
|
||||
expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2));
|
||||
expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4));
|
||||
expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8));
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -276,7 +273,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.recurring.ran.service_time.actual).toEqual(sum(window));
|
||||
|
@ -295,20 +292,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(serviceTimes.length),
|
||||
bufferCount(serviceTimes.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1));
|
||||
expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2));
|
||||
expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4));
|
||||
expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(1, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(2, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(3, 8));
|
||||
expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -335,7 +332,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.recurring.ran.service_time.adjusted).toEqual(sum(window));
|
||||
|
@ -354,20 +351,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(serviceTimes.length),
|
||||
bufferCount(serviceTimes.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3));
|
||||
expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3));
|
||||
expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3));
|
||||
expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(1, 6), 3));
|
||||
expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(2, 7), 3));
|
||||
expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(3, 8), 3));
|
||||
expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3));
|
||||
expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3));
|
||||
expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -394,7 +391,7 @@ describe('Task Run Statistics', () => {
|
|||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.recurring.ran.service_time.task_counter).toEqual(window.length);
|
||||
|
@ -408,20 +405,20 @@ describe('Task Run Statistics', () => {
|
|||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
value,
|
||||
})),
|
||||
take(tasks.length),
|
||||
bufferCount(tasks.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1));
|
||||
expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2));
|
||||
expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3));
|
||||
expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4));
|
||||
expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(1, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(2, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(3, 8));
|
||||
expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6));
|
||||
expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7));
|
||||
expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8));
|
||||
resolve();
|
||||
});
|
||||
|
||||
|
@ -432,60 +429,6 @@ describe('Task Run Statistics', () => {
|
|||
}
|
||||
});
|
||||
});
|
||||
|
||||
test('returns a running count of recurring tasks_per_min', async () => {
|
||||
const intervals = ['1h', '5m', '2h', '30m', '10m', '1m', '5h', '120m'];
|
||||
const events$ = new Subject<TaskLifecycleEvent>();
|
||||
const taskPollingLifecycle = taskPollingLifecycleMock.create({
|
||||
events$: events$ as Observable<TaskLifecycleEvent>,
|
||||
});
|
||||
const adHocTaskCounter = new AdHocTaskCounter();
|
||||
|
||||
const runningAverageWindowSize = 5;
|
||||
const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator(
|
||||
taskPollingLifecycle,
|
||||
runningAverageWindowSize,
|
||||
adHocTaskCounter,
|
||||
pollInterval
|
||||
);
|
||||
|
||||
function expectWindowEqualsUpdate(
|
||||
taskStat: AggregatedStat<SummarizedBackgroundTaskUtilizationStat>,
|
||||
window: number[]
|
||||
) {
|
||||
expect(taskStat.value.recurring.tasks_per_min).toEqual(sum(window));
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
BackgroundTaskUtilizationAggregator.pipe(
|
||||
// skip initial stat which is just initialized data which
|
||||
// ensures we don't stall on combineLatest
|
||||
skip(1),
|
||||
// Use 'summarizeUtilizationStat' to receive summarize stats
|
||||
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
|
||||
key,
|
||||
value: summarizeUtilizationStat(value).value,
|
||||
})),
|
||||
take(intervals.length),
|
||||
bufferCount(intervals.length)
|
||||
).subscribe((taskStats: Array<AggregatedStat<SummarizedBackgroundTaskUtilizationStat>>) => {
|
||||
expectWindowEqualsUpdate(taskStats[0], mapInterval(intervals.slice(0, 1)));
|
||||
expectWindowEqualsUpdate(taskStats[1], mapInterval(intervals.slice(0, 2)));
|
||||
expectWindowEqualsUpdate(taskStats[2], mapInterval(intervals.slice(0, 3)));
|
||||
expectWindowEqualsUpdate(taskStats[3], mapInterval(intervals.slice(0, 4)));
|
||||
expectWindowEqualsUpdate(taskStats[4], mapInterval(intervals.slice(0, 5)));
|
||||
// from the 6th value, begin to drop old values as out window is 5
|
||||
expectWindowEqualsUpdate(taskStats[5], mapInterval(intervals.slice(1, 6)));
|
||||
expectWindowEqualsUpdate(taskStats[6], mapInterval(intervals.slice(2, 7)));
|
||||
expectWindowEqualsUpdate(taskStats[7], mapInterval(intervals.slice(3, 8)));
|
||||
resolve();
|
||||
});
|
||||
|
||||
for (const i of intervals) {
|
||||
events$.next(mockTaskRunEvent({ schedule: { interval: i } }, { start: 0, stop: 0 }));
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function runAtMillisecondsAgo(now: number, ms: number): Date {
|
||||
|
@ -497,13 +440,6 @@ function roundUpToNearestSec(duration: number[], s: number): number[] {
|
|||
return duration.map((d) => Math.ceil(d / pollInterval) * pollInterval);
|
||||
}
|
||||
|
||||
function mapInterval(intervals: string[]): number[] {
|
||||
return intervals.map((i) => {
|
||||
const interval = parseIntervalAsMinute(i);
|
||||
return 1 / interval;
|
||||
});
|
||||
}
|
||||
|
||||
const mockTaskRunEvent = (
|
||||
overrides: Partial<ConcreteTaskInstance> = {},
|
||||
timing: TaskTiming,
|
||||
|
|
|
@ -9,62 +9,31 @@ import { JsonObject } from '@kbn/utility-types';
|
|||
import { get } from 'lodash';
|
||||
import { combineLatest, filter, map, Observable, startWith } from 'rxjs';
|
||||
import { AdHocTaskCounter } from '../lib/adhoc_task_counter';
|
||||
import { parseIntervalAsMinute } from '../lib/intervals';
|
||||
import { unwrap } from '../lib/result_type';
|
||||
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
|
||||
import { ConcreteTaskInstance } from '../task';
|
||||
import { isTaskRunEvent, TaskRun, TaskTiming } from '../task_events';
|
||||
import { MonitoredStat } from './monitoring_stats_stream';
|
||||
import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
|
||||
import { createRunningAveragedStat } from './task_run_calcultors';
|
||||
|
||||
export interface BackgroundTaskUtilizationStat extends JsonObject {
|
||||
adhoc: AdhocTaskStat;
|
||||
recurring: RecurringTaskStat;
|
||||
recurring: TaskStat;
|
||||
}
|
||||
|
||||
interface TaskStat extends JsonObject {
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: number[]; // total service time for running recurring tasks
|
||||
adjusted: number[]; // total service time adjusted for polling interval
|
||||
task_counter: number[]; // recurring tasks counter, only increases for the lifetime of the process
|
||||
actual: number; // total service time for running recurring tasks
|
||||
adjusted: number; // total service time adjusted for polling interval
|
||||
task_counter: number; // recurring tasks counter, only increases for the lifetime of the process
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
interface AdhocTaskStat extends TaskStat {
|
||||
created: {
|
||||
counter: number[]; // counter for number of ad hoc tasks created
|
||||
};
|
||||
}
|
||||
|
||||
interface RecurringTaskStat extends TaskStat {
|
||||
tasks_per_min: number[];
|
||||
}
|
||||
|
||||
export interface SummarizedBackgroundTaskUtilizationStat extends JsonObject {
|
||||
adhoc: {
|
||||
created: {
|
||||
counter: number;
|
||||
};
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: number;
|
||||
adjusted: number;
|
||||
task_counter: number;
|
||||
};
|
||||
};
|
||||
};
|
||||
recurring: {
|
||||
tasks_per_min: number;
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: number;
|
||||
adjusted: number;
|
||||
task_counter: number;
|
||||
};
|
||||
};
|
||||
counter: number; // counter for number of ad hoc tasks created
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -107,13 +76,13 @@ export function createBackgroundTaskUtilizationAggregator(
|
|||
startWith({
|
||||
adhoc: {
|
||||
created: {
|
||||
counter: [],
|
||||
counter: 0,
|
||||
},
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: [],
|
||||
adjusted: [],
|
||||
task_counter: [],
|
||||
actual: 0,
|
||||
adjusted: 0,
|
||||
task_counter: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -122,12 +91,11 @@ export function createBackgroundTaskUtilizationAggregator(
|
|||
taskRunRecurringEvents$.pipe(
|
||||
startWith({
|
||||
recurring: {
|
||||
tasks_per_min: [],
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: [],
|
||||
adjusted: [],
|
||||
task_counter: [],
|
||||
actual: 0,
|
||||
adjusted: 0,
|
||||
task_counter: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -155,37 +123,6 @@ function hasTiming(taskEvent: TaskLifecycleEvent) {
|
|||
return !!taskEvent?.timing;
|
||||
}
|
||||
|
||||
export function summarizeUtilizationStat({ adhoc, recurring }: BackgroundTaskUtilizationStat): {
|
||||
value: SummarizedBackgroundTaskUtilizationStat;
|
||||
} {
|
||||
return {
|
||||
value: {
|
||||
adhoc: {
|
||||
created: {
|
||||
counter: calculateSum(adhoc.created.counter),
|
||||
},
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: calculateSum(adhoc.ran.service_time.actual),
|
||||
adjusted: calculateSum(adhoc.ran.service_time.adjusted),
|
||||
task_counter: calculateSum(adhoc.ran.service_time.task_counter),
|
||||
},
|
||||
},
|
||||
},
|
||||
recurring: {
|
||||
tasks_per_min: calculateSum(recurring.tasks_per_min),
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: calculateSum(recurring.ran.service_time.actual),
|
||||
adjusted: calculateSum(recurring.ran.service_time.adjusted),
|
||||
task_counter: calculateSum(recurring.ran.service_time.task_counter),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function summarizeUtilizationStats({
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
last_update,
|
||||
|
@ -195,24 +132,26 @@ export function summarizeUtilizationStats({
|
|||
stats: MonitoredStat<BackgroundTaskUtilizationStat> | undefined;
|
||||
}): {
|
||||
last_update: string;
|
||||
stats: MonitoredStat<SummarizedBackgroundTaskUtilizationStat> | null;
|
||||
stats: MonitoredStat<BackgroundTaskUtilizationStat> | null;
|
||||
} {
|
||||
const utilizationStats = stats?.value;
|
||||
return {
|
||||
last_update,
|
||||
stats: stats
|
||||
? {
|
||||
timestamp: stats.timestamp,
|
||||
...summarizeUtilizationStat(stats.value),
|
||||
}
|
||||
: null,
|
||||
stats:
|
||||
stats && utilizationStats
|
||||
? {
|
||||
timestamp: stats.timestamp,
|
||||
value: utilizationStats,
|
||||
}
|
||||
: null,
|
||||
};
|
||||
}
|
||||
|
||||
function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) {
|
||||
const createdCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const actualQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const adjustedQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const taskCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
let createdCounter = 0;
|
||||
let actualCounter = 0;
|
||||
let adjustedCounter = 0;
|
||||
let taskCounter = 0;
|
||||
return (
|
||||
timing: TaskTiming,
|
||||
adHocTaskCounter: AdHocTaskCounter,
|
||||
|
@ -224,13 +163,13 @@ function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) {
|
|||
return {
|
||||
adhoc: {
|
||||
created: {
|
||||
counter: createdCounterQueue(created),
|
||||
counter: (createdCounter += created),
|
||||
},
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: actualQueue(duration),
|
||||
adjusted: adjustedQueue(adjusted),
|
||||
task_counter: taskCounterQueue(1),
|
||||
actual: (actualCounter += duration),
|
||||
adjusted: (adjustedCounter += adjusted),
|
||||
task_counter: (taskCounter += 1),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -239,25 +178,22 @@ function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) {
|
|||
}
|
||||
|
||||
function createTaskRunEventToRecurringStat(runningAverageWindowSize: number) {
|
||||
const tasksPerMinQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const actualQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const adjustedQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
const taskCounterQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
|
||||
let actualCounter = 0;
|
||||
let adjustedCounter = 0;
|
||||
let taskCounter = 0;
|
||||
return (
|
||||
timing: TaskTiming,
|
||||
task: ConcreteTaskInstance,
|
||||
pollInterval: number
|
||||
): Pick<BackgroundTaskUtilizationStat, 'recurring'> => {
|
||||
const { duration, adjusted } = getServiceTimeStats(timing, pollInterval);
|
||||
const interval = parseIntervalAsMinute(task.schedule?.interval!);
|
||||
return {
|
||||
recurring: {
|
||||
tasks_per_min: tasksPerMinQueue(1 / interval),
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: actualQueue(duration),
|
||||
adjusted: adjustedQueue(adjusted),
|
||||
task_counter: taskCounterQueue(1),
|
||||
actual: (actualCounter += duration),
|
||||
adjusted: (adjustedCounter += adjusted),
|
||||
task_counter: (taskCounter += 1),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -270,7 +206,3 @@ function getServiceTimeStats(timing: TaskTiming, pollInterval: number) {
|
|||
const adjusted = Math.ceil(duration / pollInterval) * pollInterval;
|
||||
return { duration, adjusted };
|
||||
}
|
||||
|
||||
function calculateSum(arr: number[]) {
|
||||
return arr.reduce((acc, s) => (acc += s), 0);
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import { UsageCounter } from '@kbn/usage-collection-plugin/server';
|
|||
import { MonitoringStats } from '../monitoring';
|
||||
import { TaskManagerConfig } from '../config';
|
||||
import {
|
||||
SummarizedBackgroundTaskUtilizationStat,
|
||||
BackgroundTaskUtilizationStat,
|
||||
summarizeUtilizationStats,
|
||||
} from '../monitoring/background_task_utilization_statistics';
|
||||
import { MonitoredStat } from '../monitoring/monitoring_stats_stream';
|
||||
|
@ -29,7 +29,7 @@ export interface MonitoredUtilization {
|
|||
process_uuid: string;
|
||||
timestamp: string;
|
||||
last_update: string;
|
||||
stats: MonitoredStat<SummarizedBackgroundTaskUtilizationStat> | null;
|
||||
stats: MonitoredStat<BackgroundTaskUtilizationStat> | null;
|
||||
}
|
||||
|
||||
export interface BackgroundTaskUtilRouteParams {
|
||||
|
|
|
@ -321,7 +321,6 @@ function getMockMonitoredUtilization(overrides = {}): MonitoredUtilization {
|
|||
},
|
||||
},
|
||||
recurring: {
|
||||
tasks_per_min: 2500,
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: 1000,
|
||||
|
|
|
@ -29,7 +29,6 @@ interface MonitoringStats {
|
|||
};
|
||||
};
|
||||
recurring: {
|
||||
tasks_per_min: number;
|
||||
ran: {
|
||||
service_time: {
|
||||
actual: number;
|
||||
|
@ -78,13 +77,10 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
it('should return the task manager background task utilization for recurring stats', async () => {
|
||||
const {
|
||||
value: {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
recurring: { tasks_per_min, ran },
|
||||
recurring: { ran },
|
||||
},
|
||||
} = (await getBackgroundTaskUtilization()).stats;
|
||||
const serviceTime = ran.service_time;
|
||||
expect(typeof tasks_per_min).to.eql('number');
|
||||
|
||||
expect(typeof serviceTime.actual).to.eql('number');
|
||||
expect(typeof serviceTime.adjusted).to.eql('number');
|
||||
expect(typeof serviceTime.task_counter).to.eql('number');
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue