mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 09:48:58 -04:00)
[Monitoring/Beats] Telemetry Data from Beats

* filter apm-server
* ignore results payload if hitsLength === 0
* process each payload as stats are saved to clusters object
parent bc388bfc52
commit 19a8bc4c6c

5 changed files with 11376 additions and 3 deletions
@@ -23,6 +23,12 @@ export const CONFIG_TELEMETRY_DESC = (
  */
 export const KIBANA_SYSTEM_ID = 'kibana';
 
+/**
+ * The name of the Beats System ID used to publish and look up Beats stats through the Monitoring system.
+ * @type {string}
+ */
+export const BEATS_SYSTEM_ID = 'beats';
+
 /**
  * The name of the Logstash System ID used to look up Logstash stats through the Monitoring system.
  * @type {string}
File diff suppressed because it is too large
@@ -0,0 +1,142 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import { fetchBeatsStats, processResults } from '../get_beats_stats';
import sinon from 'sinon';
import expect from 'expect.js';
import beatsStatsResultSet from './fixtures/beats_stats_results';

describe('Get Beats Stats', () => {
  describe('fetchBeatsStats', () => {
    const clusterUuids = ['aCluster', 'bCluster', 'cCluster'];
    const start = 100;
    const end = 200;
    let server;
    let callCluster;

    beforeEach(() => {
      const getStub = { get: sinon.stub() };
      getStub.get
        .withArgs('xpack.monitoring.beats.index_pattern')
        .returns('beats-indices-*');
      server = { config: () => getStub };
      callCluster = sinon.stub();
    });

    it('should set `from: 0, size: 10000` in the query', () => {
      fetchBeatsStats(server, callCluster, clusterUuids, start, end);
      const { args } = callCluster.firstCall;
      const [api, { body }] = args;

      expect(api).to.be('search');
      expect(body.from).to.be(0);
      expect(body.size).to.be(10000);
    });

    it('should set `from: 10000, size: 10000` in the query', () => {
      fetchBeatsStats(server, callCluster, clusterUuids, start, end, { page: 1 });
      const { args } = callCluster.firstCall;
      const [api, { body }] = args;

      expect(api).to.be('search');
      expect(body.from).to.be(10000);
      expect(body.size).to.be(10000);
    });

    it('should set `from: 20000, size: 10000` in the query', () => {
      fetchBeatsStats(server, callCluster, clusterUuids, start, end, { page: 2 });
      const { args } = callCluster.firstCall;
      const [api, { body }] = args;

      expect(api).to.be('search');
      expect(body.from).to.be(20000);
      expect(body.size).to.be(10000);
    });
  });

  describe('processResults', () => {
    it('should summarize empty results', () => {
      const resultsEmpty = undefined;
      const clusters = {};
      const clusterHostMaps = {};

      processResults(resultsEmpty, clusters, clusterHostMaps);

      expect(clusters).to.eql({});
    });

    it('should summarize single result with some missing fields', () => {
      const results = {
        hits: {
          hits: [
            {
              _source: {
                cluster_uuid: 'FlV4ckTxQ0a78hmBkzzc9A',
                beats_stats: {
                  metrics: { libbeat: { output: { type: 'elasticsearch' } } }, // missing events published
                  beat: { type: 'cowbeat' }, // missing version and host
                },
              },
            },
          ],
        },
      };
      const clusters = {};
      const clusterHostMaps = {};
      processResults(results, clusters, clusterHostMaps);

      expect(clusters).to.eql({
        FlV4ckTxQ0a78hmBkzzc9A: {
          count: 1,
          versions: {},
          types: { cowbeat: 1 },
          outputs: { elasticsearch: 1 },
          eventsPublished: 0,
          hosts: 0,
        },
      });
    });

    it('should summarize stats from hits across multiple result objects', () => {
      const clusters = {};
      const clusterHostMaps = {};

      // beatsStatsResultSet is an array of many small query results
      beatsStatsResultSet.forEach(results => {
        processResults(results, clusters, clusterHostMaps);
      });

      expect(clusters).to.eql({
        W7hppdX7R229Oy3KQbZrTw: {
          count: 5,
          versions: { '7.0.0-alpha1': 5 },
          types: { metricbeat: 1, filebeat: 4 },
          outputs: { elasticsearch: 5 },
          eventsPublished: 80875,
          hosts: 1,
        },
        FlV4ckTxQ0a78hmBkzzc9A: {
          count: 405,
          versions: { '7.0.0-alpha1': 405 },
          types: {
            filebeat: 200,
            metricbeat: 100,
            heartbeat: 100,
            winlogbeat: 1,
            duckbeat: 1,
            'apm-server': 1,
            sheepbeat: 1,
            cowbeat: 1,
          },
          outputs: { elasticsearch: 405 },
          eventsPublished: 723985,
          hosts: 1,
        },
      });
    });
  });
});
@@ -9,11 +9,13 @@ import {
   KIBANA_SYSTEM_ID,
   LOGSTASH_SYSTEM_ID,
   REPORTING_SYSTEM_ID,
+  BEATS_SYSTEM_ID,
 } from '../../../../common/constants';
 import { getClusterUuids } from './get_cluster_uuids';
 import { getElasticsearchStats } from './get_es_stats';
 import { getKibanaStats } from './get_kibana_stats';
 import { getReportingStats } from './get_reporting_stats';
+import { getBeatsStats } from './get_beats_stats';
 import { getHighLevelStats } from './get_high_level_stats';
 
 /**

@@ -68,8 +70,9 @@ function getAllStatsWithCaller(server, callCluster, start, end) {
       getKibanaStats(server, callCluster, clusterUuids, start, end), // stack_stats.kibana
       getHighLevelStats(server, callCluster, clusterUuids, start, end, LOGSTASH_SYSTEM_ID), // stack_stats.logstash
       getReportingStats(server, callCluster, clusterUuids, start, end), // stack_stats.xpack.reporting
+      getBeatsStats(server, callCluster, clusterUuids, start, end), // stack_stats.beats
     ])
-      .then(([esClusters, kibana, logstash, reporting]) => handleAllStats(esClusters, { kibana, logstash, reporting }));
+      .then(([esClusters, kibana, logstash, reporting, beats]) => handleAllStats(esClusters, { kibana, logstash, reporting, beats }));
   });
 }
 
@@ -82,12 +85,12 @@ function getAllStatsWithCaller(server, callCluster, start, end) {
  * @param {Object} logstash The Logstash nodes keyed by Cluster UUID
  * @return {Array} The clusters joined with the Kibana and Logstash instances under each cluster's {@code stack_stats}.
  */
-export function handleAllStats(clusters, { kibana, logstash, reporting }) {
+export function handleAllStats(clusters, { kibana, logstash, reporting, beats }) {
   return clusters.map(cluster => {
     // if they are using Kibana or Logstash, then add it to the cluster details under cluster.stack_stats
     addStackStats(cluster, kibana, KIBANA_SYSTEM_ID);
     addStackStats(cluster, logstash, LOGSTASH_SYSTEM_ID);
+    addStackStats(cluster, beats, BEATS_SYSTEM_ID);
     addXPackStats(cluster, reporting, REPORTING_SYSTEM_ID);
     mergeXPackStats(cluster, kibana, 'graph_workspace', 'graph'); // copy graph_workspace info out of kibana, merge it into stack_stats.xpack.graph
 
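The .then change above works because Promise.all resolves to results in the same order as its input array, so appending getBeatsStats to the array requires a fifth name in the matching position of the destructuring. A minimal standalone sketch of the pattern, with stand-in fetchers (hypothetical names, not from this commit):

```js
// Promise.all preserves input order, so positional destructuring
// pairs each resolved value with the fetcher that produced it.
const fetchEs = () => Promise.resolve([{ cluster_uuid: 'a' }]); // stand-in fetcher
const fetchKibana = () => Promise.resolve({ a: { count: 1 } }); // stand-in fetcher
const fetchBeats = () => Promise.resolve({ a: { count: 2 } });  // stand-in fetcher

Promise.all([fetchEs(), fetchKibana(), fetchBeats()])
  .then(([esClusters, kibana, beats]) => {
    // adding a fetcher to the array means adding a name here, in the same position
    console.log(esClusters, kibana, beats);
  });
```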
@@ -0,0 +1,149 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import { get } from 'lodash';
import { createQuery } from './create_query';

const HITS_SIZE = 10000; // maximum hits to receive from ES with each search

/*
 * Fetch Beats stats hits from Elasticsearch, one page of up to HITS_SIZE hits at a time.
 * @param {Object} server - The server instance
 * @param {function} callCluster - The callWithRequest or callWithInternalUser handler
 * @param {Array} clusterUuids - The string Cluster UUIDs to fetch details for
 * @param {Date} start - Start time to limit the stats
 * @param {Date} end - End time to limit the stats
 * @param {Number} page - Zero-based page of hits to fetch from ES
 * @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
 * @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
 * @return {Promise}
 */
export async function fetchBeatsStats(
  server, callCluster, clusterUuids, start, end,
  { page = 0, clusters, clusterHostMaps } = {}
) {
  const config = server.config();

  const params = {
    index: config.get('xpack.monitoring.beats.index_pattern'),
    ignoreUnavailable: true,
    filterPath: [
      'hits.hits._source.cluster_uuid',
      'hits.hits._source.beats_stats.beat.version',
      'hits.hits._source.beats_stats.beat.type',
      'hits.hits._source.beats_stats.beat.host',
      'hits.hits._source.beats_stats.metrics.libbeat.pipeline.events.published',
      'hits.hits._source.beats_stats.metrics.libbeat.output.type',
    ],
    body: {
      query: createQuery({
        start,
        end,
        type: 'beats_stats',
        filters: [
          { terms: { cluster_uuid: clusterUuids } },
          { bool: { must_not: { term: { 'beats_stats.beat.type': 'apm-server' } } } },
        ],
      }),
      collapse: { field: 'beats_stats.beat.uuid' }, // one hit per beat: the latest, given the sort below
      sort: [{ 'beats_stats.timestamp': 'desc' }],
      from: page * HITS_SIZE,
      size: HITS_SIZE,
    },
  };

  const results = await callCluster('search', params);
  const hitsLength = get(results, 'hits.hits.length', 0);
  if (hitsLength > 0) {
    // further augment the clusters object with more stats
    processResults(results, clusters, clusterHostMaps);

    if (hitsLength === HITS_SIZE) {
      // a full page may mean more hits remain: recurse into the next page
      const nextOptions = {
        clusters,
        clusterHostMaps,
        page: page + 1,
      };

      // return the promise so the caller stays blocked until the entire clusters object is built
      return fetchBeatsStats(server, callCluster, clusterUuids, start, end, nextOptions);
    }
  }

  return Promise.resolve();
}
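The function above walks results with offset paging: each search asks for HITS_SIZE hits at `from = page * HITS_SIZE` and recurses while full pages keep coming back. A minimal sketch of the same loop, assuming only a generic async `search({ from, size })` helper (hypothetical, not part of this commit):

```js
// Sketch of the from/size paging pattern used by fetchBeatsStats.
// `search` is a hypothetical client: ({ from, size }) => Promise<Array>.
const PAGE_SIZE = 10000;

async function fetchAllPages(search, onPage, page = 0) {
  const hits = await search({ from: page * PAGE_SIZE, size: PAGE_SIZE });

  if (hits.length > 0) {
    onPage(hits); // process each payload as it arrives

    if (hits.length === PAGE_SIZE) {
      // a full page may mean more results remain; recurse to the next offset
      return fetchAllPages(search, onPage, page + 1);
    }
  }
}
```

Note that offset paging in Elasticsearch is bounded by index.max_result_window (10,000 by default), so paging past the first 10,000 hits generally requires raising that setting or switching to search_after or scroll.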
const getBaseStats = () => ({
  count: 0,
  versions: {},
  types: {},
  outputs: {},
  eventsPublished: 0,
  hosts: 0,
});

/*
 * Update a clusters object with processed beat stats
 * @param {Object} results - The ES search response whose hits are Beats stats docs
 * @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
 * @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
 */
export function processResults(results = {}, clusters, clusterHostMaps) {
  const currHits = get(results, 'hits.hits', []);
  currHits.forEach(hit => {
    const clusterUuid = get(hit, '_source.cluster_uuid');
    if (clusters[clusterUuid] === undefined) {
      clusters[clusterUuid] = getBaseStats();
      clusterHostMaps[clusterUuid] = new Map();
    }

    clusters[clusterUuid].count += 1;

    const { versions, types, outputs } = clusters[clusterUuid];

    const thisVersion = get(hit, '_source.beats_stats.beat.version');
    if (thisVersion !== undefined) {
      const thisVersionAccum = versions[thisVersion] || 0;
      versions[thisVersion] = thisVersionAccum + 1;
    }

    const thisType = get(hit, '_source.beats_stats.beat.type');
    if (thisType !== undefined) {
      const thisTypeAccum = types[thisType] || 0;
      types[thisType] = thisTypeAccum + 1;
    }

    const thisOutput = get(hit, '_source.beats_stats.metrics.libbeat.output.type');
    if (thisOutput !== undefined) {
      const thisOutputAccum = outputs[thisOutput] || 0;
      outputs[thisOutput] = thisOutputAccum + 1;
    }

    const thisEvents = get(hit, '_source.beats_stats.metrics.libbeat.pipeline.events.published');
    if (thisEvents !== undefined) {
      clusters[clusterUuid].eventsPublished += thisEvents;
    }

    const thisHost = get(hit, '_source.beats_stats.beat.host');
    if (thisHost !== undefined) {
      const hostsMap = clusterHostMaps[clusterUuid];
      hostsMap.set(thisHost, thisHost); // values don't matter, as this data structure is not part of the output
      clusters[clusterUuid].hosts = hostsMap.size;
    }
  });
}
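The clusterHostMaps structure stores each host as both key and value purely to count unique hosts; since only membership matters, a Set expresses the same idea. A sketch of that variant (an illustration, not what this commit ships):

```js
// Equivalent unique-host counting with a Set: only membership matters,
// so nothing needs to be stored per host beyond the host itself.
const clusterHostSets = {};

function countHost(clusterUuid, host) {
  const hosts = clusterHostSets[clusterUuid] || (clusterHostSets[clusterUuid] = new Set());
  hosts.add(host);
  return hosts.size; // assign to clusters[clusterUuid].hosts
}
```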
/*
 * Fetch and summarize Beats stats
 * @return {Object} - Beats stats in an object keyed by the cluster UUIDs
 */
export async function getBeatsStats(server, callCluster, clusterUuids, start, end) {
  const clusters = {};
  const clusterHostMaps = {};
  await fetchBeatsStats(server, callCluster, clusterUuids, start, end, { clusters, clusterHostMaps });
  return clusters;
}