Add some collector classes for objects that get registered in a CollectorSet (#19098) (#19230)

* Add some collector classes for objects that get registered in a CollectorSet

* comment cleanup

* don't pass an inline-defined logger to collectorSet

* add a helper logger function so collector has access to logger at construction
This commit is contained in:
Tim Sullivan 2018-05-18 15:25:01 -07:00 committed by GitHub
parent f17b311dfe
commit 11c17c68e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 191 additions and 112 deletions

View file

@ -7,6 +7,7 @@
import { identity, noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
const DEBUG_LOG = [ 'debug', 'monitoring-ui', 'kibana-monitoring' ];
@ -17,32 +18,55 @@ const CHECK_DELAY = 100; // can be lower than COLLECTOR_INTERVAL because the col
describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let log;
let server;
let init;
let cleanup;
let fetch;
beforeEach(() => {
log = sinon.spy();
server = {
log: sinon.spy()
};
init = noop;
cleanup = noop;
fetch = noop;
});
it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet({
it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});
collectors.register({
const registerPojo = () => {
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
};
expect(registerPojo).to.throwException(({ message }) => {
expect(message).to.be('CollectorSet can only have Collector instances registered');
});
});
it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));
collectors.start();
@ -50,19 +74,18 @@ describe('CollectorSet', () => {
setTimeout(() => {
collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
done(); // for async exit
}, CHECK_DELAY);
});
it('should run the bulk upload handler', (done) => {
const log = sinon.spy();
const combineTypes = sinon.spy(data => {
return [
data[0][0],
@ -71,21 +94,20 @@ describe('CollectorSet', () => {
});
const onPayload = sinon.spy();
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes,
onPayload
});
fetch = () => ({ testFetch: true });
collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));
collectors.start();
@ -93,12 +115,12 @@ describe('CollectorSet', () => {
setTimeout(() => {
collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
// un-flattened
expect(combineTypes.getCall(0).args[0]).to.eql(
@ -115,29 +137,28 @@ describe('CollectorSet', () => {
});
it('should log the info-level status of stopping and restarting', (done) => {
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});
collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));
collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
// exit
collectors.cleanup();

View file

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { getCollectorLogger } from '../lib';
export class Collector {
/*
* @param {Object} server - server object
* @param {String} properties.type - property name as the key for the data
* @param {Function} properties.init (optional) - initialization function
* @param {Function} properties.fetch - function to query data
* @param {Function} properties.cleanup (optional) - cleanup function
* @param {Boolean} properties.fetchAfterInit (optional) - if collector should fetch immediately after init
*/
constructor(server, { type, init, fetch, cleanup, fetchAfterInit }) {
this.type = type;
this.init = init;
this.fetch = fetch;
this.cleanup = cleanup;
this.fetchAfterInit = fetchAfterInit;
this.log = getCollectorLogger(server);
}
}

View file

@ -5,10 +5,10 @@
*/
import { flatten, isEmpty } from 'lodash';
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG } from '../../../common/constants';
import Promise from 'bluebird';
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';
/*
* A collector object has types registered into it with the register(type)
@ -18,21 +18,18 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
export class CollectorSet {
/*
* @param options.interval {Number} in milliseconds
* @param options.logger {Function}
* @param options.combineTypes {Function}
* @param options.onPayload {Function}
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
*/
constructor({ interval, logger, combineTypes, onPayload }) {
constructor(server, { interval, combineTypes, onPayload }) {
this._collectors = [];
this._timer = null;
if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
if (typeof logger !== 'function') {
throw new Error('Logger function is required');
}
if (typeof combineTypes !== 'function') {
throw new Error('combineTypes function is required');
}
@ -40,11 +37,7 @@ export class CollectorSet {
throw new Error('onPayload function is required');
}
this._log = {
debug: message => logger(['debug', ...LOGGING_TAGS], message),
info: message => logger(['info', ...LOGGING_TAGS], message),
warn: message => logger(['warning', ...LOGGING_TAGS], message)
};
this._log = getCollectorLogger(server);
this._interval = interval;
this._combineTypes = combineTypes;
@ -52,13 +45,14 @@ export class CollectorSet {
}
/*
* @param {String} type.type
* @param {Function} type.init (optional)
* @param {Function} type.fetch
* @param {Function} type.cleanup (optional)
* @param collector {Collector} collector object
*/
register(type) {
this._collectors.push(type);
register(collector) {
// check instanceof
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
}
this._collectors.push(collector);
}
/*
@ -75,10 +69,7 @@ export class CollectorSet {
collector.init();
}
if (collector.setLogger) {
this._log.debug(`Setting logger for ${collector.type} collector`);
collector.setLogger(this._log);
}
this._log.debug(`Setting logger for ${collector.type} collector`);
if (collector.fetchAfterInit) {
initialCollectors.push(collector);
@ -139,6 +130,19 @@ export class CollectorSet {
});
}
async bulkFetchUsage() {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const bulk = await this._bulkFetch(usageCollectors);
// summarize each type of stat
return bulk.reduce((accumulatedStats, currentStat) => {
return {
...accumulatedStats,
[currentStat.type]: currentStat.result,
};
}, {});
}
cleanup() {
this._log.info(`Stopping all stats collectors`);

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Collector } from './collector';
export class UsageCollector extends Collector {
constructor(server, properties) {
super(server, properties);
}
}

View file

@ -22,7 +22,8 @@ describe('getKibanaUsageCollector', () => {
getCluster: sinon.stub()
}
},
config: () => ({ get: sinon.stub() })
config: () => ({ get: sinon.stub() }),
log: sinon.stub(),
};
serverStub.plugins.elasticsearch.getCluster.withArgs('admin').returns(clusterStub);
callClusterStub = callClusterFactory(serverStub).getCallClusterInternal();

View file

@ -6,6 +6,7 @@
import { get, snakeCase } from 'lodash';
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
import { UsageCollector } from '../classes/usage_collector';
const TYPES = [
'dashboard',
@ -20,7 +21,7 @@ const TYPES = [
* Fetches saved object client counts by querying the saved object index
*/
export function getKibanaUsageCollector(server, callCluster) {
return {
return new UsageCollector(server, {
type: KIBANA_USAGE_TYPE,
async fetch() {
const index = server.config().get('kibana.index');
@ -52,9 +53,7 @@ export function getKibanaUsageCollector(server, callCluster) {
return {
index,
// combine the bucketCounts and 0s for types that don't have documents
...TYPES.reduce((acc, type) => ({
...TYPES.reduce((acc, type) => ({ // combine the bucketCounts and 0s for types that don't have documents
...acc,
[snakeCase(type)]: {
total: bucketCounts[type] || 0
@ -62,5 +61,5 @@ export function getKibanaUsageCollector(server, callCluster) {
}), {})
};
}
};
});
}

View file

@ -6,6 +6,7 @@
import { KIBANA_STATS_TYPE } from '../../../common/constants';
import { opsBuffer } from './ops_buffer';
import { Collector } from '../classes/collector';
/*
* Initialize a collector for Kibana Ops Stats
@ -20,11 +21,6 @@ export function getOpsStatsCollector(server) {
monitor.on('ops', onOps);
};
let _log;
const setLogger = logger => {
_log = logger;
};
const cleanup = () => {
if (monitor) {
monitor.removeListener('ops', onOps);
@ -34,7 +30,7 @@ export function getOpsStatsCollector(server) {
// This needs to be removed once the FIXME for 1301 is fixed
// `process` is a NodeJS global, and is always available without using require/import
process.on('SIGHUP', () => {
_log.info('Re-initializing Kibana Monitoring due to SIGHUP');
this.log.info('Re-initializing Kibana Monitoring due to SIGHUP');
/* This timeout is a temporary stop-gap until collecting stats is not bound to even-better
* and collecting stats is not interfered by logging configuration reloading
* Related to https://github.com/elastic/x-pack-kibana/issues/1301
@ -42,16 +38,15 @@ export function getOpsStatsCollector(server) {
setTimeout(() => {
cleanup();
init();
_log.info('Re-initialized Kibana Monitoring due to SIGHUP');
this.log.info('Re-initialized Kibana Monitoring due to SIGHUP');
}, 5 * 1000); // wait 5 seconds to avoid race condition with reloading logging configuration
});
return {
return new Collector(server, {
type: KIBANA_STATS_TYPE,
init,
setLogger,
fetch: buffer.flush,
fetchAfterInit: true,
cleanup
};
});
}

View file

@ -6,12 +6,13 @@
import { KIBANA_REPORTING_TYPE } from '../../../common/constants';
import { getReportingUsage } from '../../../../reporting';
import { UsageCollector } from '../';
export function getReportingUsageCollector(server, callCluster) {
return {
return new UsageCollector(server, {
type: KIBANA_REPORTING_TYPE,
fetch() {
return getReportingUsage(callCluster, server);
}
};
});
}

View file

@ -5,9 +5,9 @@
*/
import { get } from 'lodash';
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
import { Collector } from '../classes/collector';
/*
* Check if Cluster Alert email notifications is enabled in config
@ -58,36 +58,28 @@ export function getSettingsCollector(server) {
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
const config = server.config();
let _log;
const setLogger = logger => {
_log = logger;
};
const fetch = async () => {
let kibanaSettingsData;
const defaultAdminEmail = await checkForEmailValue(config, callWithInternalUser);
// skip everything if defaultAdminEmail === undefined
if (defaultAdminEmail || (defaultAdminEmail === null && shouldUseNull)) {
kibanaSettingsData = {
xpack: {
default_admin_email: defaultAdminEmail
}
};
_log.debug(`[${defaultAdminEmail}] default admin email setting found, sending [${KIBANA_SETTINGS_TYPE}] monitoring document.`);
} else {
_log.debug(`not sending [${KIBANA_SETTINGS_TYPE}] monitoring document because [${defaultAdminEmail}] is null or invalid.`);
}
// remember the current email so that we can mark it as successful if the bulk does not error out
shouldUseNull = !!defaultAdminEmail;
return kibanaSettingsData;
};
return {
return new Collector(server, {
type: KIBANA_SETTINGS_TYPE,
setLogger,
fetch
};
async fetch() {
let kibanaSettingsData;
const defaultAdminEmail = await checkForEmailValue(config, callWithInternalUser);
// skip everything if defaultAdminEmail === undefined
if (defaultAdminEmail || (defaultAdminEmail === null && shouldUseNull)) {
kibanaSettingsData = {
xpack: {
default_admin_email: defaultAdminEmail
}
};
this.log.debug(`[${defaultAdminEmail}] default admin email setting found, sending [${KIBANA_SETTINGS_TYPE}] monitoring document.`);
} else {
this.log.debug(`not sending [${KIBANA_SETTINGS_TYPE}] monitoring document because [${defaultAdminEmail}] is null or invalid.`);
}
// remember the current email so that we can mark it as successful if the bulk does not error out
shouldUseNull = !!defaultAdminEmail;
return kibanaSettingsData;
}
});
}

View file

@ -7,3 +7,4 @@
export { createCollectorSet } from './create_collector_set';
export { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
export { getReportingUsageCollector } from './collectors/get_reporting_usage_collector';
export { UsageCollector } from './classes/usage_collector';

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG } from '../../../common/constants';
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
/*
* @param {Object} server
* @return {Object} helpful logger object
*/
export function getCollectorLogger(server) {
return {
debug: message => server.log(['debug', ...LOGGING_TAGS], message),
info: message => server.log(['info', ...LOGGING_TAGS], message),
warn: message => server.log(['warning', ...LOGGING_TAGS], message)
};
}

View file

@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { getCollectorLogger } from './get_collector_logger';

View file

@ -5,7 +5,7 @@
*/
import { callClusterFactory } from '../../../xpack_main';
import { CollectorSet } from './lib/collector_set';
import { CollectorSet } from './classes/collector_set';
import { getOpsStatsCollector } from './collectors/get_ops_stats_collector';
import { getSettingsCollector } from './collectors/get_settings_collector';
import { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
@ -27,11 +27,8 @@ export function startCollectorSet(kbnServer, server, client, _sendBulkPayload =
const config = server.config();
const interval = config.get('xpack.monitoring.kibana.collection.interval');
const collectorSet = new CollectorSet({
const collectorSet = new CollectorSet(server, {
interval,
logger(...message) {
server.log(...message);
},
combineTypes: getCollectorTypesCombiner(kbnServer, config),
onPayload(payload) {
return _sendBulkPayload(client, interval, payload);