mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
Rewrite plugin_status logic limiting usage of Observables (reducing heap size) (#128324)
* WIP * Fix behavior when no plugins are defined * Remove unused import, reduce debounce times * Fix startup behavior * Misc improvements following PR comments * Fix plugin_status UTs * Code cleanup + enhancements * Remove fixed FIXME
This commit is contained in:
parent
52315e3ec9
commit
a645545178
5 changed files with 384 additions and 155 deletions
50
src/core/server/status/cached_plugins_status.ts
Normal file
50
src/core/server/status/cached_plugins_status.ts
Normal file
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { Observable } from 'rxjs';
|
||||
|
||||
import { type PluginName } from '../plugins';
|
||||
import { type ServiceStatus } from './types';
|
||||
|
||||
import { type Deps, PluginsStatusService as BasePluginsStatusService } from './plugins_status';
|
||||
|
||||
export class PluginsStatusService extends BasePluginsStatusService {
|
||||
private all$?: Observable<Record<PluginName, ServiceStatus>>;
|
||||
private dependenciesStatuses$: Record<PluginName, Observable<Record<PluginName, ServiceStatus>>>;
|
||||
private derivedStatuses$: Record<PluginName, Observable<ServiceStatus>>;
|
||||
|
||||
constructor(deps: Deps) {
|
||||
super(deps);
|
||||
this.dependenciesStatuses$ = {};
|
||||
this.derivedStatuses$ = {};
|
||||
}
|
||||
|
||||
public getAll$(): Observable<Record<PluginName, ServiceStatus>> {
|
||||
if (!this.all$) {
|
||||
this.all$ = super.getAll$();
|
||||
}
|
||||
|
||||
return this.all$;
|
||||
}
|
||||
|
||||
public getDependenciesStatus$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
|
||||
if (!this.dependenciesStatuses$[plugin]) {
|
||||
this.dependenciesStatuses$[plugin] = super.getDependenciesStatus$(plugin);
|
||||
}
|
||||
|
||||
return this.dependenciesStatuses$[plugin];
|
||||
}
|
||||
|
||||
public getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus> {
|
||||
if (!this.derivedStatuses$[plugin]) {
|
||||
this.derivedStatuses$[plugin] = super.getDerivedStatus$(plugin);
|
||||
}
|
||||
|
||||
return this.derivedStatuses$[plugin];
|
||||
}
|
||||
}
|
|
@ -10,7 +10,7 @@ import { PluginName } from '../plugins';
|
|||
import { PluginsStatusService } from './plugins_status';
|
||||
import { of, Observable, BehaviorSubject, ReplaySubject } from 'rxjs';
|
||||
import { ServiceStatusLevels, CoreStatus, ServiceStatus } from './types';
|
||||
import { first } from 'rxjs/operators';
|
||||
import { first, skip } from 'rxjs/operators';
|
||||
import { ServiceStatusLevelSnapshotSerializer } from './test_utils';
|
||||
|
||||
expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer);
|
||||
|
@ -215,7 +215,7 @@ describe('PluginStatusService', () => {
|
|||
service.set('a', of({ level: ServiceStatusLevels.available, summary: 'a status' }));
|
||||
|
||||
expect(await service.getAll$().pipe(first()).toPromise()).toEqual({
|
||||
a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available depsite savedObjects being degraded
|
||||
a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available despite savedObjects being degraded
|
||||
b: {
|
||||
level: ServiceStatusLevels.degraded,
|
||||
summary: '1 service is degraded: savedObjects',
|
||||
|
@ -239,6 +239,10 @@ describe('PluginStatusService', () => {
|
|||
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
|
||||
const subscription = service
|
||||
.getAll$()
|
||||
// If we subscribe to the $getAll() Observable BEFORE setting a custom status Observable
|
||||
// for a given plugin ('a' in this test), then the first emission will happen
|
||||
// right after core$ services Observable emits
|
||||
.pipe(skip(1))
|
||||
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));
|
||||
|
||||
service.set('a', of({ level: ServiceStatusLevels.degraded, summary: 'a degraded' }));
|
||||
|
@ -261,6 +265,8 @@ describe('PluginStatusService', () => {
|
|||
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
|
||||
const subscription = service
|
||||
.getAll$()
|
||||
// the first emission happens right after core services emit (see explanation above)
|
||||
.pipe(skip(1))
|
||||
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));
|
||||
|
||||
const aStatus$ = new BehaviorSubject<ServiceStatus>({
|
||||
|
@ -280,19 +286,21 @@ describe('PluginStatusService', () => {
|
|||
});
|
||||
|
||||
it('emits an unavailable status if first emission times out, then continues future emissions', async () => {
|
||||
jest.useFakeTimers();
|
||||
const service = new PluginsStatusService({
|
||||
core$: coreAllAvailable$,
|
||||
pluginDependencies: new Map([
|
||||
['a', []],
|
||||
['b', ['a']],
|
||||
]),
|
||||
});
|
||||
const service = new PluginsStatusService(
|
||||
{
|
||||
core$: coreAllAvailable$,
|
||||
pluginDependencies: new Map([
|
||||
['a', []],
|
||||
['b', ['a']],
|
||||
]),
|
||||
},
|
||||
10 // set a small timeout so that the registered status Observable for 'a' times out quickly
|
||||
);
|
||||
|
||||
const pluginA$ = new ReplaySubject<ServiceStatus>(1);
|
||||
service.set('a', pluginA$);
|
||||
const firstEmission = service.getAll$().pipe(first()).toPromise();
|
||||
jest.runAllTimers();
|
||||
// the first emission happens right after core$ services emit
|
||||
const firstEmission = service.getAll$().pipe(skip(1), first()).toPromise();
|
||||
|
||||
expect(await firstEmission).toEqual({
|
||||
a: { level: ServiceStatusLevels.unavailable, summary: 'Status check timed out after 30s' },
|
||||
|
@ -308,16 +316,16 @@ describe('PluginStatusService', () => {
|
|||
|
||||
pluginA$.next({ level: ServiceStatusLevels.available, summary: 'a available' });
|
||||
const secondEmission = service.getAll$().pipe(first()).toPromise();
|
||||
jest.runAllTimers();
|
||||
expect(await secondEmission).toEqual({
|
||||
a: { level: ServiceStatusLevels.available, summary: 'a available' },
|
||||
b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
|
||||
});
|
||||
jest.useRealTimers();
|
||||
});
|
||||
});
|
||||
|
||||
describe('getDependenciesStatus$', () => {
|
||||
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
it('only includes dependencies of specified plugin', async () => {
|
||||
const service = new PluginsStatusService({
|
||||
core$: coreAllAvailable$,
|
||||
|
@ -357,7 +365,7 @@ describe('PluginStatusService', () => {
|
|||
|
||||
it('debounces plugins custom status registration', async () => {
|
||||
const service = new PluginsStatusService({
|
||||
core$: coreAllAvailable$,
|
||||
core$: coreOneCriticalOneDegraded$,
|
||||
pluginDependencies,
|
||||
});
|
||||
const available: ServiceStatus = {
|
||||
|
@ -375,8 +383,6 @@ describe('PluginStatusService', () => {
|
|||
|
||||
expect(statusUpdates).toStrictEqual([]);
|
||||
|
||||
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
// Waiting for the debounce timeout should cut a new update
|
||||
await delay(25);
|
||||
subscription.unsubscribe();
|
||||
|
@ -404,7 +410,6 @@ describe('PluginStatusService', () => {
|
|||
const subscription = service
|
||||
.getDependenciesStatus$('b')
|
||||
.subscribe((status) => statusUpdates.push(status));
|
||||
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
pluginA$.next(degraded);
|
||||
pluginA$.next(available);
|
||||
|
|
|
@ -5,166 +5,338 @@
|
|||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs';
|
||||
import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs';
|
||||
import {
|
||||
map,
|
||||
distinctUntilChanged,
|
||||
switchMap,
|
||||
filter,
|
||||
debounceTime,
|
||||
timeoutWith,
|
||||
startWith,
|
||||
} from 'rxjs/operators';
|
||||
import { sortBy } from 'lodash';
|
||||
import { isDeepStrictEqual } from 'util';
|
||||
|
||||
import { PluginName } from '../plugins';
|
||||
import { ServiceStatus, CoreStatus, ServiceStatusLevels } from './types';
|
||||
import { type PluginName } from '../plugins';
|
||||
import { type ServiceStatus, type CoreStatus, ServiceStatusLevels } from './types';
|
||||
import { getSummaryStatus } from './get_summary_status';
|
||||
|
||||
const STATUS_TIMEOUT_MS = 30 * 1000; // 30 seconds
|
||||
|
||||
interface Deps {
|
||||
const defaultStatus: ServiceStatus = {
|
||||
level: ServiceStatusLevels.unavailable,
|
||||
summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`,
|
||||
};
|
||||
|
||||
export interface Deps {
|
||||
core$: Observable<CoreStatus>;
|
||||
pluginDependencies: ReadonlyMap<PluginName, PluginName[]>;
|
||||
}
|
||||
|
||||
interface PluginData {
|
||||
[name: PluginName]: {
|
||||
name: PluginName;
|
||||
depth: number; // depth of this plugin in the dependency tree (root plugins will have depth = 1)
|
||||
dependencies: PluginName[];
|
||||
reverseDependencies: PluginName[];
|
||||
reportedStatus?: ServiceStatus;
|
||||
derivedStatus: ServiceStatus;
|
||||
};
|
||||
}
|
||||
interface PluginStatus {
|
||||
[name: PluginName]: ServiceStatus;
|
||||
}
|
||||
|
||||
interface ReportedStatusSubscriptions {
|
||||
[name: PluginName]: Subscription;
|
||||
}
|
||||
|
||||
export class PluginsStatusService {
|
||||
private readonly pluginStatuses = new Map<PluginName, Observable<ServiceStatus>>();
|
||||
private readonly derivedStatuses = new Map<PluginName, Observable<ServiceStatus>>();
|
||||
private readonly dependenciesStatuses = new Map<
|
||||
PluginName,
|
||||
Observable<Record<PluginName, ServiceStatus>>
|
||||
>();
|
||||
private allPluginsStatuses?: Observable<Record<PluginName, ServiceStatus>>;
|
||||
|
||||
private readonly update$ = new BehaviorSubject(true);
|
||||
private readonly defaultInheritedStatus$: Observable<ServiceStatus>;
|
||||
private coreStatus: CoreStatus = { elasticsearch: defaultStatus, savedObjects: defaultStatus };
|
||||
private pluginData: PluginData;
|
||||
private rootPlugins: PluginName[]; // root plugins are those that do not have any dependencies
|
||||
private orderedPluginNames: PluginName[];
|
||||
private pluginData$ = new ReplaySubject<PluginData>(1);
|
||||
private pluginStatus: PluginStatus = {};
|
||||
private pluginStatus$ = new BehaviorSubject<PluginStatus>(this.pluginStatus);
|
||||
private reportedStatusSubscriptions: ReportedStatusSubscriptions = {};
|
||||
private isReportingStatus: Record<PluginName, boolean> = {};
|
||||
private newRegistrationsAllowed = true;
|
||||
private coreSubscription: Subscription;
|
||||
|
||||
constructor(private readonly deps: Deps) {
|
||||
this.defaultInheritedStatus$ = this.deps.core$.pipe(
|
||||
map((coreStatus) => {
|
||||
return getSummaryStatus(Object.entries(coreStatus), {
|
||||
allAvailableSummary: `All dependencies are available`,
|
||||
});
|
||||
})
|
||||
);
|
||||
constructor(deps: Deps, private readonly statusTimeoutMs: number = STATUS_TIMEOUT_MS) {
|
||||
this.pluginData = this.initPluginData(deps.pluginDependencies);
|
||||
this.rootPlugins = this.getRootPlugins();
|
||||
this.orderedPluginNames = this.getOrderedPluginNames();
|
||||
|
||||
this.coreSubscription = deps.core$
|
||||
.pipe(debounceTime(10))
|
||||
.subscribe((coreStatus: CoreStatus) => this.updateCoreAndPluginStatuses(coreStatus));
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a status Observable for a specific plugin
|
||||
* @param {PluginName} plugin The name of the plugin
|
||||
* @param {Observable<ServiceStatus>} status$ An external Observable that must be trusted as the source of truth for the status of the plugin
|
||||
* @throws An error if the status registrations are not allowed
|
||||
*/
|
||||
public set(plugin: PluginName, status$: Observable<ServiceStatus>) {
|
||||
if (!this.newRegistrationsAllowed) {
|
||||
throw new Error(
|
||||
`Custom statuses cannot be registered after setup, plugin [${plugin}] attempted`
|
||||
);
|
||||
}
|
||||
this.pluginStatuses.set(plugin, status$);
|
||||
this.update$.next(true); // trigger all existing Observables to update from the new source Observable
|
||||
|
||||
this.isReportingStatus[plugin] = true;
|
||||
// unsubscribe from any previous subscriptions. Ideally plugins should register a status Observable only once
|
||||
this.reportedStatusSubscriptions[plugin]?.unsubscribe();
|
||||
|
||||
// delete any derived statuses calculated before the custom status Observable was registered
|
||||
delete this.pluginStatus[plugin];
|
||||
|
||||
this.reportedStatusSubscriptions[plugin] = status$
|
||||
// Set a timeout for externally-defined status Observables
|
||||
.pipe(timeoutWith(this.statusTimeoutMs, status$.pipe(startWith(defaultStatus))))
|
||||
.subscribe((status) => this.updatePluginReportedStatus(plugin, status));
|
||||
}
|
||||
|
||||
/**
|
||||
* Prevent plugins from registering status Observables
|
||||
*/
|
||||
public blockNewRegistrations() {
|
||||
this.newRegistrationsAllowed = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain an Observable of the status of all the plugins
|
||||
* @returns {Observable<Record<PluginName, ServiceStatus>>} An Observable that will yield the current status of all plugins
|
||||
*/
|
||||
public getAll$(): Observable<Record<PluginName, ServiceStatus>> {
|
||||
if (!this.allPluginsStatuses) {
|
||||
this.allPluginsStatuses = this.getPluginStatuses$([...this.deps.pluginDependencies.keys()]);
|
||||
}
|
||||
return this.allPluginsStatuses;
|
||||
}
|
||||
|
||||
public getDependenciesStatus$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
|
||||
const dependencies = this.deps.pluginDependencies.get(plugin);
|
||||
if (!dependencies) {
|
||||
throw new Error(`Unknown plugin: ${plugin}`);
|
||||
}
|
||||
if (!this.dependenciesStatuses.has(plugin)) {
|
||||
this.dependenciesStatuses.set(
|
||||
plugin,
|
||||
this.getPluginStatuses$(dependencies).pipe(
|
||||
// Prevent many emissions at once from dependency status resolution from making this too noisy
|
||||
debounceTime(25)
|
||||
)
|
||||
);
|
||||
}
|
||||
return this.dependenciesStatuses.get(plugin)!;
|
||||
}
|
||||
|
||||
public getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus> {
|
||||
if (!this.derivedStatuses.has(plugin)) {
|
||||
this.derivedStatuses.set(
|
||||
plugin,
|
||||
this.update$.pipe(
|
||||
debounceTime(25), // Avoid calling the plugin's custom status logic for every plugin that depends on it.
|
||||
switchMap(() => {
|
||||
// Only go up the dependency tree if any of this plugin's dependencies have a custom status
|
||||
// Helps eliminate memory overhead of creating thousands of Observables unnecessarily.
|
||||
if (this.anyCustomStatuses(plugin)) {
|
||||
return combineLatest([this.deps.core$, this.getDependenciesStatus$(plugin)]).pipe(
|
||||
map(([coreStatus, pluginStatuses]) => {
|
||||
return getSummaryStatus(
|
||||
[...Object.entries(coreStatus), ...Object.entries(pluginStatuses)],
|
||||
{
|
||||
allAvailableSummary: `All dependencies are available`,
|
||||
}
|
||||
);
|
||||
})
|
||||
);
|
||||
} else {
|
||||
return this.defaultInheritedStatus$;
|
||||
}
|
||||
})
|
||||
)
|
||||
);
|
||||
}
|
||||
return this.derivedStatuses.get(plugin)!;
|
||||
}
|
||||
|
||||
private getPluginStatuses$(plugins: PluginName[]): Observable<Record<PluginName, ServiceStatus>> {
|
||||
if (plugins.length === 0) {
|
||||
return of({});
|
||||
}
|
||||
|
||||
return this.update$.pipe(
|
||||
switchMap(() => {
|
||||
const pluginStatuses = plugins
|
||||
.map((depName) => {
|
||||
const pluginStatus = this.pluginStatuses.get(depName)
|
||||
? this.pluginStatuses.get(depName)!.pipe(
|
||||
timeoutWith(
|
||||
STATUS_TIMEOUT_MS,
|
||||
this.pluginStatuses.get(depName)!.pipe(
|
||||
startWith({
|
||||
level: ServiceStatusLevels.unavailable,
|
||||
summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`,
|
||||
})
|
||||
)
|
||||
)
|
||||
)
|
||||
: this.getDerivedStatus$(depName);
|
||||
return [depName, pluginStatus] as [PluginName, Observable<ServiceStatus>];
|
||||
})
|
||||
.map(([pName, status$]) =>
|
||||
status$.pipe(map((status) => [pName, status] as [PluginName, ServiceStatus]))
|
||||
);
|
||||
|
||||
return combineLatest(pluginStatuses).pipe(
|
||||
map((statuses) => Object.fromEntries(statuses)),
|
||||
distinctUntilChanged<Record<PluginName, ServiceStatus>>(isDeepStrictEqual)
|
||||
);
|
||||
})
|
||||
return this.pluginStatus$.asObservable().pipe(
|
||||
// do not emit until we have a status for all plugins
|
||||
filter((all) => Object.keys(all).length === this.orderedPluginNames.length),
|
||||
distinctUntilChanged<Record<PluginName, ServiceStatus>>(isDeepStrictEqual)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether or not this plugin or any plugin in it's dependency tree have a custom status registered.
|
||||
* Obtain an Observable of the status of the dependencies of the given plugin
|
||||
* @param {PluginName} plugin the name of the plugin whose dependencies' status must be retreived
|
||||
* @returns {Observable<Record<PluginName, ServiceStatus>>} An Observable that will yield the current status of the plugin's dependencies
|
||||
*/
|
||||
private anyCustomStatuses(plugin: PluginName): boolean {
|
||||
if (this.pluginStatuses.get(plugin)) {
|
||||
return true;
|
||||
public getDependenciesStatus$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
|
||||
const directDependencies = this.pluginData[plugin].dependencies;
|
||||
|
||||
return this.getAll$().pipe(
|
||||
map((allStatus) => {
|
||||
const dependenciesStatus: Record<PluginName, ServiceStatus> = {};
|
||||
directDependencies.forEach((dep) => (dependenciesStatus[dep] = allStatus[dep]));
|
||||
return dependenciesStatus;
|
||||
}),
|
||||
debounceTime(10)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain an Observable of the derived status of the given plugin
|
||||
* @param {PluginName} plugin the name of the plugin whose derived status must be retrieved
|
||||
* @returns {Observable<ServiceStatus>} An Observable that will yield the derived status of the plugin
|
||||
*/
|
||||
public getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus> {
|
||||
return this.pluginData$.asObservable().pipe(
|
||||
map((pluginData) => pluginData[plugin]?.derivedStatus),
|
||||
filter((status: ServiceStatus | undefined): status is ServiceStatus => !!status),
|
||||
distinctUntilChanged<ServiceStatus>(isDeepStrictEqual)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to be called at the stop lifecycle event
|
||||
*/
|
||||
public stop() {
|
||||
// Cancel all active subscriptions
|
||||
this.coreSubscription.unsubscribe();
|
||||
Object.values(this.reportedStatusSubscriptions).forEach((subscription) => {
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a convenience data structure
|
||||
* that maintain up-to-date information about the plugins and their statuses
|
||||
* @param {ReadonlyMap<PluginName, PluginName[]>} pluginDependencies Information about the different plugins and their dependencies
|
||||
* @returns {PluginData}
|
||||
*/
|
||||
private initPluginData(pluginDependencies: ReadonlyMap<PluginName, PluginName[]>): PluginData {
|
||||
const pluginData: PluginData = {};
|
||||
|
||||
if (pluginDependencies) {
|
||||
pluginDependencies.forEach((dependencies, name) => {
|
||||
pluginData[name] = {
|
||||
name,
|
||||
depth: 0,
|
||||
dependencies,
|
||||
reverseDependencies: [],
|
||||
derivedStatus: defaultStatus,
|
||||
};
|
||||
});
|
||||
|
||||
pluginDependencies.forEach((dependencies, name) => {
|
||||
dependencies.forEach((dependency) => {
|
||||
pluginData[dependency].reverseDependencies.push(name);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return this.deps.pluginDependencies
|
||||
.get(plugin)!
|
||||
.reduce((acc, depName) => acc || this.anyCustomStatuses(depName), false as boolean);
|
||||
return pluginData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a list with all the root plugins.
|
||||
* Root plugins are all those plugins that do not have any dependency.
|
||||
* @returns {PluginName[]} a list with all the root plugins present in the provided deps
|
||||
*/
|
||||
private getRootPlugins(): PluginName[] {
|
||||
return Object.keys(this.pluginData).filter(
|
||||
(plugin) => this.pluginData[plugin].dependencies.length === 0
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a list of plugins names, ordered by depth.
|
||||
* @see {calculateDepthRecursive}
|
||||
* @returns {PluginName[]} a list of plugins, ordered by depth + name
|
||||
*/
|
||||
private getOrderedPluginNames(): PluginName[] {
|
||||
this.rootPlugins.forEach((plugin) => {
|
||||
this.calculateDepthRecursive(plugin, 1);
|
||||
});
|
||||
|
||||
return sortBy(Object.values(this.pluginData), ['depth', 'name']).map(({ name }) => name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the depth of the given plugin, knowing that it's has at least the specified depth
|
||||
* The depth of a plugin is determined by how many levels of dependencies the plugin has above it.
|
||||
* We define root plugins as depth = 1, plugins that only depend on root plugins will have depth = 2
|
||||
* and so on so forth
|
||||
* @param {PluginName} plugin the name of the plugin whose depth must be calculated
|
||||
* @param {number} depth the minimum depth that we know for sure this plugin has
|
||||
*/
|
||||
private calculateDepthRecursive(plugin: PluginName, depth: number): void {
|
||||
const pluginData = this.pluginData[plugin];
|
||||
pluginData.depth = Math.max(pluginData.depth, depth);
|
||||
const newDepth = depth + 1;
|
||||
pluginData.reverseDependencies.forEach((revDep) =>
|
||||
this.calculateDepthRecursive(revDep, newDepth)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the core services statuses and plugins' statuses
|
||||
* according to the latest status reported by core services.
|
||||
* @param {CoreStatus} coreStatus the latest status of core services
|
||||
*/
|
||||
private updateCoreAndPluginStatuses(coreStatus: CoreStatus): void {
|
||||
this.coreStatus = coreStatus!;
|
||||
const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), {
|
||||
allAvailableSummary: `All dependencies are available`,
|
||||
});
|
||||
|
||||
this.rootPlugins.forEach((plugin) => {
|
||||
this.pluginData[plugin].derivedStatus = derivedStatus;
|
||||
if (!this.isReportingStatus[plugin]) {
|
||||
// this root plugin has NOT registered any status Observable. Thus, its status is derived from core
|
||||
this.pluginStatus[plugin] = derivedStatus;
|
||||
}
|
||||
});
|
||||
|
||||
this.updatePluginsStatuses(this.rootPlugins);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the derived statuses of the specified plugins and their dependencies,
|
||||
* updating them on the pluginData structure
|
||||
* Optionally, if the plugins have not registered a custom status Observable, update their "current" status as well.
|
||||
* @param {PluginName[]} plugins The names of the plugins to be updated
|
||||
*/
|
||||
private updatePluginsStatuses(plugins: PluginName[]): void {
|
||||
const toCheck = new Set<PluginName>(plugins);
|
||||
|
||||
// Note that we are updating the plugins in an ordered fashion.
|
||||
// This way, when updating plugin X (at depth = N),
|
||||
// all of its dependencies (at depth < N) have already been updated
|
||||
for (let i = 0; i < this.orderedPluginNames.length; ++i) {
|
||||
const current = this.orderedPluginNames[i];
|
||||
if (toCheck.has(current)) {
|
||||
// update the current plugin status
|
||||
this.updatePluginStatus(current);
|
||||
// flag all its reverse dependencies to be checked
|
||||
// TODO flag them only IF the status of this plugin has changed, seems to break some tests
|
||||
this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep));
|
||||
}
|
||||
}
|
||||
|
||||
this.pluginData$.next(this.pluginData);
|
||||
this.pluginStatus$.next({ ...this.pluginStatus });
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the derived status of the specified plugin and update it on the pluginData structure
|
||||
* Optionally, if the plugin has not registered a custom status Observable, update its "current" status as well
|
||||
* @param {PluginName} plugin The name of the plugin to be updated
|
||||
*/
|
||||
private updatePluginStatus(plugin: PluginName): void {
|
||||
const newStatus = this.determinePluginStatus(plugin);
|
||||
this.pluginData[plugin].derivedStatus = newStatus;
|
||||
|
||||
if (!this.isReportingStatus[plugin]) {
|
||||
// this plugin has NOT registered any status Observable.
|
||||
// Thus, its status is derived from its dependencies + core
|
||||
this.pluginStatus[plugin] = newStatus;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deterime the current plugin status, taking into account its reported status, its derived status
|
||||
* and the status of the core services
|
||||
* @param {PluginName} plugin the name of the plugin whose status must be determined
|
||||
* @returns {ServiceStatus} The status of the plugin
|
||||
*/
|
||||
private determinePluginStatus(plugin: PluginName): ServiceStatus {
|
||||
const coreStatus: Array<[PluginName, ServiceStatus]> = Object.entries(this.coreStatus);
|
||||
const newLocal = this.pluginData[plugin];
|
||||
|
||||
let depsStatus: Array<[PluginName, ServiceStatus]> = [];
|
||||
|
||||
if (Object.keys(this.isReportingStatus).length) {
|
||||
// if at least one plugin has registered a status Observable... take into account plugin dependencies
|
||||
depsStatus = newLocal.dependencies.map((dependency) => [
|
||||
dependency,
|
||||
this.pluginData[dependency].reportedStatus || this.pluginData[dependency].derivedStatus,
|
||||
]);
|
||||
}
|
||||
|
||||
const newStatus = getSummaryStatus([...coreStatus, ...depsStatus], {
|
||||
allAvailableSummary: `All dependencies are available`,
|
||||
});
|
||||
|
||||
return newStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the reported status for the given plugin, along with the status of its dependencies tree.
|
||||
* @param {PluginName} plugin The name of the plugin whose reported status must be updated
|
||||
* @param {ServiceStatus} reportedStatus The newly reported status for that plugin
|
||||
*/
|
||||
private updatePluginReportedStatus(plugin: PluginName, reportedStatus: ServiceStatus): void {
|
||||
const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level;
|
||||
|
||||
this.pluginData[plugin].reportedStatus = reportedStatus;
|
||||
this.pluginStatus[plugin] = reportedStatus;
|
||||
|
||||
if (reportedStatus.level !== previousReportedLevel) {
|
||||
this.updatePluginsStatuses([plugin]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -239,20 +239,20 @@ describe('StatusService', () => {
|
|||
|
||||
// Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing.
|
||||
elasticsearch$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
elasticsearch$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
elasticsearch$.next({
|
||||
level: ServiceStatusLevels.available,
|
||||
summary: `Wow another summary`,
|
||||
});
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(degraded);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
subscription.unsubscribe();
|
||||
|
||||
expect(statusUpdates).toMatchInlineSnapshot(`
|
||||
|
@ -300,9 +300,9 @@ describe('StatusService', () => {
|
|||
savedObjects$.next(available);
|
||||
savedObjects$.next(degraded);
|
||||
// Waiting for the debounce timeout should cut a new update
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
subscription.unsubscribe();
|
||||
|
||||
expect(statusUpdates).toMatchInlineSnapshot(`
|
||||
|
@ -410,20 +410,20 @@ describe('StatusService', () => {
|
|||
|
||||
// Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing.
|
||||
elasticsearch$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
elasticsearch$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
elasticsearch$.next({
|
||||
level: ServiceStatusLevels.available,
|
||||
summary: `Wow another summary`,
|
||||
});
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(degraded);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
subscription.unsubscribe();
|
||||
|
||||
expect(statusUpdates).toMatchInlineSnapshot(`
|
||||
|
@ -471,9 +471,9 @@ describe('StatusService', () => {
|
|||
savedObjects$.next(available);
|
||||
savedObjects$.next(degraded);
|
||||
// Waiting for the debounce timeout should cut a new update
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
savedObjects$.next(available);
|
||||
await delay(500);
|
||||
await delay(100);
|
||||
subscription.unsubscribe();
|
||||
|
||||
expect(statusUpdates).toMatchInlineSnapshot(`
|
||||
|
|
|
@ -25,7 +25,7 @@ import type { InternalCoreUsageDataSetup } from '../core_usage_data';
|
|||
import { config, StatusConfigType } from './status_config';
|
||||
import { ServiceStatus, CoreStatus, InternalStatusServiceSetup } from './types';
|
||||
import { getSummaryStatus } from './get_summary_status';
|
||||
import { PluginsStatusService } from './plugins_status';
|
||||
import { PluginsStatusService } from './cached_plugins_status';
|
||||
import { getOverallStatusChanges } from './log_overall_status';
|
||||
|
||||
interface StatusLogMeta extends LogMeta {
|
||||
|
@ -71,7 +71,7 @@ export class StatusService implements CoreService<InternalStatusServiceSetup> {
|
|||
|
||||
this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe(
|
||||
// Prevent many emissions at once from dependency status resolution from making this too noisy
|
||||
debounceTime(500),
|
||||
debounceTime(80),
|
||||
map(([coreStatus, pluginsStatus]) => {
|
||||
const summary = getSummaryStatus([
|
||||
...Object.entries(coreStatus),
|
||||
|
@ -174,6 +174,8 @@ export class StatusService implements CoreService<InternalStatusServiceSetup> {
|
|||
this.subscriptions.forEach((subscription) => {
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
this.pluginsStatus?.stop();
|
||||
this.subscriptions = [];
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue