mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
* Retrofit the Bulk Uploader types combiner [ch2198] fix usage collector, add comments to formatForBulk remove unnecessary customizations * override default format for bulk upload for usage type collectors * rename to ignoreForInternalUploader * collectors -> collectorSet * use constant for kibana_stats type * example of data formatting for bulk in function comment
This commit is contained in:
parent
1402b63876
commit
cba2c02135
14 changed files with 218 additions and 289 deletions
|
@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
|||
kibana: getKibanaInfoForStats(server, kbnServer),
|
||||
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
|
||||
};
|
||||
}
|
||||
},
|
||||
ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there.
|
||||
});
|
||||
}
|
||||
|
|
|
@ -25,26 +25,48 @@ export class Collector {
|
|||
* @param {String} options.type - property name as the key for the data
|
||||
* @param {Function} options.init (optional) - initialization function
|
||||
* @param {Function} options.fetch - function to query data
|
||||
* @param {Function} options.formatForBulkUpload - optional
|
||||
* @param {Function} options.rest - optional other properties
|
||||
*/
|
||||
constructor(server, { type, init, fetch } = {}) {
|
||||
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
|
||||
if (type === undefined) {
|
||||
throw new Error('Collector must be instantiated with a options.type string property');
|
||||
}
|
||||
if (typeof init !== 'undefined' && typeof init !== 'function') {
|
||||
throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property');
|
||||
}
|
||||
if (typeof fetch !== 'function') {
|
||||
throw new Error('Collector must be instantiated with a options.fetch function property');
|
||||
}
|
||||
|
||||
this.log = getCollectorLogger(server);
|
||||
|
||||
Object.assign(this, options); // spread in other properties and mutate "this"
|
||||
|
||||
this.type = type;
|
||||
this.init = init;
|
||||
this.fetch = fetch;
|
||||
|
||||
this.log = getCollectorLogger(server);
|
||||
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
|
||||
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param {Function} callCluster - callCluster function
|
||||
*/
|
||||
fetchInternal(callCluster) {
|
||||
if (typeof callCluster !== 'function') {
|
||||
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
|
||||
}
|
||||
return this.fetch(callCluster);
|
||||
}
|
||||
|
||||
/*
|
||||
* A hook for allowing the fetched data payload to be organized into a typed
|
||||
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
|
||||
* a generic example.
|
||||
*/
|
||||
formatForBulkUpload(result) {
|
||||
return this._formatForBulkUpload(result);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,19 +26,17 @@ import { UsageCollector } from './usage_collector';
|
|||
/*
|
||||
* A collector object has types registered into it with the register(type)
|
||||
* function. Each type that gets registered defines how to fetch its own data
|
||||
* and combine it into a unified payload for bulk upload.
|
||||
* and optionally, how to combine it into a unified payload for bulk upload.
|
||||
*/
|
||||
export class CollectorSet {
|
||||
|
||||
/*
|
||||
* @param {Object} server - server object
|
||||
* @param {Number} options.interval - in milliseconds
|
||||
* @param {Function} options.combineTypes
|
||||
* @param {Function} options.onPayload
|
||||
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
|
||||
*/
|
||||
constructor(server) {
|
||||
constructor(server, collectors = []) {
|
||||
this._log = getCollectorLogger(server);
|
||||
this._collectors = [];
|
||||
this._collectors = collectors;
|
||||
|
||||
/*
|
||||
* Helper Factory methods
|
||||
|
@ -46,6 +44,7 @@ export class CollectorSet {
|
|||
*/
|
||||
this.makeStatsCollector = options => new Collector(server, options);
|
||||
this.makeUsageCollector = options => new UsageCollector(server, options);
|
||||
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -71,14 +70,14 @@ export class CollectorSet {
|
|||
|
||||
/*
|
||||
* Call a bunch of fetch methods and then do them in bulk
|
||||
* @param {Array} collectors - an array of collectors, default to all registered collectors
|
||||
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
|
||||
*/
|
||||
bulkFetch(callCluster, collectors = this._collectors) {
|
||||
if (!Array.isArray(collectors)) {
|
||||
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
|
||||
bulkFetch(callCluster, collectorSet = this) {
|
||||
if (!(collectorSet instanceof CollectorSet)) {
|
||||
throw new Error(`bulkFetch method given bad collectorSet parameter: ` + typeof collectorSet);
|
||||
}
|
||||
|
||||
return Promise.map(collectors, collector => {
|
||||
const fetchPromises = collectorSet.map(collector => {
|
||||
const collectorType = collector.type;
|
||||
this._log.debug(`Fetching data from ${collectorType} collector`);
|
||||
return Promise.props({
|
||||
|
@ -90,10 +89,19 @@ export class CollectorSet {
|
|||
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
|
||||
});
|
||||
});
|
||||
return Promise.all(fetchPromises);
|
||||
}
|
||||
|
||||
/*
|
||||
* @return {new CollectorSet}
|
||||
*/
|
||||
getFilteredCollectorSet(filter) {
|
||||
const filtered = this._collectors.filter(filter);
|
||||
return this._makeCollectorSetFromArray(filtered);
|
||||
}
|
||||
|
||||
async bulkFetchUsage(callCluster) {
|
||||
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
|
||||
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector);
|
||||
return this.bulkFetch(callCluster, usageCollectors);
|
||||
}
|
||||
|
||||
|
@ -137,4 +145,8 @@ export class CollectorSet {
|
|||
};
|
||||
}, {});
|
||||
}
|
||||
|
||||
map(mapFn) {
|
||||
return this._collectors.map(mapFn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,35 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { KIBANA_STATS_TYPE } from '../../status/constants';
|
||||
import { Collector } from './collector';
|
||||
|
||||
export class UsageCollector extends Collector {}
|
||||
export class UsageCollector extends Collector {
|
||||
/*
|
||||
* @param {Object} server - server object
|
||||
* @param {String} options.type - property name as the key for the data
|
||||
* @param {Function} options.init (optional) - initialization function
|
||||
* @param {Function} options.fetch - function to query data
|
||||
* @param {Function} options.formatForBulkUpload - optional
|
||||
* @param {Function} options.rest - optional other properties
|
||||
*/
|
||||
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
|
||||
super(server, { type, init, fetch, formatForBulkUpload, ...options });
|
||||
|
||||
/*
|
||||
* Currently, for internal bulk uploading, usage stats are part of
|
||||
* `kibana_stats` type, under the `usage` namespace in the document.
|
||||
*/
|
||||
const defaultUsageFormatterForBulkUpload = result => {
|
||||
return {
|
||||
type: KIBANA_STATS_TYPE,
|
||||
payload: {
|
||||
usage: {
|
||||
[type]: result
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
this._formatForBulkUpload = formatForBulkUpload || defaultUsageFormatterForBulkUpload;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6';
|
|||
* The type name used within the Monitoring index to publish Kibana ops stats.
|
||||
* @type {string}
|
||||
*/
|
||||
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
|
||||
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
|
||||
/**
|
||||
* The type name used within the Monitoring index to publish Kibana stats.
|
||||
* @type {string}
|
||||
|
|
|
@ -14,8 +14,15 @@ const CHECK_DELAY = 500;
|
|||
|
||||
class MockCollectorSet {
|
||||
constructor(_mockServer, mockCollectors) {
|
||||
this.mockServer = _mockServer;
|
||||
this.mockCollectors = mockCollectors;
|
||||
}
|
||||
getCollectorByType(type) {
|
||||
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
|
||||
}
|
||||
getFilteredCollectorSet(filter) {
|
||||
return new MockCollectorSet(this.mockServer, this.mockCollectors.filter(filter));
|
||||
}
|
||||
async bulkFetch() {
|
||||
return this.mockCollectors.map(({ fetch }) => fetch());
|
||||
}
|
||||
|
@ -47,7 +54,8 @@ describe('BulkUploader', () => {
|
|||
const collectors = new MockCollectorSet(server, [
|
||||
{
|
||||
type: 'type_collector_test',
|
||||
fetch: noop, // empty payloads
|
||||
fetch: noop, // empty payloads,
|
||||
formatForBulkUpload: result => result,
|
||||
}
|
||||
]);
|
||||
|
||||
|
@ -82,7 +90,10 @@ describe('BulkUploader', () => {
|
|||
|
||||
it('should run the bulk upload handler', done => {
|
||||
const collectors = new MockCollectorSet(server, [
|
||||
{ fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }) }
|
||||
{
|
||||
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
|
||||
formatForBulkUpload: result => result
|
||||
}
|
||||
]);
|
||||
const uploader = new BulkUploader(server, {
|
||||
interval: FETCH_INTERVAL
|
||||
|
|
|
@ -1,187 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_USAGE_TYPE, KIBANA_SETTINGS_TYPE } from '../../common/constants';
|
||||
import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants';
|
||||
import { BulkUploader } from './bulk_uploader';
|
||||
|
||||
const getInitial = () => {
|
||||
return [
|
||||
[
|
||||
{ 'index': { '_type': KIBANA_STATS_TYPE_MONITORING } },
|
||||
{
|
||||
'host': 'tsullivan.local',
|
||||
'concurrent_connections': 0,
|
||||
'os': {
|
||||
'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 },
|
||||
'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 },
|
||||
'uptime_in_millis': 1211027000
|
||||
},
|
||||
'process': {
|
||||
'event_loop_delay': 4.222616970539093,
|
||||
'memory': {
|
||||
'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 },
|
||||
'resident_set_size_in_bytes': 245923840
|
||||
},
|
||||
'uptime_in_millis': 18467
|
||||
},
|
||||
'requests': {
|
||||
'disconnects': 0,
|
||||
'total': 2,
|
||||
},
|
||||
'response_times': { 'average': 47, 'max': 47 },
|
||||
'timestamp': '2017-07-26T00:14:20.771Z',
|
||||
}
|
||||
],
|
||||
[
|
||||
{ 'index': { '_type': KIBANA_USAGE_TYPE } },
|
||||
{
|
||||
'dashboard': { 'total': 0 },
|
||||
'visualization': { 'total': 0 },
|
||||
'search': { 'total': 0 },
|
||||
'index_pattern': { 'total': 2 },
|
||||
'index': '.kibana'
|
||||
}
|
||||
],
|
||||
[
|
||||
{ 'index': { '_type': KIBANA_REPORTING_TYPE } },
|
||||
{
|
||||
'available': true,
|
||||
'enabled': false,
|
||||
'_all': 55,
|
||||
'csv': {
|
||||
'available': true,
|
||||
'count': 25
|
||||
},
|
||||
'printable_pdf': {
|
||||
'available': true,
|
||||
'count': 30
|
||||
}
|
||||
}
|
||||
],
|
||||
[
|
||||
{ 'index': { '_type': KIBANA_SETTINGS_TYPE } },
|
||||
{ 'xpack': { 'defaultAdminEmail': 'tim@elastic.co' } }
|
||||
]
|
||||
];
|
||||
};
|
||||
|
||||
// TODO use jest snapshotting
|
||||
const getResult = () => {
|
||||
return [
|
||||
[
|
||||
{ 'index': { '_type': 'kibana_stats' } },
|
||||
{
|
||||
'host': 'tsullivan.local',
|
||||
'concurrent_connections': 0,
|
||||
'os': {
|
||||
'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 },
|
||||
'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 },
|
||||
'uptime_in_millis': 1211027000
|
||||
},
|
||||
'process': {
|
||||
'event_loop_delay': 4.222616970539093,
|
||||
'memory': {
|
||||
'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 },
|
||||
'resident_set_size_in_bytes': 245923840
|
||||
},
|
||||
'uptime_in_millis': 18467
|
||||
},
|
||||
'requests': {
|
||||
'disconnects': 0,
|
||||
'total': 2,
|
||||
},
|
||||
'response_times': { 'average': 47, 'max': 47 },
|
||||
'timestamp': '2017-07-26T00:14:20.771Z',
|
||||
'usage': {
|
||||
'dashboard': { 'total': 0 },
|
||||
'visualization': { 'total': 0 },
|
||||
'search': { 'total': 0 },
|
||||
'index_pattern': { 'total': 2 },
|
||||
'index': '.kibana',
|
||||
'xpack': {
|
||||
'reporting': {
|
||||
'_all': 55,
|
||||
'available': true,
|
||||
'csv': {
|
||||
'available': true,
|
||||
'count': 25,
|
||||
},
|
||||
'enabled': false,
|
||||
'printable_pdf': {
|
||||
'available': true,
|
||||
'count': 30,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
[
|
||||
{ 'index': { '_type': 'kibana_settings' } },
|
||||
{
|
||||
'xpack': { 'defaultAdminEmail': 'tim@elastic.co' },
|
||||
}
|
||||
]
|
||||
];
|
||||
};
|
||||
|
||||
describe('Collector Types Combiner', () => {
|
||||
describe('with all the data types present', () => {
|
||||
it('provides settings, and combined stats/usage data', () => {
|
||||
// default gives all the data types
|
||||
const initial = getInitial();
|
||||
const result = BulkUploader.combineStatsLegacy(initial);
|
||||
expect(result).toEqual(getResult());
|
||||
});
|
||||
});
|
||||
describe('with settings data missing', () => {
|
||||
it('provides combined stats/usage data', () => {
|
||||
// default gives all the data types
|
||||
const initial = getInitial();
|
||||
const trimmedInitial = [ initial[0], initial[1], initial[2] ]; // just stats, usage and reporting, no settings
|
||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
||||
const expectedResult = getResult();
|
||||
const trimmedExpectedResult = [ expectedResult[0] ]; // single combined item
|
||||
expect(result).toEqual(trimmedExpectedResult);
|
||||
});
|
||||
});
|
||||
describe('with usage data missing', () => {
|
||||
it('provides settings, and stats data', () => {
|
||||
// default gives all the data types
|
||||
const initial = getInitial();
|
||||
const trimmedInitial = [ initial[0], initial[3] ]; // just stats and settings, no usage or reporting
|
||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
||||
const expectedResult = getResult();
|
||||
delete expectedResult[0][1].usage; // usage stats should not be present in the result
|
||||
const trimmedExpectedResult = [ expectedResult[0], expectedResult[1] ];
|
||||
expect(result).toEqual(trimmedExpectedResult);
|
||||
});
|
||||
});
|
||||
describe('with stats data missing', () => {
|
||||
it('provides settings data', () => {
|
||||
// default gives all the data types
|
||||
const initial = getInitial();
|
||||
const trimmedInitial = [ initial[3] ]; // just settings
|
||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
||||
const expectedResult = getResult();
|
||||
const trimmedExpectedResult = [ expectedResult[1] ]; // just settings
|
||||
expect(result).toEqual(trimmedExpectedResult);
|
||||
});
|
||||
});
|
||||
|
||||
it('throws an error if duplicate types are registered', () => {
|
||||
const combineWithDuplicate = () => {
|
||||
const initial = getInitial();
|
||||
const withDuplicate = [ initial[0] ].concat(initial);
|
||||
return BulkUploader.combineStatsLegacy(withDuplicate);
|
||||
};
|
||||
expect(combineWithDuplicate).toThrow(
|
||||
'Duplicate collector type identifiers found in payload! ' +
|
||||
'kibana_stats_monitoring,kibana_stats_monitoring,kibana,reporting,kibana_settings'
|
||||
);
|
||||
});
|
||||
});
|
|
@ -4,19 +4,16 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { get, set, isEmpty, flatten, uniq } from 'lodash';
|
||||
import { defaultsDeep, isEmpty, uniq, compact } from 'lodash';
|
||||
import { callClusterFactory } from '../../../xpack_main';
|
||||
import {
|
||||
LOGGING_TAG,
|
||||
KIBANA_MONITORING_LOGGING_TAG,
|
||||
KIBANA_STATS_TYPE_MONITORING,
|
||||
KIBANA_SETTINGS_TYPE,
|
||||
KIBANA_USAGE_TYPE,
|
||||
} from '../../common/constants';
|
||||
import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants';
|
||||
import {
|
||||
sendBulkPayload,
|
||||
monitoringBulk,
|
||||
getKibanaInfoForStats,
|
||||
} from './lib';
|
||||
|
||||
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
|
||||
|
@ -38,7 +35,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
|
|||
* @param {Object} xpackInfo server.plugins.xpack_main.info object
|
||||
*/
|
||||
export class BulkUploader {
|
||||
constructor(server, { interval }) {
|
||||
constructor(server, { kbnServer, interval }) {
|
||||
if (typeof interval !== 'number') {
|
||||
throw new Error('interval number of milliseconds is required');
|
||||
}
|
||||
|
@ -56,7 +53,7 @@ export class BulkUploader {
|
|||
});
|
||||
|
||||
this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal();
|
||||
|
||||
this._getKibanaInfoForStats = () => getKibanaInfoForStats(server, kbnServer);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -66,9 +63,12 @@ export class BulkUploader {
|
|||
*/
|
||||
start(collectorSet) {
|
||||
this._log.info('Starting monitoring stats collection');
|
||||
this._fetchAndUpload(collectorSet); // initial fetch
|
||||
|
||||
// this is internal bulk upload, so filter out API-only collectors
|
||||
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
|
||||
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
|
||||
this._timer = setInterval(() => {
|
||||
this._fetchAndUpload(collectorSet);
|
||||
this._fetchAndUpload(filterThem(collectorSet));
|
||||
}, this._interval);
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ export class BulkUploader {
|
|||
*/
|
||||
async _fetchAndUpload(collectorSet) {
|
||||
const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
|
||||
const payload = BulkUploader.toBulkUploadFormat(data);
|
||||
const payload = this.toBulkUploadFormat(compact(data), collectorSet);
|
||||
|
||||
if (payload) {
|
||||
try {
|
||||
|
@ -120,15 +120,69 @@ export class BulkUploader {
|
|||
/*
|
||||
* Bulk stats are transformed into a bulk upload format
|
||||
* Non-legacy transformation is done in CollectorSet.toApiStats
|
||||
*
|
||||
* Example:
|
||||
* Before:
|
||||
* [
|
||||
* {
|
||||
* "type": "kibana_stats",
|
||||
* "result": {
|
||||
* "process": { ... },
|
||||
* "requests": { ... },
|
||||
* ...
|
||||
* }
|
||||
* },
|
||||
* ]
|
||||
*
|
||||
* After:
|
||||
* [
|
||||
* {
|
||||
* "index": {
|
||||
* "_type": "kibana_stats"
|
||||
* }
|
||||
* },
|
||||
* {
|
||||
* "kibana": {
|
||||
* "host": "localhost",
|
||||
* "uuid": "d619c5d1-4315-4f35-b69d-a3ac805489fb",
|
||||
* "version": "7.0.0-alpha1",
|
||||
* ...
|
||||
* },
|
||||
* "process": { ... },
|
||||
* "requests": { ... },
|
||||
* ...
|
||||
* }
|
||||
* ]
|
||||
*/
|
||||
static toBulkUploadFormat(uploadData) {
|
||||
const payload = uploadData
|
||||
.filter(d => Boolean(d) && !isEmpty(d.result))
|
||||
.map(({ result, type }) => [{ index: { _type: type } }, result]);
|
||||
if (payload.length > 0) {
|
||||
const combinedData = BulkUploader.combineStatsLegacy(payload); // arrange the usage data into the stats
|
||||
return flatten(combinedData);
|
||||
toBulkUploadFormat(rawData, collectorSet) {
|
||||
if (rawData.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// convert the raw data to a nested object by taking each payload through
|
||||
// its formatter, organizing it per-type
|
||||
const typesNested = rawData.reduce((accum, { type, result }) => {
|
||||
if (isEmpty(result)) {
|
||||
return accum;
|
||||
}
|
||||
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
|
||||
return defaultsDeep(accum, { [uploadType]: uploadData });
|
||||
}, {});
|
||||
|
||||
// convert the nested object into a flat array, with each payload prefixed
|
||||
// with an 'index' instruction, for bulk upload
|
||||
const flat = Object.keys(typesNested).reduce((accum, type) => {
|
||||
return [
|
||||
...accum,
|
||||
{ index: { _type: type } },
|
||||
{
|
||||
kibana: this._getKibanaInfoForStats(),
|
||||
...typesNested[type],
|
||||
}
|
||||
];
|
||||
}, []);
|
||||
|
||||
return flat;
|
||||
}
|
||||
|
||||
static checkPayloadTypesUnique(payload) {
|
||||
|
@ -138,45 +192,4 @@ export class BulkUploader {
|
|||
throw new Error('Duplicate collector type identifiers found in payload! ' + ids.join(','));
|
||||
}
|
||||
}
|
||||
|
||||
static combineStatsLegacy(payload) {
|
||||
BulkUploader.checkPayloadTypesUnique(payload);
|
||||
|
||||
// default the item to [] to allow destructuring
|
||||
const findItem = type => payload.find(item => get(item, '[0].index._type') === type) || [];
|
||||
|
||||
// kibana usage and stats
|
||||
let statsResult;
|
||||
const [ statsHeader, statsPayload ] = findItem(KIBANA_STATS_TYPE_MONITORING);
|
||||
const [ reportingHeader, reportingPayload ] = findItem(KIBANA_REPORTING_TYPE);
|
||||
|
||||
if (statsHeader && statsPayload) {
|
||||
statsHeader.index._type = 'kibana_stats'; // HACK to convert kibana_stats_monitoring to just kibana_stats for bwc
|
||||
const [ usageHeader, usagePayload ] = findItem(KIBANA_USAGE_TYPE);
|
||||
const kibanaUsage = (usageHeader && usagePayload) ? usagePayload : null;
|
||||
const reportingUsage = (reportingHeader && reportingPayload) ? reportingPayload : null;
|
||||
statsResult = [ statsHeader, statsPayload ];
|
||||
if (kibanaUsage) {
|
||||
set(statsResult, '[1].usage', kibanaUsage);
|
||||
}
|
||||
if (reportingUsage) {
|
||||
set(statsResult, '[1].usage.xpack.reporting', reportingUsage);
|
||||
}
|
||||
}
|
||||
|
||||
// kibana settings
|
||||
let settingsResult;
|
||||
const [ settingsHeader, settingsPayload ] = findItem(KIBANA_SETTINGS_TYPE);
|
||||
if (settingsHeader && settingsPayload) {
|
||||
settingsResult = [ settingsHeader, settingsPayload ];
|
||||
}
|
||||
|
||||
// return new payload with the combined data
|
||||
// adds usage data to stats data
|
||||
// strips usage out as a top-level type
|
||||
const result = [ statsResult, settingsResult ];
|
||||
|
||||
// remove result items that are undefined
|
||||
return result.filter(Boolean);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
*/
|
||||
|
||||
import { get, snakeCase } from 'lodash';
|
||||
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
|
||||
import { KIBANA_USAGE_TYPE, KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
|
||||
|
||||
const TYPES = [
|
||||
'dashboard',
|
||||
|
@ -17,12 +17,13 @@ const TYPES = [
|
|||
];
|
||||
|
||||
/**
|
||||
* Fetches saved object client counts by querying the saved object index
|
||||
* Fetches saved object counts by querying the .kibana index
|
||||
*/
|
||||
export function getKibanaUsageCollector(server) {
|
||||
const { collectorSet } = server.usage;
|
||||
return collectorSet.makeUsageCollector({
|
||||
type: KIBANA_USAGE_TYPE,
|
||||
|
||||
async fetch(callCluster) {
|
||||
const index = server.config().get('kibana.index');
|
||||
const savedObjectCountSearchParams = {
|
||||
|
@ -60,6 +61,20 @@ export function getKibanaUsageCollector(server) {
|
|||
}
|
||||
}), {})
|
||||
};
|
||||
},
|
||||
|
||||
/*
|
||||
* Format the response data into a model for internal upload
|
||||
* 1. Make this data part of the "kibana_stats" type
|
||||
* 2. Organize the payload in the usage namespace of the data payload (usage.index, etc)
|
||||
*/
|
||||
formatForBulkUpload: result => {
|
||||
return {
|
||||
type: KIBANA_STATS_TYPE_MONITORING,
|
||||
payload: {
|
||||
usage: result
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -6,12 +6,11 @@
|
|||
|
||||
import { KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
|
||||
import { opsBuffer } from './ops_buffer';
|
||||
import { getKibanaInfoForStats } from '../lib';
|
||||
|
||||
/*
|
||||
* Initialize a collector for Kibana Ops Stats
|
||||
*/
|
||||
export function getOpsStatsCollector(server, kbnServer) {
|
||||
export function getOpsStatsCollector(server) {
|
||||
let monitor;
|
||||
const buffer = opsBuffer(server);
|
||||
const onOps = event => buffer.push(event);
|
||||
|
@ -48,10 +47,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
|||
type: KIBANA_STATS_TYPE_MONITORING,
|
||||
init: start,
|
||||
fetch: () => {
|
||||
return {
|
||||
kibana: getKibanaInfoForStats(server, kbnServer),
|
||||
...buffer.flush()
|
||||
};
|
||||
return buffer.flush();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
import { get } from 'lodash';
|
||||
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
|
||||
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
|
||||
import { getKibanaInfoForStats } from '../lib';
|
||||
|
||||
/*
|
||||
* Check if Cluster Alert email notifications is enabled in config
|
||||
|
@ -54,14 +53,14 @@ export async function checkForEmailValue(
|
|||
}
|
||||
}
|
||||
|
||||
export function getSettingsCollector(server, kbnServer) {
|
||||
export function getSettingsCollector(server) {
|
||||
const config = server.config();
|
||||
|
||||
const { collectorSet } = server.usage;
|
||||
|
||||
return collectorSet.makeStatsCollector({
|
||||
type: KIBANA_SETTINGS_TYPE,
|
||||
async fetch(callCluster) {
|
||||
let kibanaSettingsData = null;
|
||||
let kibanaSettingsData;
|
||||
const defaultAdminEmail = await checkForEmailValue(config, callCluster);
|
||||
|
||||
// skip everything if defaultAdminEmail === undefined
|
||||
|
@ -79,16 +78,8 @@ export function getSettingsCollector(server, kbnServer) {
|
|||
// remember the current email so that we can mark it as successful if the bulk does not error out
|
||||
shouldUseNull = !!defaultAdminEmail;
|
||||
|
||||
// return nothing when there was no result
|
||||
let settingsDoc;
|
||||
if (kibanaSettingsData !== null) {
|
||||
settingsDoc = {
|
||||
kibana: getKibanaInfoForStats(server, kbnServer),
|
||||
...kibanaSettingsData
|
||||
};
|
||||
}
|
||||
|
||||
return settingsDoc;
|
||||
// returns undefined if there was no result
|
||||
return kibanaSettingsData;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -27,8 +27,14 @@ export function opsBuffer(server) {
|
|||
},
|
||||
|
||||
flush() {
|
||||
let cloud; // a property that will be left out of the result if the details are undefined
|
||||
const cloudDetails = cloudDetector.getCloudDetails();
|
||||
if (cloudDetails != null) {
|
||||
cloud = { cloud: cloudDetails };
|
||||
}
|
||||
|
||||
return {
|
||||
cloud: cloudDetector.getCloudDetails(),
|
||||
...cloud,
|
||||
...eventRoller.flush()
|
||||
};
|
||||
}
|
||||
|
|
|
@ -15,11 +15,11 @@ import { BulkUploader } from './bulk_uploader';
|
|||
* @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core
|
||||
* @param {Object} server HapiJS server instance
|
||||
*/
|
||||
export function initBulkUploader(_kbnServer, server) {
|
||||
|
||||
export function initBulkUploader(kbnServer, server) {
|
||||
const config = server.config();
|
||||
const interval = config.get('xpack.monitoring.kibana.collection.interval');
|
||||
return new BulkUploader(server, {
|
||||
kbnServer,
|
||||
interval
|
||||
});
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import { uniq } from 'lodash';
|
|||
import { getExportTypesHandler } from './get_export_type_handler';
|
||||
import { getReportCountsByParameter } from './get_reporting_type_counts';
|
||||
import { KIBANA_REPORTING_TYPE } from '../../common/constants';
|
||||
import { KIBANA_STATS_TYPE_MONITORING } from '../../../monitoring/common/constants';
|
||||
|
||||
/**
|
||||
* @typedef {Object} ReportingUsageStats Almost all of these stats are optional.
|
||||
|
@ -117,6 +118,7 @@ export function getReportingUsageCollector(server) {
|
|||
const { collectorSet } = server.usage;
|
||||
return collectorSet.makeUsageCollector({
|
||||
type: KIBANA_REPORTING_TYPE,
|
||||
|
||||
fetch: async callCluster => {
|
||||
const xpackInfo = server.plugins.xpack_main.info;
|
||||
const config = server.config();
|
||||
|
@ -147,6 +149,24 @@ export function getReportingUsageCollector(server) {
|
|||
...statsOverLast7Days
|
||||
}
|
||||
};
|
||||
},
|
||||
|
||||
/*
|
||||
* Format the response data into a model for internal upload
|
||||
* 1. Make this data part of the "kibana_stats" type
|
||||
* 2. Organize the payload in the usage.xpack.reporting namespace of the data payload
|
||||
*/
|
||||
formatForBulkUpload: result => {
|
||||
return {
|
||||
type: KIBANA_STATS_TYPE_MONITORING,
|
||||
payload: {
|
||||
usage: {
|
||||
xpack: {
|
||||
reporting: result
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue