[Monitoring] Bulk Uploader uses new ES client (#94908) (#95174)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
# Conflicts:
#	x-pack/plugins/monitoring/server/plugin.test.ts
#	x-pack/plugins/monitoring/server/plugin.ts
#	x-pack/plugins/monitoring/server/types.ts
This commit is contained in:
Alejandro Fernández Haro 2021-03-23 18:11:36 +01:00 committed by GitHub
parent 562d3d1d9b
commit ee91d0fdce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 66 deletions

View file

@ -5,29 +5,28 @@
* 2.0.
*/
import { Observable, Subscription } from 'rxjs';
import type { Observable, Subscription } from 'rxjs';
import { take } from 'rxjs/operators';
import moment from 'moment';
import {
ElasticsearchServiceSetup,
ILegacyCustomClusterClient,
import type {
ElasticsearchClient,
Logger,
OpsMetrics,
ServiceStatus,
ServiceStatusLevel,
ServiceStatusLevels,
} from '../../../../../src/core/server';
} from 'src/core/server';
import { ServiceStatusLevels } from '../../../../../src/core/server';
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants';
import { sendBulkPayload, monitoringBulk } from './lib';
import { sendBulkPayload } from './lib';
import { getKibanaSettings } from './collectors';
import { MonitoringConfig } from '../config';
import type { MonitoringConfig } from '../config';
import type { IBulkUploader } from '../types';
export interface BulkUploaderOptions {
log: Logger;
config: MonitoringConfig;
interval: number;
elasticsearch: ElasticsearchServiceSetup;
statusGetter$: Observable<ServiceStatus>;
opsMetrics$: Observable<OpsMetrics>;
kibanaStats: KibanaStats;
@ -61,11 +60,11 @@ export interface KibanaStats {
* @param {Object} server HapiJS server instance
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
export class BulkUploader implements IBulkUploader {
private readonly _log: Logger;
private readonly _cluster: ILegacyCustomClusterClient;
private readonly kibanaStats: KibanaStats;
private readonly kibanaStatusGetter$: Subscription;
private readonly kibanaStatusGetter$: Observable<ServiceStatus>;
private kibanaStatusSubscription?: Subscription;
private readonly opsMetrics$: Observable<OpsMetrics>;
private kibanaStatus: ServiceStatusLevel | null;
private _timer: NodeJS.Timer | null;
@ -75,7 +74,6 @@ export class BulkUploader {
log,
config,
interval,
elasticsearch,
statusGetter$,
opsMetrics$,
kibanaStats,
@ -91,16 +89,10 @@ export class BulkUploader {
this._interval = interval;
this._log = log;
this._cluster = elasticsearch.legacy.createClient('admin', {
plugins: [monitoringBulk],
});
this.kibanaStats = kibanaStats;
this.kibanaStatus = null;
this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
this.kibanaStatusGetter$ = statusGetter$;
}
/*
@ -108,17 +100,21 @@ export class BulkUploader {
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
* @return undefined
*/
public start() {
public start(esClient: ElasticsearchClient) {
this._log.info('Starting monitoring stats collection');
this.kibanaStatusSubscription = this.kibanaStatusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(); // initial fetch
this._fetchAndUpload(esClient); // initial fetch
}
this._timer = setInterval(() => {
this._fetchAndUpload();
this._fetchAndUpload(esClient);
}, this._interval);
}
@ -131,8 +127,7 @@ export class BulkUploader {
if (this._timer) clearInterval(this._timer);
this._timer = null;
this.kibanaStatusGetter$.unsubscribe();
this._cluster.close();
this.kibanaStatusSubscription?.unsubscribe();
const prefix = logPrefix ? logPrefix + ':' : '';
this._log.info(prefix + 'Monitoring stats collection is stopped');
@ -168,7 +163,7 @@ export class BulkUploader {
};
}
private async _fetchAndUpload() {
private async _fetchAndUpload(esClient: ElasticsearchClient) {
const data = await Promise.all([
{ type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() },
{ type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) },
@ -178,7 +173,7 @@ export class BulkUploader {
if (payload && payload.length > 0) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
await this._onPayload(payload);
await this._onPayload(esClient, payload);
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {
this._log.warn(err.stack);
@ -189,8 +184,8 @@ export class BulkUploader {
}
}
private async _onPayload(payload: object[]) {
return await sendBulkPayload(this._cluster, this._interval, payload);
private async _onPayload(esClient: ElasticsearchClient, payload: object[]) {
return await sendBulkPayload(esClient, this._interval, payload);
}
private getConvertedKibanaStatus() {

View file

@ -5,21 +5,22 @@
* 2.0.
*/
import { ILegacyClusterClient } from 'src/core/server';
import type { ElasticsearchClient } from 'src/core/server';
import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';
/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export async function sendBulkPayload(
cluster: ILegacyClusterClient,
esClient: ElasticsearchClient,
interval: number,
payload: object[]
) {
return cluster.callAsInternalUser('monitoring.bulk', {
const { body } = await esClient.monitoring.bulk({
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
interval: interval + 'ms',
body: payload,
});
return body;
}

View file

@ -32,7 +32,6 @@ jest.mock('./config', () => ({
describe('Monitoring plugin', () => {
const coreSetup = coreMock.createSetup();
coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any);
coreSetup.status.overall$.subscribe = jest.fn();
const setupPlugins = {
usageCollection: {
@ -60,13 +59,13 @@ describe('Monitoring plugin', () => {
afterEach(() => {
(setupPlugins.alerts.registerType as jest.Mock).mockReset();
(coreSetup.status.overall$.subscribe as jest.Mock).mockReset();
});
it('always create the bulk uploader', async () => {
const plugin = new MonitoringPlugin(initializerContext as any);
await plugin.setup(coreSetup, setupPlugins as any);
expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled();
// eslint-disable-next-line dot-notation
expect(plugin['bulkUploader']).not.toBeUndefined();
});
it('should register all alerts', async () => {

View file

@ -151,7 +151,6 @@ export class MonitoringPlugin
// Always create the bulk uploader
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const bulkUploader = (this.bulkUploader = initBulkUploader({
elasticsearch: core.elasticsearch,
config,
log: kibanaMonitoringLog,
opsMetrics$: core.metrics.getOpsMetrics$(),
@ -169,34 +168,6 @@ export class MonitoringPlugin
},
}));
// If collection is enabled, start it
const kibanaCollectionEnabled = config.kibana.collection.enabled;
if (kibanaCollectionEnabled) {
// Do not use `this.licenseService` as that looks at the monitoring cluster
// whereas we want to check the production cluster here
if (plugins.licensing) {
plugins.licensing.license$.subscribe((license: any) => {
// use updated xpack license info to start/stop bulk upload
const mainMonitoring = license.getFeature('monitoring');
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
bulkUploader.start();
} else {
bulkUploader.handleNotEnabled();
}
});
} else {
kibanaMonitoringLog.warn(
'Internal collection for Kibana monitoring is disabled due to missing license information.'
);
}
} else {
kibanaMonitoringLog.info(
'Internal collection for Kibana monitoring is disabled per configuration.'
);
}
// If the UI is enabled, then we want to register it so it shows up
// and start any other UI-related setup tasks
if (config.ui.enabled) {
@ -228,7 +199,38 @@ export class MonitoringPlugin
};
}
start() {}
start(core: CoreStart, { licensing }: PluginsStart) {
const config = createConfig(this.initializerContext.config.get<TypeOf<typeof configSchema>>());
// If collection is enabled, start it
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const kibanaCollectionEnabled = config.kibana.collection.enabled;
if (kibanaCollectionEnabled) {
// Do not use `this.licenseService` as that looks at the monitoring cluster
// whereas we want to check the production cluster here
if (this.bulkUploader && licensing) {
licensing.license$.subscribe((license: any) => {
// use updated xpack license info to start/stop bulk upload
const mainMonitoring = license.getFeature('monitoring');
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
this.bulkUploader?.start(core.elasticsearch.client.asInternalUser);
} else {
this.bulkUploader?.handleNotEnabled();
}
});
} else {
kibanaMonitoringLog.warn(
'Internal collection for Kibana monitoring is disabled due to missing license information.'
);
}
} else {
kibanaMonitoringLog.info(
'Internal collection for Kibana monitoring is disabled per configuration.'
);
}
}
stop() {
if (this.cluster) {

View file

@ -12,6 +12,7 @@ import type {
Logger,
ILegacyCustomClusterClient,
RequestHandlerContext,
ElasticsearchClient,
} from 'kibana/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { LicenseFeature, ILicense } from '../../licensing/server';
@ -25,7 +26,7 @@ import {
PluginSetupContract as AlertingPluginSetupContract,
} from '../../alerts/server';
import { InfraPluginSetup } from '../../infra/server';
import { LicensingPluginSetup } from '../../licensing/server';
import { LicensingPluginSetup, LicensingPluginStart } from '../../licensing/server';
import { PluginSetupContract as FeaturesPluginSetupContract } from '../../features/server';
import { EncryptedSavedObjectsPluginSetup } from '../../encrypted_saved_objects/server';
import { CloudSetup } from '../../cloud/server';
@ -62,6 +63,7 @@ export interface RequestHandlerContextMonitoringPlugin extends RequestHandlerCon
export interface PluginsStart {
alerts: AlertingPluginStartContract;
actions: ActionsPluginsStartContact;
licensing: LicensingPluginStart;
}
export interface MonitoringCoreConfig {
@ -92,6 +94,8 @@ export interface LegacyShimDependencies {
export interface IBulkUploader {
getKibanaStats: () => any;
stop: () => void;
start: (esClient: ElasticsearchClient) => void;
handleNotEnabled: () => void;
}
export interface MonitoringPluginSetup {