Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 01:38:56 -04:00)
* fix metrics field name (stats)
* add state info
* add module name count
* use a set for the unique hosts instead of a map
* fix getting beats_state info
* fix unit test
* processBeatsStatsResults and processBeatsStateResults
Parent: c50c6fc6b4
Commit: efdd408674
3 changed files with 617 additions and 88 deletions
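As a hedged illustration of the "use a set for the unique hosts instead of a map" item in the commit message, the minimal sketch below (not code from this commit; the countHost helper and the cluster UUID are hypothetical) shows why a Set is the simpler structure: the old code stored the host as both key and value in a Map purely to get de-duplication, which Set#add gives directly.

    // Standalone sketch of Set-based unique-host counting (hypothetical names).
    const clusterHostSets = {};                  // keyed by cluster UUID, as in the patch
    const clusters = { abc123: { hosts: 0 } };   // hypothetical cluster entry

    function countHost(clusterUuid, host) {
      // lazily create one Set per cluster; a Set de-duplicates values on its own
      const hostSet = clusterHostSets[clusterUuid] || (clusterHostSets[clusterUuid] = new Set());
      hostSet.add(host);                         // duplicates are ignored automatically
      clusters[clusterUuid].hosts = hostSet.size;
    }

    countHost('abc123', 'host-1');
    countHost('abc123', 'host-1');               // same host reported twice
    countHost('abc123', 'host-2');
    console.log(clusters.abc123.hosts);          // 2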
Diff of one changed file suppressed because it is too large.
Changes to the Beats stats unit test:

@@ -9,6 +9,13 @@ import sinon from 'sinon';
 import expect from 'expect.js';
 import beatsStatsResultSet from './fixtures/beats_stats_results';
 
+const getBaseOptions = () => ({
+  clusters: {},
+  clusterHostSets: {},
+  clusterInputSets: {},
+  clusterModuleSets: {},
+});
+
 describe('Get Beats Stats', () => {
   describe('fetchBeatsStats', () => {
     const clusterUuids = ['aCluster', 'bCluster', 'cCluster'];
@@ -60,12 +67,11 @@ describe('Get Beats Stats', () => {
   describe('processResults', () => {
     it('should summarize empty results', () => {
       const resultsEmpty = undefined;
-      const clusters = {};
-      const clusterHostMaps = {};
-
-      processResults(resultsEmpty, clusters, clusterHostMaps);
+      const options = getBaseOptions();
+
+      processResults(resultsEmpty, options);
 
-      expect(clusters).to.eql({});
+      expect(options.clusters).to.eql({});
     });
 
     it('should summarize single result with some missing fields', () => {
@@ -74,6 +80,7 @@ describe('Get Beats Stats', () => {
         hits: [
           {
             _source: {
+              type: 'beats_stats',
               cluster_uuid: 'FlV4ckTxQ0a78hmBkzzc9A',
               beats_stats: {
                 metrics: { libbeat: { output: { type: 'elasticsearch' } } }, // missing events published
@@ -84,11 +91,11 @@ describe('Get Beats Stats', () => {
           ],
         },
       };
-      const clusters = {};
-      const clusterHostMaps = {};
-      processResults(results, clusters, clusterHostMaps);
+      const options = getBaseOptions();
+
+      processResults(results, options);
 
-      expect(clusters).to.eql({
+      expect(options.clusters).to.eql({
        FlV4ckTxQ0a78hmBkzzc9A: {
          count: 1,
          versions: {},
@@ -96,21 +103,28 @@ describe('Get Beats Stats', () => {
          outputs: { elasticsearch: 1 },
          eventsPublished: 0,
          hosts: 0,
+          input: {
+            count: 0,
+            names: [],
+          },
+          module: {
+            count: 0,
+            names: [],
+          },
        },
      });
    });
 
    it('should summarize stats from hits across multiple result objects', () => {
 
-      const clusters = {};
-      const clusterHostMaps = {};
+      const options = getBaseOptions();
 
      // beatsStatsResultSet is an array of many small query results
      beatsStatsResultSet.forEach(results => {
-        processResults(results, clusters, clusterHostMaps);
+        processResults(results, options);
      });
 
-      expect(clusters).to.eql({
+      expect(options.clusters).to.eql({
        W7hppdX7R229Oy3KQbZrTw: {
          count: 5,
          versions: { '7.0.0-alpha1': 5 },
@@ -118,6 +132,14 @@ describe('Get Beats Stats', () => {
          outputs: { elasticsearch: 5 },
          eventsPublished: 80875,
          hosts: 1,
+          module: {
+            count: 1,
+            names: [ 'ferrari' ],
+          },
+          input: {
+            count: 1,
+            names: [ 'firehose' ],
+          },
        },
        FlV4ckTxQ0a78hmBkzzc9A: {
          count: 405,
@@ -135,6 +157,14 @@ describe('Get Beats Stats', () => {
          outputs: { elasticsearch: 405 },
          eventsPublished: 723985,
          hosts: 1,
+          input: {
+            count: 0,
+            names: [],
+          },
+          module: {
+            count: 0,
+            names: [],
+          },
        },
      });
    });
Changes to the Beats stats collection module:

@@ -9,6 +9,105 @@ import { createQuery } from './create_query';
 
 const HITS_SIZE = 10000; // maximum hits to receive from ES with each search
 
+const getBaseStats = () => ({
+  // stats
+  versions: {},
+  types: {},
+  outputs: {},
+  count: 0,
+  eventsPublished: 0,
+  hosts: 0,
+  // state
+  input: {
+    count: 0,
+    names: []
+  },
+  module: {
+    count: 0,
+    names: []
+  },
+});
+
+/*
+ * Update a clusters object with processed beat stats
+ * @param {Array} results - array of Beats docs from ES
+ * @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
+ * @param {Object} clusterHostSets - the object keyed by cluster UUIDs to count the unique hosts
+ * @param {Object} clusterModuleSets - the object keyed by cluster UUIDs to count the unique modules
+ */
+export function processResults(results = [], { clusters, clusterHostSets, clusterInputSets, clusterModuleSets }) {
+  const currHits = get(results, 'hits.hits', []);
+  currHits.forEach(hit => {
+    const clusterUuid = get(hit, '_source.cluster_uuid');
+    if (clusters[clusterUuid] === undefined) {
+      clusters[clusterUuid] = getBaseStats();
+      clusterHostSets[clusterUuid] = new Set();
+      clusterInputSets[clusterUuid] = new Set();
+      clusterModuleSets[clusterUuid] = new Set();
+    }
+
+    const processBeatsStatsResults = () => {
+      const { versions, types, outputs } = clusters[clusterUuid];
+
+      const thisVersion = get(hit, '_source.beats_stats.beat.version');
+      if (thisVersion !== undefined) {
+        const thisVersionAccum = versions[thisVersion] || 0;
+        versions[thisVersion] = thisVersionAccum + 1;
+      }
+
+      const thisType = get(hit, '_source.beats_stats.beat.type');
+      if (thisType !== undefined) {
+        const thisTypeAccum = types[thisType] || 0;
+        types[thisType] = thisTypeAccum + 1;
+      }
+
+      const thisOutput = get(hit, '_source.beats_stats.metrics.libbeat.output.type');
+      if (thisOutput !== undefined) {
+        const thisOutputAccum = outputs[thisOutput] || 0;
+        outputs[thisOutput] = thisOutputAccum + 1;
+      }
+
+      const thisEvents = get(hit, '_source.beats_stats.metrics.libbeat.pipeline.events.published');
+      if (thisEvents !== undefined) {
+        clusters[clusterUuid].eventsPublished += thisEvents;
+      }
+
+      const thisHost = get(hit, '_source.beats_stats.beat.host');
+      if (thisHost !== undefined) {
+        const hostsMap = clusterHostSets[clusterUuid];
+        hostsMap.add(thisHost);
+        clusters[clusterUuid].hosts = hostsMap.size;
+      }
+    };
+
+    const processBeatsStateResults = () => {
+      const stateInput = get(hit, '_source.beats_state.state.input');
+      if (stateInput !== undefined) {
+        const inputSet = clusterInputSets[clusterUuid];
+        stateInput.names.forEach(name => inputSet.add(name));
+        clusters[clusterUuid].input.names = Array.from(inputSet);
+        clusters[clusterUuid].input.count += stateInput.count;
+      }
+
+      const stateModule = get(hit, '_source.beats_state.state.module');
+      if (stateModule !== undefined) {
+        const moduleSet = clusterModuleSets[clusterUuid];
+        stateModule.names.forEach(name => moduleSet.add(name));
+        clusters[clusterUuid].module.names = Array.from(moduleSet);
+        clusters[clusterUuid].module.count += stateModule.count;
+      }
+    };
+
+    if (get(hit, '_source.type') === 'beats_stats') {
+      clusters[clusterUuid].count += 1;
+      processBeatsStatsResults();
+    } else {
+      processBeatsStateResults();
+    }
+  });
+}
+
 /*
  * Create a set of result objects where each is the result of searching hits from Elasticsearch with a size of HITS_SIZE each time.
  * @param {Object} server - The server instance
@@ -16,15 +115,12 @@ const HITS_SIZE = 10000; // maximum hits to receive from ES with each search
  * @param {Array} clusterUuids - The string Cluster UUIDs to fetch details for
  * @param {Date} start - Start time to limit the stats
  * @param {Date} end - End time to limit the stats
- * @param {Number} page - selection of hits to fetch from ES
- * @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
- * @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
+ * @param {Number} options.page - selection of hits to fetch from ES
+ * @param {Object} options.clusters - Beats stats in an object keyed by the cluster UUIDs
+ * @param {String} type - beats_stats or beats_state
  * @return {Promise}
  */
-export async function fetchBeatsStats(
-  server, callCluster, clusterUuids, start, end,
-  { page = 0, clusters, clusterHostMaps } = {}
-) {
+async function fetchBeatsByType(server, callCluster, clusterUuids, start, end, { page = 0, ...options } = {}, type) {
   const config = server.config();
 
   const params = {
@@ -32,25 +128,30 @@ export async function fetchBeatsStats(
     ignoreUnavailable: true,
     filterPath: [
       'hits.hits._source.cluster_uuid',
+      'hits.hits._source.type',
       'hits.hits._source.beats_stats.beat.version',
       'hits.hits._source.beats_stats.beat.type',
       'hits.hits._source.beats_stats.beat.host',
       'hits.hits._source.beats_stats.metrics.libbeat.pipeline.events.published',
       'hits.hits._source.beats_stats.metrics.libbeat.output.type',
+      'hits.hits._source.beats_state.state.input',
+      'hits.hits._source.beats_state.state.module',
     ],
     body: {
       query: createQuery({
         start,
         end,
-        type: 'beats_stats',
         filters: [
          { terms: { cluster_uuid: clusterUuids } },
-          { bool: { must_not: { term: { 'beats_stats.beat.type': 'apm-server' } } } },
+          { bool: {
+            must_not: { term: { [`${type}.beat.type`]: 'apm-server' } },
+            must: { term: { 'type': type } }
+          } }
        ],
      }),
-      collapse: { field: 'beats_stats.beat.uuid' },
-      sort: [{ 'beats_stats.timestamp': 'desc' }],
+      collapse: { field: `${type}.beat.uuid` },
+      sort: [{ [`${type}.timestamp`]: 'desc' }],
       from: page * HITS_SIZE,
       size: HITS_SIZE,
     },
  };
@@ -59,82 +160,29 @@ export async function fetchBeatsStats(
   const hitsLength = get(results, 'hits.hits.length', 0);
   if (hitsLength > 0) {
     // further augment the clusters object with more stats
-    processResults(results, clusters, clusterHostMaps);
+    processResults(results, options);
 
     if (hitsLength === HITS_SIZE) {
       // call recursively
       const nextOptions = {
-        clusters,
-        clusterHostMaps,
         page: page + 1,
+        ...options,
       };
 
       // returns a promise and keeps the caller blocked from returning until the entire clusters object is built
-      return fetchBeatsStats(server, callCluster, clusterUuids, start, end, nextOptions);
+      return fetchBeatsByType(server, callCluster, clusterUuids, start, end, nextOptions, type);
     }
   }
 
   return Promise.resolve();
 }
 
-const getBaseStats = () => ({
-  count: 0,
-  versions: {},
-  types: {},
-  outputs: {},
-  eventsPublished: 0,
-  hosts: 0,
-});
+export async function fetchBeatsStats(...args) {
+  return fetchBeatsByType(...args, 'beats_stats');
+}
 
-/*
- * Update a clusters object with processed beat stats
- * @param {Array} results - array of Beats docs from ES
- * @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
- * @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
- */
-export function processResults(results = [], clusters, clusterHostMaps) {
-  const currHits = get(results, 'hits.hits', []);
-  currHits.forEach(hit => {
-    const clusterUuid = get(hit, '_source.cluster_uuid');
-    if (clusters[clusterUuid] === undefined) {
-      clusters[clusterUuid] = getBaseStats();
-      clusterHostMaps[clusterUuid] = new Map();
-    }
-
-    clusters[clusterUuid].count += 1;
-
-    const { versions, types, outputs } = clusters[clusterUuid];
-
-    const thisVersion = get(hit, '_source.beats_stats.beat.version');
-    if (thisVersion !== undefined) {
-      const thisVersionAccum = versions[thisVersion] || 0;
-      versions[thisVersion] = thisVersionAccum + 1;
-    }
-
-    const thisType = get(hit, '_source.beats_stats.beat.type');
-    if (thisType !== undefined) {
-      const thisTypeAccum = types[thisType] || 0;
-      types[thisType] = thisTypeAccum + 1;
-    }
-
-    const thisOutput = get(hit, '_source.beats_stats.metrics.libbeat.output.type');
-    if (thisOutput !== undefined) {
-      const thisOutputAccum = outputs[thisOutput] || 0;
-      outputs[thisOutput] = thisOutputAccum + 1;
-    }
-
-    const thisEvents = get(hit, '_source.beats_stats.metrics.libbeat.pipeline.events.published');
-    if (thisEvents !== undefined) {
-      clusters[clusterUuid].eventsPublished += thisEvents;
-    }
-
-    const thisHost = get(hit, '_source.beats_stats.beat.host');
-    if (thisHost !== undefined) {
-      const hostsMap = clusterHostMaps[clusterUuid];
-      hostsMap.set(thisHost, thisHost); // values don't matter, as this data structure is not part of the output
-      clusters[clusterUuid].hosts = hostsMap.size;
-    }
-  });
-}
+export async function fetchBeatsStates(...args) {
+  return fetchBeatsByType(...args, 'beats_state');
+}
 
 /*
@@ -142,8 +190,17 @@ export function processResults(results = [], clusters, clusterHostMaps) {
  * @return {Object} - Beats stats in an object keyed by the cluster UUIDs
  */
 export async function getBeatsStats(server, callCluster, clusterUuids, start, end) {
-  const clusters = {};
-  const clusterHostMaps = {};
-  await fetchBeatsStats(server, callCluster, clusterUuids, start, end, { clusters, clusterHostMaps });
-  return clusters;
+  const options = {
+    clusters: {}, // the result object to be built up
+    clusterHostSets: {}, // passed to processResults for tracking state in the results generation
+    clusterInputSets: {}, // passed to processResults for tracking state in the results generation
+    clusterModuleSets: {} // passed to processResults for tracking state in the results generation
+  };
+
+  await Promise.all([
+    fetchBeatsStats(server, callCluster, clusterUuids, start, end, options),
+    fetchBeatsStates(server, callCluster, clusterUuids, start, end, options)
+  ]);
+
+  return options.clusters;
 }
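For orientation, a hedged usage sketch of the refactored entry point, not code from this commit: `server` and `callCluster` stand in for Kibana's monitoring plumbing, the import path and the collectBeatsTelemetry wrapper are assumptions, and the cluster UUID is borrowed from the unit-test fixture. The result shape follows getBaseStats() and the test expectations.

    // Hypothetical caller of the new getBeatsStats flow.
    import { getBeatsStats } from './get_beats_stats'; // path assumed

    async function collectBeatsTelemetry(server, callCluster) {
      const clusterUuids = ['FlV4ckTxQ0a78hmBkzzc9A'];
      const end = new Date();
      const start = new Date(end.getTime() - 60 * 60 * 1000); // placeholder one-hour window

      // getBeatsStats now runs fetchBeatsStats and fetchBeatsStates in parallel over
      // one shared options object and returns only options.clusters.
      const clustersByUuid = await getBeatsStats(server, callCluster, clusterUuids, start, end);

      // Per-cluster shape (from getBaseStats and the test expectations):
      // { count, versions, types, outputs, eventsPublished, hosts,
      //   input: { count, names }, module: { count, names } }
      return clustersByUuid;
    }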