Separate bulk upload behavior from CollectorSet (#19691) (#20129)

* Separate bulk upload behavior from CollectorSet

 - takes out a lot of behavior from CollectorSet and moves it to a class called BulkUploader
 - simplifies kibana monitoring init by taking out the indirection that startCollectorSet / createCollectorSet had
 - removes start() method from CollectorSet and calls the collector objects' init() functions from CollectorSet.register()
 - removes cleanup method from collectorSet since that was doing work for bulk uploading

* remove cleanup and fetchAfterInit methods

* test for bulk_uploader class

* improve test for collector_set

* fix reporting

* expose collectorSet if there actually is a collectorSet

* comment for enclosed function

* make collectorSet creation/expose unconditional, bulkUploader more conditional

* fix collector_set tests

* lifecycle events

* stab at collectorSet error logging from the API call

* clean up comments

* clean up comments

* fix BulkUploader mocha test

* check kibanaCollectionEnabled config before registering bulk upload and the plugin status listeners

* no singleton timer object

* just log a warning if bulk uploader start called twice

* normal quotes

* check if bulk is enabled inside of the _fetchAndUpload method

* log for stopping bulk stats

* call bulkUploader.start with the collectorSet object

* call bulkUploader.start with the collectorSet object

* roll back change for module scoped variable

* oops I broke init

* init and logging: if / elseif / elseif

* remove unnecessary check/log

* help log

* remove redundant, use data.filter.map

* use xpackInfo.onLicenseInfoChange not xpackMainPlugin.status.on('green')

* help logging

* fix unit test

* remove handler that stops upload when connection is lost
This commit is contained in:
Tim Sullivan 2018-06-21 15:03:29 -07:00 committed by GitHub
parent 1bb5dd1c11
commit ab4f76bfed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 412 additions and 367 deletions

View file

@ -4,28 +4,49 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG, } from './common/constants';
import { requireUIRoutes } from './server/routes';
import { instantiateClient } from './server/es_client/instantiate_client';
import { initMonitoringXpackInfo } from './server/init_monitoring_xpack_info';
import { createCollectorSet } from './server/kibana_monitoring';
import { CollectorSet } from './server/kibana_monitoring/classes';
import { initBulkUploader } from './server/kibana_monitoring';
import {
getKibanaUsageCollector,
getOpsStatsCollector,
getSettingsCollector,
} from './server/kibana_monitoring/collectors';
/**
* Initialize the Kibana Monitoring plugin by starting up asynchronous server tasks
* - [1] instantiation of an elasticsearch-js client exposed as a server plugin object
* - [2] start monitoring cluster x-pack license and features check
* - [3] webserver route handling
* - [4] start the internal monitoring collectors
* - [4] start the internal monitoring collector/bulk uploader
* - [5] expose the monitoring collector object for other plugins to register with
* - [6] set monitoring plugin status to green
* @param monitoringPlugin {Object} Monitoring UI plugin
* @param server {Object} HapiJS server instance
*/
export const init = (monitoringPlugin, server) => {
monitoringPlugin.status.yellow('Initializing');
const xpackMainPlugin = server.plugins.xpack_main;
const config = server.config();
xpackMainPlugin.status.once('green', async () => {
const config = server.config();
const collectorSet = new CollectorSet(server);
server.expose('collectorSet', collectorSet); // expose the collectorSet service
/*
* Register collector objects for stats to show up in the APIs
*/
collectorSet.register(getOpsStatsCollector(server));
collectorSet.register(getKibanaUsageCollector(server));
collectorSet.register(getSettingsCollector(server));
/*
* Instantiate and start the internal background task that calls collector
* fetch methods and uploads to the ES monitoring bulk endpoint
*/
const xpackMainPlugin = server.plugins.xpack_main;
xpackMainPlugin.status.once('green', async () => { // first time xpack_main turns green
/*
* End-user-facing services
*/
const uiEnabled = config.get('xpack.monitoring.ui.enabled');
if (uiEnabled) {
@ -33,15 +54,33 @@ export const init = (monitoringPlugin, server) => {
await initMonitoringXpackInfo(server); // Route handlers depend on this for xpackInfo
await requireUIRoutes(server);
}
if (config.get('xpack.monitoring.kibana.collection.enabled')) {
const collectorSet = createCollectorSet(monitoringPlugin.kbnServer, server); // instantiate an object for collecting/sending metrics and usage stats
server.expose('collectorSet', collectorSet); // expose the collector set object on the server. other plugins will call statsCollectors.register(collector) to define their own collection
}
monitoringPlugin.status.green('Ready');
});
const bulkUploader = initBulkUploader(monitoringPlugin.kbnServer, server);
const kibanaCollectionEnabled = config.get('xpack.monitoring.kibana.collection.enabled');
const { info: xpackMainInfo } = xpackMainPlugin;
if (kibanaCollectionEnabled) {
/*
* Bulk uploading of Kibana stats
*/
xpackMainInfo.onLicenseInfoChange(() => {
// use updated xpack license info to start/stop bulk upload
const mainMonitoring = xpackMainInfo.feature('monitoring');
const monitoringBulkEnabled = mainMonitoring && mainMonitoring.isAvailable() && mainMonitoring.isEnabled();
if (monitoringBulkEnabled) {
bulkUploader.start(collectorSet);
} else {
bulkUploader.handleNotEnabled();
}
});
} else if (!kibanaCollectionEnabled) {
server.log(
['info', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG],
'Internal collection for Kibana monitoring will is disabled per configuration.'
);
}
server.injectUiAppVars('monitoring', (server) => {
const config = server.config();
return {

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { Collector, CollectorSet } from '../classes';
import { BulkUploader } from '../bulk_uploader';
// Interval is deliberately short so CHECK_DELAY (500ms) allows one or two
// timer ticks after the initial immediate fetch.
const FETCH_INTERVAL = 300;
const CHECK_DELAY = 500;

describe('BulkUploader', () => {
  describe('registers a collector set and runs lifecycle events', () => {
    let server;
    beforeEach(() => {
      // Minimal HapiJS server stub: spy on log() to assert lifecycle
      // messages, and stub the admin cluster so no real Elasticsearch
      // client is created.
      server = {
        log: sinon.spy(),
        plugins: {
          elasticsearch: {
            getCluster: () => ({
              createClient: () => ({
                monitoring: {
                  bulk: sinon.spy(),
                },
              }),
              callWithInternalUser: sinon.spy(), // this tests internal collection and bulk upload, not HTTP API
            }),
          },
        },
      };
    });

    it('should skip bulk upload if payload is empty', done => {
      const collectors = new CollectorSet(server);
      collectors.register(
        new Collector(server, {
          type: 'type_collector_test',
          fetch: noop, // empty payloads
        })
      );

      const uploader = new BulkUploader(server, {
        interval: FETCH_INTERVAL,
        combineTypes: noop,
      });

      uploader.start(collectors);

      // allow interval to tick a few times
      setTimeout(() => {
        uploader.stop();

        const loggingCalls = server.log.getCalls();
        // Exact call count depends on timer scheduling, so only the
        // first fetch/skip cycle and the final stop message are pinned.
        expect(loggingCalls.length).to.be.greaterThan(2); // should be 3-5: start, fetch, skip, fetch, skip
        expect(loggingCalls[0].args).to.eql([
          ['info', 'monitoring-ui', 'kibana-monitoring'],
          'Starting monitoring stats collection',
        ]);
        expect(loggingCalls[1].args).to.eql([
          ['debug', 'monitoring-ui', 'kibana-monitoring'],
          'Fetching data from type_collector_test collector',
        ]);
        expect(loggingCalls[2].args).to.eql([
          ['debug', 'monitoring-ui', 'kibana-monitoring'],
          'Skipping bulk uploading of an empty stats payload',
        ]);
        expect(loggingCalls[loggingCalls.length - 1].args).to.eql([
          ['info', 'monitoring-ui', 'kibana-monitoring'],
          'Monitoring stats collection is stopped',
        ]);

        done();
      }, CHECK_DELAY);
    });

    it('should run the bulk upload handler', done => {
      // combineTypes spy returns the first doc pair with a marker so the
      // un-flattened input shape can be asserted below.
      const combineTypes = sinon.spy(data => {
        return [data[0][0], { ...data[0][1], combined: true }];
      });
      const collectors = new CollectorSet(server);
      collectors.register(
        new Collector(server, {
          type: 'type_collector_test',
          fetch: () => ({ testData: 12345 }),
        })
      );

      const uploader = new BulkUploader(server, {
        interval: FETCH_INTERVAL,
        combineTypes,
      });

      uploader.start(collectors);

      // allow interval to tick a few times
      setTimeout(() => {
        uploader.stop();

        const loggingCalls = server.log.getCalls();
        expect(loggingCalls.length).to.be.greaterThan(2); // should be 3-5: start, fetch, upload, fetch, upload
        expect(loggingCalls[0].args).to.eql([
          ['info', 'monitoring-ui', 'kibana-monitoring'],
          'Starting monitoring stats collection',
        ]);
        expect(loggingCalls[1].args).to.eql([
          ['debug', 'monitoring-ui', 'kibana-monitoring'],
          'Fetching data from type_collector_test collector',
        ]);
        expect(loggingCalls[2].args).to.eql([
          ['debug', 'monitoring-ui', 'kibana-monitoring'],
          'Uploading bulk stats payload to the local cluster',
        ]);

        // un-flattened
        const combineCalls = combineTypes.getCalls();
        expect(combineCalls.length).to.be.greaterThan(0); // should be 1-2 fetch and combine cycles
        expect(combineCalls[0].args).to.eql([
          [[{ index: { _type: 'type_collector_test' } }, { testData: 12345 }]],
        ]);

        done();
      }, CHECK_DELAY);
    });
  });
});

View file

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { isEmpty, flatten } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';
import {
getCollectorLogger,
sendBulkPayload,
monitoringBulk,
} from './lib';
/*
* Handles internal Kibana stats collection and uploading data to Monitoring
* bulk endpoint.
*
* NOTE: internal collection will be removed in 7.0
*
* Depends on
* - 'xpack.monitoring.kibana.collection.enabled' config
* - monitoring enabled in ES (checked against xpack_main.info license info change)
* The dependencies are handled upstream
* - Ops Events - essentially Kibana's /api/status
* - Usage Stats - essentially Kibana's /api/stats
* - Kibana Settings - select uiSettings
* @param {Object} server HapiJS server instance
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
  /*
   * @param {Object} server HapiJS server instance
   * @param {Number} options.interval - fetch/upload cadence in milliseconds
   * @param {Function} options.combineTypes - combines the per-collector doc
   *   pairs into a single bulk payload (see getCollectorTypesCombiner)
   * @throws {Error} if interval or combineTypes is missing/mis-typed
   */
  constructor(server, { interval, combineTypes }) {
    if (typeof interval !== 'number') {
      throw new Error('interval number of milliseconds is required');
    }
    if (typeof combineTypes !== 'function') {
      throw new Error('combineTypes function is required');
    }

    this._timer = null;
    this._interval = interval;
    this._combineTypes = combineTypes;
    this._log = getCollectorLogger(server);

    // dedicated admin-cluster client extended with the monitoring bulk endpoint
    this._client = server.plugins.elasticsearch.getCluster('admin').createClient({
      plugins: [monitoringBulk],
    });
    this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal();
  }

  /*
   * Do an initial fetch/upload and then repeat on the interval timer.
   * Safe to call from a license-change handler that may fire repeatedly:
   * if collection is already running, warn and keep the existing timer
   * instead of orphaning it with a second setInterval.
   * @param {CollectorSet} collectorSet object to use for initial the fetch/upload and fetch/uploading on interval
   * @return undefined
   */
  start(collectorSet) {
    if (this._timer !== null) {
      this._log.warn('Skipping start of monitoring stats collection: already started');
      return;
    }
    this._log.info('Starting monitoring stats collection');
    this._fetchAndUpload(collectorSet); // initial fetch; fire-and-forget, errors are handled internally
    this._timer = setInterval(() => {
      this._fetchAndUpload(collectorSet);
    }, this._interval);
  }

  /*
   * start() and stop() are lifecycle event handlers for
   * xpackMainPlugin license changes
   * @param {String} logPrefix help give context to the reason for stopping
   */
  stop(logPrefix) {
    clearInterval(this._timer); // no-op when the timer is already null
    this._timer = null;

    const prefix = logPrefix ? logPrefix + ':' : '';
    this._log.info(prefix + 'Monitoring stats collection is stopped');
  }

  // stop() variant used when the monitoring bulk endpoint is disabled in ES
  handleNotEnabled() {
    this.stop('Monitoring status upload endpoint is not enabled in Elasticsearch');
  }

  // stop() variant used when the Elasticsearch connection is lost
  handleConnectionLost() {
    this.stop('Connection issue detected');
  }

  /*
   * Fetch from every registered collector, drop empty results, and bulk
   * upload anything that remains. Upload failures are logged, never thrown,
   * so the interval timer keeps running.
   * @param {CollectorSet} collectorSet
   * @return {Promise} - resolves to undefined
   */
  async _fetchAndUpload(collectorSet) {
    const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
    const payload = data
      .filter(d => Boolean(d) && !isEmpty(d.result))
      .map(({ result, type }) => [{ index: { _type: type } }, result]); // bulk action/doc pair per type

    if (payload.length > 0) {
      try {
        const combinedData = this._combineTypes(payload); // use the collector types combiner
        this._log.debug(`Uploading bulk stats payload to the local cluster`);
        this._onPayload(flatten(combinedData));
      } catch (err) {
        this._log.warn(err.stack);
        this._log.warn(`Unable to bulk upload the stats payload to the local cluster`);
      }
    } else {
      this._log.debug(`Skipping bulk uploading of an empty stats payload`);
    }
  }

  // send the flattened payload to the monitoring bulk endpoint
  _onPayload(payload) {
    return sendBulkPayload(this._client, this._interval, payload);
  }
}

View file

@ -4,54 +4,30 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { identity, noop } from 'lodash';
import { noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
const DEBUG_LOG = [ 'debug', 'monitoring-ui', 'kibana-monitoring' ];
const INFO_LOG = [ 'info', 'monitoring-ui', 'kibana-monitoring' ];
const COLLECTOR_INTERVAL = 10000;
const CHECK_DELAY = 100; // can be lower than COLLECTOR_INTERVAL because the collectors use fetchAfterInit
describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let server;
let init;
let cleanup;
let fetch;
beforeEach(() => {
server = {
log: sinon.spy(),
plugins: {
elasticsearch: {
getCluster: () => ({
callWithInternalUser: sinon.spy() // this tests internal collection and bulk upload, not HTTP API
})
}
}
};
server = { log: sinon.spy() };
init = noop;
cleanup = noop;
fetch = noop;
});
it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});
const collectors = new CollectorSet(server);
const registerPojo = () => {
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
};
@ -60,116 +36,25 @@ describe('CollectorSet', () => {
});
});
it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});
it('should log debug status of fetching from the collector', async () => {
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const collectors = new CollectorSet(server);
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
type: 'MY_TEST_COLLECTOR',
fetch: caller => caller()
}));
collectors.start();
// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
done(); // for async exit
}, CHECK_DELAY);
});
it('should run the bulk upload handler', (done) => {
const combineTypes = sinon.spy(data => {
return [
data[0][0],
{ ...data[0][1], combined: true }
];
});
const onPayload = sinon.spy();
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes,
onPayload
});
fetch = () => ({ testFetch: true });
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
}));
collectors.start();
// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
// un-flattened
expect(combineTypes.getCall(0).args[0]).to.eql(
[ [ { index: { _type: 'type_collector_test' } }, { testFetch: true } ] ]
);
// flattened and altered
expect(onPayload.getCall(0).args[0]).to.eql(
[ { index: { _type: 'type_collector_test' } }, { testFetch: true, combined: true } ]
);
done(); // for async exit
}, CHECK_DELAY);
});
it('should log the info-level status of stopping and restarting', (done) => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
}));
collectors.start();
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
collectors.cleanup();
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
collectors.start();
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
// exit
collectors.cleanup();
done();
const result = await collectors.bulkFetch(mockCallCluster);
const calls = server.log.getCalls();
expect(calls.length).to.be(1);
expect(calls[0].args).to.eql([
['debug', 'monitoring-ui', 'kibana-monitoring'],
'Fetching data from MY_TEST_COLLECTOR collector',
]);
expect(result).to.eql([{
type: 'MY_TEST_COLLECTOR',
result: { passTest: 1000 }
}]);
});
});
});

View file

@ -9,18 +9,21 @@ import { getCollectorLogger } from '../lib';
export class Collector {
/*
* @param {Object} server - server object
* @param {String} properties.type - property name as the key for the data
* @param {Function} properties.init (optional) - initialization function
* @param {Function} properties.fetch - function to query data
* @param {Function} properties.cleanup (optional) - cleanup function -- TODO remove this, handle it in the collector itself
* @param {Boolean} properties.fetchAfterInit (optional) - if collector should fetch immediately after init -- TODO remove this, not useful
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
*/
constructor(server, { type, init, fetch, cleanup, fetchAfterInit }) {
constructor(server, { type, init, fetch } = {}) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
}
if (typeof fetch !== 'function') {
throw new Error('Collector must be instantiated with a options.fetch function property');
}
this.type = type;
this.init = init;
this.fetch = fetch;
this.cleanup = cleanup;
this.fetchAfterInit = fetchAfterInit;
this.log = getCollectorLogger(server);
}

View file

@ -4,8 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { callClusterFactory } from '../../../../xpack_main';
import { flatten, isEmpty } from 'lodash';
import Promise from 'bluebird';
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
@ -24,26 +22,9 @@ export class CollectorSet {
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
*/
constructor(server, { interval, combineTypes, onPayload }) {
this._collectors = [];
this._timer = null;
if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
if (typeof combineTypes !== 'function') {
throw new Error('combineTypes function is required');
}
if (typeof onPayload !== 'function') {
throw new Error('onPayload function is required');
}
constructor(server) {
this._log = getCollectorLogger(server);
this._interval = interval;
this._combineTypes = combineTypes;
this._onPayload = onPayload;
this._callClusterInternal = callClusterFactory(server).getCallClusterInternal();
this._collectors = [];
}
/*
@ -54,70 +35,24 @@ export class CollectorSet {
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
}
this._collectors.push(collector);
}
/*
* Call all the init methods
* if fetchAfterInit is true, fetch and collect immediately
*/
start() {
const initialCollectors = [];
this._log.info(`Starting all stats collectors`);
this._collectors.forEach(collector => {
if (collector.init) {
this._log.debug(`Initializing ${collector.type} collector`);
collector.init();
}
this._log.debug(`Setting logger for ${collector.type} collector`);
if (collector.fetchAfterInit) {
initialCollectors.push(collector);
}
});
// do some fetches and bulk collect
if (initialCollectors.length > 0) {
this._fetchAndUpload(this._callClusterInternal, initialCollectors);
}
this._timer = setInterval(() => {
this._fetchAndUpload(this._callClusterInternal, this._collectors);
}, this._interval);
}
async _fetchAndUpload(callCluster, collectors) {
const data = await this._bulkFetch(callCluster, collectors);
const usableData = data.filter(d => Boolean(d) && !isEmpty(d.result));
const payload = usableData.map(({ result, type }) => {
if (!isEmpty(result)) {
return [
{ index: { _type: type } },
result
];
}
});
if (payload.length > 0) {
try {
const combinedData = this._combineTypes(payload); // use the collector types combiner
this._log.debug(`Uploading bulk stats payload to the local cluster`);
this._onPayload(flatten(combinedData));
} catch(err) {
this._log.warn(err);
this._log.warn(`Unable to bulk upload the stats payload to the local cluster`);
}
} else {
this._log.debug(`Skipping bulk uploading of an empty stats payload`);
if (collector.init) {
this._log.debug(`Initializing ${collector.type} collector`);
collector.init();
}
}
/*
* Call a bunch of fetch methods and then do them in bulk
* @param {Array} collectors - an array of collectors, default to all registered collectors
*/
_bulkFetch(callCluster, collectors) {
bulkFetch(callCluster, collectors = this._collectors) {
if (!Array.isArray(collectors)) {
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
}
return Promise.map(collectors, collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
@ -134,7 +69,7 @@ export class CollectorSet {
async bulkFetchUsage(callCluster) {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const bulk = await this._bulkFetch(callCluster, usageCollectors);
const bulk = await this.bulkFetch(callCluster, usageCollectors);
// summarize each type of stat
return bulk.reduce((accumulatedStats, currentStat) => {
@ -149,18 +84,4 @@ export class CollectorSet {
};
}, {});
}
cleanup() {
this._log.info(`Stopping all stats collectors`);
// stop fetching
clearInterval(this._timer);
this._collectors.forEach(collector => {
if (collector.cleanup) {
this._log.debug(`Running ${collector.type} cleanup`);
collector.cleanup();
}
});
}
}

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { CollectorSet } from './collector_set';
export { Collector } from './collector';
export { UsageCollector } from './usage_collector';

View file

@ -6,7 +6,7 @@
import { get, snakeCase } from 'lodash';
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
import { UsageCollector } from '../classes/usage_collector';
import { UsageCollector } from '../classes';
const TYPES = [
'dashboard',

View file

@ -6,47 +6,46 @@
import { KIBANA_STATS_TYPE } from '../../../common/constants';
import { opsBuffer } from './ops_buffer';
import { Collector } from '../classes/collector';
import { Collector } from '../classes';
/*
* Initialize a collector for Kibana Ops Stats
* FIXME: https://github.com/elastic/x-pack-kibana/issues/1301
*/
export function getOpsStatsCollector(server) {
let monitor;
const buffer = opsBuffer(server);
const onOps = event => buffer.push(event);
const init = () => {
const start = () => {
monitor = server.plugins['even-better'].monitor;
monitor.on('ops', onOps);
};
const cleanup = () => {
const stop = () => {
if (monitor) {
monitor.removeListener('ops', onOps);
}
};
// This needs to be removed once the FIXME for 1301 is fixed
/* Handle stopping / restarting the event listener if Elasticsearch stops and restarts
* NOTE it is possible for the plugin status to go from red to red and
* trigger handlers twice
*/
server.plugins.elasticsearch.status.on('red', stop);
server.plugins.elasticsearch.status.on('green', start);
// `process` is a NodeJS global, and is always available without using require/import
process.on('SIGHUP', () => {
this.log.info('Re-initializing Kibana Monitoring due to SIGHUP');
/* This timeout is a temporary stop-gap until collecting stats is not bound to even-better
* and collecting stats is not interfered by logging configuration reloading
* Related to https://github.com/elastic/x-pack-kibana/issues/1301
*/
setTimeout(() => {
cleanup();
init();
stop();
start();
this.log.info('Re-initialized Kibana Monitoring due to SIGHUP');
}, 5 * 1000); // wait 5 seconds to avoid race condition with reloading logging configuration
});
return new Collector(server, {
type: KIBANA_STATS_TYPE,
init,
fetch: buffer.flush,
fetchAfterInit: true,
cleanup
init: start,
fetch: buffer.flush
});
}

View file

@ -7,7 +7,7 @@
import { get } from 'lodash';
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
import { Collector } from '../classes/collector';
import { Collector } from '../classes';
/*
* Check if Cluster Alert email notifications is enabled in config

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { getKibanaUsageCollector } from './get_kibana_usage_collector';
export { getOpsStatsCollector } from './get_ops_stats_collector';
export { getSettingsCollector } from './get_settings_collector';

View file

@ -1,35 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG, } from '../../common/constants';
import { monitoringBulk } from './lib/monitoring_bulk';
import { startCollectorSet } from './start_collector_set';
/**
* @param kbnServer {Object} manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param server {Object} HapiJS server instance
* @return {Object} CollectorSet instance to be exposed at a higher level, for other plugins to register their own type collectors
*/
export function createCollectorSet(kbnServer, server) {
  const mainXpackInfo = server.plugins.xpack_main.info;
  // guard before dereferencing: calling .feature() on a missing xpackInfo
  // would throw a TypeError before the availability check below could run
  const mainMonitoring = mainXpackInfo && mainXpackInfo.feature('monitoring');

  let collectorSet;
  if (mainMonitoring && mainMonitoring.isAvailable() && mainMonitoring.isEnabled()) {
    // dedicated client extended with the monitoring bulk endpoint plugin
    const client = server.plugins.elasticsearch.getCluster('admin').createClient({
      plugins: [monitoringBulk]
    });
    collectorSet = startCollectorSet(kbnServer, server, client);
  } else {
    server.log(
      ['error', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG],
      'Unable to retrieve X-Pack info from the admin cluster. Kibana monitoring will be disabled until Kibana is restarted.'
    );
  }

  // undefined when monitoring is unavailable/disabled; callers must check
  return collectorSet;
}

View file

@ -4,6 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export { createCollectorSet } from './create_collector_set';
export { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
export { UsageCollector } from './classes/usage_collector';
export { initBulkUploader } from './init';

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { BulkUploader } from './bulk_uploader';
import { getCollectorTypesCombiner } from './lib';
/**
* Initialize different types of Kibana Monitoring
* TODO: remove this in 7.0
* - Ops Events - essentially Kibana's /api/status
* - Usage Stats - essentially Kibana's /api/stats
* - Kibana Settings - select uiSettings
* @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param {Object} server HapiJS server instance
*/
/*
 * Build a BulkUploader configured from the Kibana monitoring collection
 * settings, wiring in the collector-types combiner for this server.
 */
export function initBulkUploader(kbnServer, server) {
  const serverConfig = server.config();
  const uploadInterval = serverConfig.get('xpack.monitoring.kibana.collection.interval');
  const combineTypes = getCollectorTypesCombiner(kbnServer, serverConfig);

  return new BulkUploader(server, { interval: uploadInterval, combineTypes });
}

View file

@ -5,3 +5,6 @@
*/
export { getCollectorLogger } from './get_collector_logger';
export { getCollectorTypesCombiner } from './get_collector_types_combiner';
export { sendBulkPayload } from './send_bulk_payload';
export { monitoringBulk } from './monitoring_bulk';

View file

@ -1,52 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { CollectorSet } from './classes/collector_set';
import { getOpsStatsCollector } from './collectors/get_ops_stats_collector';
import { getSettingsCollector } from './collectors/get_settings_collector';
import { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
import { sendBulkPayload } from './lib/send_bulk_payload';
import { getCollectorTypesCombiner } from './lib/get_collector_types_combiner';
/**
* Initialize different types of Kibana Monitoring
* - Ops Events - essentially Kibana's /api/status
* - Usage Stats - essentially Kibana's /api/stats
* - Kibana Settings - select uiSettings
* @param kbnServer {Object} manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param server {Object} HapiJS server instance
* @param client {Object} Dedicated ES Client with monitoringBulk plugin
* @return {Object} CollectorSet instance
*/
// _sendBulkPayload is injectable so tests can stub the bulk upload transport
export function startCollectorSet(kbnServer, server, client, _sendBulkPayload = sendBulkPayload) {
  const config = server.config();
  const interval = config.get('xpack.monitoring.kibana.collection.interval');

  const collectorSet = new CollectorSet(server, {
    interval,
    combineTypes: getCollectorTypesCombiner(kbnServer, config),
    // upload each combined payload via the dedicated monitoring-bulk client
    onPayload(payload) {
      return _sendBulkPayload(client, interval, payload);
    }
  });

  // register the built-in Kibana collectors: usage stats, ops stats, settings
  collectorSet.register(getKibanaUsageCollector(server));
  collectorSet.register(getOpsStatsCollector(server));
  collectorSet.register(getSettingsCollector(server));

  // Startup Kibana cleanly or reconnect to Elasticsearch
  server.plugins.elasticsearch.status.on('green', () => {
    collectorSet.start();
  });

  // If connection to elasticsearch is lost
  // NOTE it is possible for the plugin status to go from red to red and trigger cleanup twice
  server.plugins.elasticsearch.status.on('red', () => {
    collectorSet.cleanup();
  });

  return collectorSet;
}

View file

@ -8,7 +8,7 @@ import { uniq } from 'lodash';
import { getExportTypesHandler } from './get_export_type_handler';
import { getReportCountsByParameter } from './get_reporting_type_counts';
import { KIBANA_REPORTING_TYPE } from '../../common/constants';
import { UsageCollector } from '../../../monitoring/server/kibana_monitoring';
import { UsageCollector } from '../../../monitoring/server/kibana_monitoring/classes';
/**
* @typedef {Object} ReportingUsageStats Almost all of these stats are optional.

View file

@ -5,11 +5,11 @@
*/
/*
* TODO: deprecate this API in 7.0
* TODO: remove this API in 7.0
*/
import { wrap } from 'boom';
import { callClusterFactory } from '../../../lib/call_cluster_factory';
import { getKibanaUsageCollector } from '../../../../../monitoring/server/kibana_monitoring';
import { getKibanaUsageCollector } from '../../../../../monitoring/server/kibana_monitoring/collectors';
import { getReportingUsageCollector } from '../../../../../reporting/server/usage';
export function kibanaStatsRoute(server) {

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { wrap, serverTimeout as serverUnavailable } from 'boom';
import { wrap as wrapError } from 'boom';
const getClusterUuid = async callCluster => {
const { cluster_uuid: uuid } = await callCluster('info', { filterPath: 'cluster_uuid', });
@ -15,12 +15,8 @@ const getClusterUuid = async callCluster => {
* @return {Object} data from usage stats collectors registered with Monitoring CollectorSet
* @throws {Error} if the Monitoring CollectorSet is not ready
*/
const getUsage = async (callCluster, server) => {
const getUsage = (callCluster, server) => {
const { collectorSet } = server.plugins.monitoring;
if (collectorSet === undefined) {
const error = new Error('CollectorSet from Monitoring plugin is not ready for collecting usage'); // moving kibana_monitoring lib to xpack_main will make this unnecessary
throw serverUnavailable(error);
}
return collectorSet.bulkFetchUsage(callCluster);
};
@ -44,11 +40,11 @@ export function xpackUsageRoute(server) {
...xpackUsage
});
} catch(err) {
req.log(['error'], err);
req.log(['error'], err); // FIXME doesn't seem to log anything useful if ES times out
if (err.isBoom) {
reply(err);
} else {
reply(wrap(err, err.statusCode, err.message));
reply(wrapError(err, err.statusCode, err.message));
}
}
}