mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
# Backport This will backport the following commits from `main` to `9.0`: - [[Status logging] More performant throttling (#216534)](https://github.com/elastic/kibana/pull/216534) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Alejandro Fernández Haro","email":"alejandro.haro@elastic.co"},"sourceCommit":{"committedDate":"2025-04-02T19:10:47Z","message":"[Status logging] More performant throttling (#216534)","sha":"7bf76b0e7a0de7a6b01fccedd2915e36a9b49fb6","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","performance","release_note:skip","backport:prev-minor","backport:prev-major","v9.1.0"],"title":"[Status logging] More performant throttling","number":216534,"url":"https://github.com/elastic/kibana/pull/216534","mergeCommit":{"message":"[Status logging] More performant throttling (#216534)","sha":"7bf76b0e7a0de7a6b01fccedd2915e36a9b49fb6"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/216534","number":216534,"mergeCommit":{"message":"[Status logging] More performant throttling (#216534)","sha":"7bf76b0e7a0de7a6b01fccedd2915e36a9b49fb6"}}]}] BACKPORT--> Co-authored-by: Alejandro Fernández Haro <alejandro.haro@elastic.co>
This commit is contained in:
parent
e7d58ae49b
commit
558e132557
6 changed files with 253 additions and 94 deletions
|
@ -14,8 +14,7 @@ import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
|
|||
import { type CoreStatus, ServiceStatusLevels, ServiceStatus } from '@kbn/core-status-common';
|
||||
import { logCoreStatusChanges } from './log_core_services_status';
|
||||
|
||||
const delay = async (millis: number = 10) =>
|
||||
await new Promise((resolve) => setTimeout(resolve, millis));
|
||||
const delay = async (millis: number = 10) => await jest.advanceTimersByTimeAsync(millis);
|
||||
|
||||
describe('logCoreStatusChanges', () => {
|
||||
const serviceUnavailable: ServiceStatus = {
|
||||
|
@ -33,6 +32,7 @@ describe('logCoreStatusChanges', () => {
|
|||
let l: Logger; // using short name for clarity
|
||||
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers();
|
||||
core$ = new Subject<CoreStatus>();
|
||||
stop$ = new Subject<void>();
|
||||
loggerFactory = loggingSystemMock.create();
|
||||
|
@ -40,6 +40,8 @@ describe('logCoreStatusChanges', () => {
|
|||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllTimers();
|
||||
jest.useRealTimers();
|
||||
stop$.next();
|
||||
stop$.complete();
|
||||
loggingSystemMock.clear(loggerFactory);
|
||||
|
@ -130,7 +132,7 @@ describe('logCoreStatusChanges', () => {
|
|||
core$.next({ savedObjects: serviceAvailable, elasticsearch: serviceAvailable });
|
||||
|
||||
// give the 'bufferTime' operator enough time to emit and log
|
||||
await delay(20);
|
||||
await delay(1_000);
|
||||
|
||||
expect(l.get).toBeCalledWith('elasticsearch');
|
||||
expect(l.get).toBeCalledWith('savedObjects');
|
||||
|
@ -221,7 +223,7 @@ describe('logCoreStatusChanges', () => {
|
|||
});
|
||||
|
||||
// give the 'bufferTime' operator enough time to emit and log
|
||||
await delay(20);
|
||||
await delay(1_000);
|
||||
|
||||
// emit a last message (some time after)
|
||||
core$.next({
|
||||
|
|
|
@ -7,11 +7,21 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { uniq } from 'lodash';
|
||||
import { merge, type Observable, Subject, type Subscription } from 'rxjs';
|
||||
import { pairwise, takeUntil, map, startWith, bufferTime, concatAll, filter } from 'rxjs';
|
||||
import {
|
||||
merge,
|
||||
type Observable,
|
||||
Subject,
|
||||
type Subscription,
|
||||
pairwise,
|
||||
takeUntil,
|
||||
map,
|
||||
startWith,
|
||||
concatAll,
|
||||
filter,
|
||||
} from 'rxjs';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import { type CoreStatus, ServiceStatusLevels } from '@kbn/core-status-common';
|
||||
import { createLogThrottledBuffer } from './log_throttled_buffer';
|
||||
import type { LoggableServiceStatus } from './types';
|
||||
|
||||
// let services log up to 3 status changes every 30s (extra messages will be throttled / aggregated)
|
||||
|
@ -36,46 +46,13 @@ export const logCoreStatusChanges = ({
|
|||
throttleIntervalMillis = THROTTLE_INTERVAL_MILLIS,
|
||||
maxThrottledMessages = MAX_THROTTLED_MESSAGES,
|
||||
}: LogCoreStatusChangesParams): Subscription => {
|
||||
const buffer = new Subject<LoggableServiceStatus>();
|
||||
const throttled$: Observable<LoggableServiceStatus | string> = buffer.asObservable().pipe(
|
||||
takeUntil(stop$),
|
||||
bufferTime(maxMessagesPerServicePerInterval),
|
||||
map((statuses) => {
|
||||
const aggregated = // aggregate repeated messages, and count nbr. of repetitions
|
||||
statuses.filter((candidateStatus, index) => {
|
||||
const firstMessageIndex = statuses.findIndex(
|
||||
(status) =>
|
||||
candidateStatus.name === status.name &&
|
||||
candidateStatus.level === status.level &&
|
||||
candidateStatus.summary === status.summary
|
||||
);
|
||||
if (index !== firstMessageIndex) {
|
||||
// this is not the first time this message is logged, increase 'repeats' counter for the first occurrence
|
||||
statuses[firstMessageIndex].repeats = (statuses[firstMessageIndex].repeats ?? 1) + 1;
|
||||
return false;
|
||||
} else {
|
||||
// this is the first time this message is logged, let it through
|
||||
return true;
|
||||
}
|
||||
});
|
||||
const buffer$ = new Subject<LoggableServiceStatus>();
|
||||
|
||||
if (aggregated.length > maxThrottledMessages) {
|
||||
const list: string = uniq(
|
||||
aggregated.slice(maxThrottledMessages).map(({ name }) => name)
|
||||
).join(', ');
|
||||
|
||||
return [
|
||||
...aggregated.slice(0, maxThrottledMessages),
|
||||
`${
|
||||
aggregated.length - maxThrottledMessages
|
||||
} other status updates from [${list}] have been truncated to avoid flooding the logs`,
|
||||
];
|
||||
} else {
|
||||
return aggregated;
|
||||
}
|
||||
}),
|
||||
concatAll()
|
||||
);
|
||||
const throttled$ = createLogThrottledBuffer({
|
||||
buffer$,
|
||||
stop$,
|
||||
maxThrottledMessages,
|
||||
});
|
||||
|
||||
const lastMessagesTimestamps: Record<string, number[]> = {};
|
||||
|
||||
|
@ -97,7 +74,7 @@ export const logCoreStatusChanges = ({
|
|||
|
||||
if (pluginQuota.length >= maxMessagesPerServicePerInterval) {
|
||||
// we're still over quota, throttle the message
|
||||
buffer.next(serviceStatus);
|
||||
buffer$.next(serviceStatus);
|
||||
return false;
|
||||
} else {
|
||||
// let the message pass through
|
||||
|
|
|
@ -15,8 +15,7 @@ import { ServiceStatusLevels } from '@kbn/core-status-common';
|
|||
import { logPluginsStatusChanges } from './log_plugins_status';
|
||||
import type { PluginStatus } from './types';
|
||||
|
||||
const delay = async (millis: number = 10) =>
|
||||
await new Promise((resolve) => setTimeout(resolve, millis));
|
||||
const delay = async (millis: number = 10) => await jest.advanceTimersByTimeAsync(millis);
|
||||
|
||||
describe('logPluginsStatusChanges', () => {
|
||||
const reportedUnavailable: PluginStatus = {
|
||||
|
@ -46,6 +45,7 @@ describe('logPluginsStatusChanges', () => {
|
|||
let l: Logger; // using short name for clarity
|
||||
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers();
|
||||
plugins$ = new Subject<Record<string, PluginStatus>>();
|
||||
stop$ = new Subject<void>();
|
||||
loggerFactory = loggingSystemMock.create();
|
||||
|
@ -53,6 +53,8 @@ describe('logPluginsStatusChanges', () => {
|
|||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllTimers();
|
||||
jest.useRealTimers();
|
||||
stop$.next();
|
||||
stop$.complete();
|
||||
loggingSystemMock.clear(loggerFactory);
|
||||
|
@ -142,7 +144,7 @@ describe('logPluginsStatusChanges', () => {
|
|||
plugins$.next({ A: reportedAvailable, B: reportedAvailable });
|
||||
|
||||
// give the 'bufferTime' operator enough time to emit and log
|
||||
await delay(20);
|
||||
await delay(1_000);
|
||||
|
||||
expect(l.get).toBeCalledWith('A');
|
||||
expect(l.get).toBeCalledWith('B');
|
||||
|
@ -186,7 +188,7 @@ describe('logPluginsStatusChanges', () => {
|
|||
plugins$.next({ A: { ...reportedUnavailable, summary: `attempt #${++attempt}` } });
|
||||
|
||||
// give the 'bufferTime' operator enough time to emit and log
|
||||
await delay(20);
|
||||
await delay(1_000);
|
||||
|
||||
// emit a last message (some time after)
|
||||
plugins$.next({ A: { ...reportedAvailable, summary: `attempt #${++attempt}` } });
|
||||
|
|
|
@ -7,12 +7,22 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { uniq } from 'lodash';
|
||||
import { merge, type Observable, Subject, type Subscription } from 'rxjs';
|
||||
import { pairwise, takeUntil, map, startWith, bufferTime, filter, concatAll } from 'rxjs';
|
||||
import {
|
||||
merge,
|
||||
type Observable,
|
||||
Subject,
|
||||
type Subscription,
|
||||
pairwise,
|
||||
takeUntil,
|
||||
map,
|
||||
startWith,
|
||||
filter,
|
||||
concatAll,
|
||||
} from 'rxjs';
|
||||
import { Logger } from '@kbn/logging';
|
||||
import type { PluginName } from '@kbn/core-base-common';
|
||||
import { ServiceStatusLevels } from '@kbn/core-status-common';
|
||||
import { createLogThrottledBuffer } from './log_throttled_buffer';
|
||||
import type { LoggablePluginStatus, PluginStatus } from './types';
|
||||
|
||||
// let plugins log up to 3 status changes every 30s (extra messages will be throttled / aggregated)
|
||||
|
@ -37,46 +47,13 @@ export const logPluginsStatusChanges = ({
|
|||
throttleIntervalMillis = THROTTLE_INTERVAL_MILLIS,
|
||||
maxThrottledMessages = MAX_THROTTLED_MESSAGES,
|
||||
}: LogPluginsStatusChangesParams): Subscription => {
|
||||
const buffer = new Subject<LoggablePluginStatus>();
|
||||
const throttled$: Observable<LoggablePluginStatus | string> = buffer.asObservable().pipe(
|
||||
takeUntil(stop$),
|
||||
bufferTime(maxMessagesPerPluginPerInterval),
|
||||
map((statuses) => {
|
||||
const aggregated = // aggregate repeated messages, and count nbr. of repetitions
|
||||
statuses.filter((candidateStatus, index) => {
|
||||
const firstMessageIndex = statuses.findIndex(
|
||||
(status) =>
|
||||
candidateStatus.name === status.name &&
|
||||
candidateStatus.level === status.level &&
|
||||
candidateStatus.summary === status.summary
|
||||
);
|
||||
if (index !== firstMessageIndex) {
|
||||
// this is not the first time this message is logged, increase 'repeats' counter for the first occurrence
|
||||
statuses[firstMessageIndex].repeats = (statuses[firstMessageIndex].repeats ?? 1) + 1;
|
||||
return false;
|
||||
} else {
|
||||
// this is the first time this message is logged, let it through
|
||||
return true;
|
||||
}
|
||||
});
|
||||
const buffer$ = new Subject<LoggablePluginStatus>();
|
||||
|
||||
if (aggregated.length > maxThrottledMessages) {
|
||||
const list: string = uniq(
|
||||
aggregated.slice(maxThrottledMessages).map(({ name }) => name)
|
||||
).join(', ');
|
||||
|
||||
return [
|
||||
...aggregated.slice(0, maxThrottledMessages),
|
||||
`${
|
||||
aggregated.length - maxThrottledMessages
|
||||
} other status updates from [${list}] have been truncated to avoid flooding the logs`,
|
||||
];
|
||||
} else {
|
||||
return aggregated;
|
||||
}
|
||||
}),
|
||||
concatAll()
|
||||
);
|
||||
const throttled$ = createLogThrottledBuffer({
|
||||
buffer$,
|
||||
stop$,
|
||||
maxThrottledMessages,
|
||||
});
|
||||
|
||||
const lastMessagesTimestamps: Record<string, number[]> = {};
|
||||
|
||||
|
@ -98,7 +75,7 @@ export const logPluginsStatusChanges = ({
|
|||
|
||||
if (pluginQuota.length >= maxMessagesPerPluginPerInterval) {
|
||||
// we're still over quota, throttle the message
|
||||
buffer.next(pluginStatus);
|
||||
buffer$.next(pluginStatus);
|
||||
return false;
|
||||
} else {
|
||||
// let the message pass through
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { Observable, Subject } from 'rxjs';
|
||||
import { ServiceStatusLevels } from '@kbn/core-status-common';
|
||||
import type { LoggableServiceStatus } from './types';
|
||||
import { createLogThrottledBuffer } from './log_throttled_buffer';
|
||||
|
||||
describe('createLogThrottledBuffer', () => {
|
||||
let buffer$: Subject<LoggableServiceStatus>;
|
||||
let throttled$: Observable<LoggableServiceStatus | string>;
|
||||
let loggedMessages: Array<LoggableServiceStatus | string>;
|
||||
|
||||
const stop$ = new Subject<void>();
|
||||
const bufferTimeMillis = 1000;
|
||||
const maxThrottledMessages = 10;
|
||||
|
||||
const baseStatus: LoggableServiceStatus = {
|
||||
name: 'test-service',
|
||||
level: ServiceStatusLevels.available,
|
||||
summary: 'this is a test',
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers();
|
||||
buffer$ = new Subject<LoggableServiceStatus>();
|
||||
loggedMessages = [];
|
||||
throttled$ = createLogThrottledBuffer({
|
||||
buffer$,
|
||||
stop$,
|
||||
bufferTimeMillis,
|
||||
maxThrottledMessages,
|
||||
});
|
||||
throttled$.subscribe((message) => {
|
||||
loggedMessages.push(message);
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
stop$.next();
|
||||
jest.clearAllTimers();
|
||||
});
|
||||
|
||||
test('returns an observable', async () => {
|
||||
// Make sure that it doesn't expose the original buffer's `.next` method
|
||||
expect(throttled$).toBeInstanceOf(Observable);
|
||||
expect(throttled$).not.toBeInstanceOf(Subject);
|
||||
expect(throttled$).not.toHaveProperty('next');
|
||||
});
|
||||
|
||||
test('buffers until at least 3 messages are provided (with a debounce)', async () => {
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
|
||||
// not logged yet since the debounce time hasn't passed
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis / 2); // Half the buffer time
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis);
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "available",
|
||||
"name": "test-service",
|
||||
"repeats": 3,
|
||||
"summary": "this is a test",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
test('no need to wait for 3 messages', async () => {
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
|
||||
// not logged yet since the debounce time hasn't passed
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis / 2); // Half the buffer time
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis);
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "available",
|
||||
"name": "test-service",
|
||||
"repeats": 2,
|
||||
"summary": "this is a test",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
test('buffers all the messages received during the interval time', async () => {
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
|
||||
// not logged yet since the debounce time hasn't passed
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis / 2); // Half the buffer time
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`Array []`);
|
||||
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
buffer$.next({ ...baseStatus });
|
||||
|
||||
await jest.advanceTimersByTimeAsync(bufferTimeMillis);
|
||||
expect(loggedMessages).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
Object {
|
||||
"level": "available",
|
||||
"name": "test-service",
|
||||
"repeats": 9,
|
||||
"summary": "this is a test",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { concatAll, map, type Observable, type Subject, takeUntil, bufferTime } from 'rxjs';
|
||||
import type { LoggableServiceStatus } from './types';
|
||||
|
||||
export interface CreateLogThrottledBufferOptions<LoggableStatus extends LoggableServiceStatus> {
|
||||
buffer$: Subject<LoggableStatus>;
|
||||
stop$: Observable<void>;
|
||||
bufferTimeMillis?: number;
|
||||
maxThrottledMessages: number;
|
||||
}
|
||||
|
||||
export function createLogThrottledBuffer<LoggableStatus extends LoggableServiceStatus>({
|
||||
buffer$,
|
||||
stop$,
|
||||
maxThrottledMessages,
|
||||
bufferTimeMillis = 1_000,
|
||||
}: CreateLogThrottledBufferOptions<LoggableStatus>): Observable<LoggableStatus | string> {
|
||||
const throttled$: Observable<LoggableStatus | string> = buffer$.asObservable().pipe(
|
||||
takeUntil(stop$),
|
||||
bufferTime(bufferTimeMillis),
|
||||
map((statuses) => {
|
||||
const aggregated = // aggregate repeated messages, and count nbr. of repetitions
|
||||
statuses.filter((candidateStatus, index) => {
|
||||
const firstMessageIndex = statuses.findIndex(
|
||||
(status) =>
|
||||
candidateStatus.name === status.name &&
|
||||
candidateStatus.level === status.level &&
|
||||
candidateStatus.summary === status.summary
|
||||
);
|
||||
if (index !== firstMessageIndex) {
|
||||
// this is not the first time this message is logged, increase 'repeats' counter for the first occurrence
|
||||
statuses[firstMessageIndex].repeats = (statuses[firstMessageIndex].repeats ?? 1) + 1;
|
||||
return false;
|
||||
} else {
|
||||
// this is the first time this message is logged, let it through
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
if (aggregated.length > maxThrottledMessages) {
|
||||
const list: string = [
|
||||
...new Set(aggregated.slice(maxThrottledMessages).map(({ name }) => name)),
|
||||
].join(', ');
|
||||
|
||||
return [
|
||||
...aggregated.slice(0, maxThrottledMessages),
|
||||
`${
|
||||
aggregated.length - maxThrottledMessages
|
||||
} other status updates from [${list}] have been truncated to avoid flooding the logs`,
|
||||
];
|
||||
} else {
|
||||
return aggregated;
|
||||
}
|
||||
}),
|
||||
concatAll()
|
||||
);
|
||||
|
||||
return throttled$;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue