[Stack Monitoring] Add metricbeat errors to Health API response (#137288)

* Add metricbeat errors to Health API response

* Fix unit test

* Add integration test scenario

* Small fix

* Small fixes and improving integration test data

* Small refactor of fetchMetricbeatErrors

* Add logging

* Unload metricbeat archive after test finishes up

* Fix data_stream setup function

* Remove manual metricbeat data stream deletion in test teardown in favor of archiver unload

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Carlos Crespo 2022-08-03 14:29:35 +02:00 committed by GitHub
parent adddf89b38
commit 62ce378b25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 35545 additions and 71 deletions

View file

@ -5,14 +5,16 @@
* 2.0.
*/
import { LegacyRequest, MonitoringCore } from '../../../../types';
import { MonitoringConfig } from '../../../../config';
import type { LegacyRequest, MonitoringCore } from '../../../../types';
import type { MonitoringConfig } from '../../../../config';
import { createValidationFunction } from '../../../../lib/create_route_validation_function';
import { getHealthRequestQueryRT } from '../../../../../common/http_api/_health';
import { TimeRange } from '../../../../../common/http_api/shared';
import type { TimeRange } from '../../../../../common/http_api/shared';
import { INDEX_PATTERN, INDEX_PATTERN_ENTERPRISE_SEARCH } from '../../../../../common/constants';
import { fetchMonitoredClusters } from './monitored_clusters';
import { fetchMetricbeatErrors } from './metricbeat';
import type { FetchParameters } from './types';
const DEFAULT_QUERY_TIMERANGE = { min: 'now-15m', max: 'now' };
const DEFAULT_QUERY_TIMEOUT_SECONDS = 15;
@ -44,19 +46,38 @@ export function registerV1HealthRoute(server: MonitoringCore) {
const settings = extractSettings(server.config);
const monitoredClusters = await fetchMonitoredClusters({
const fetchArgs: FetchParameters = {
timeout,
timeRange,
monitoringIndex: withCCS(INDEX_PATTERN),
entSearchIndex: withCCS(INDEX_PATTERN_ENTERPRISE_SEARCH),
search: (params: any) => callWithRequest(req, 'search', params),
logger,
}).catch((err: Error) => {
logger.error(`_health: failed to retrieve monitored clusters:\n${err.stack}`);
return { error: err.message };
});
};
return { monitoredClusters, settings };
const monitoredClustersFn = () =>
fetchMonitoredClusters({
...fetchArgs,
monitoringIndex: withCCS(INDEX_PATTERN),
entSearchIndex: withCCS(INDEX_PATTERN_ENTERPRISE_SEARCH),
}).catch((err: Error) => {
logger.error(`_health: failed to retrieve monitored clusters:\n${err.stack}`);
return { error: err.message };
});
const metricbeatErrorsFn = () =>
fetchMetricbeatErrors({
...fetchArgs,
metricbeatIndex: server.config.ui.metricbeat.index,
}).catch((err: Error) => {
logger.error(`_health: failed to retrieve metricbeat data:\n${err.stack}`);
return { error: err.message };
});
const [monitoredClusters, metricbeatErrors] = await Promise.all([
monitoredClustersFn(),
metricbeatErrorsFn(),
]);
return { monitoredClusters, metricbeatErrors, settings };
},
});
}

View file

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { buildMetricbeatErrors } from './build_metricbeat_errors';
import assert from 'assert';
// Unit test for buildMetricbeatErrors: feeds a raw terms-aggregation result
// (product -> dataset -> latest error docs) and verifies the normalized output.
describe(__filename, () => {
  describe('buildMetricbeatErrors', () => {
    test('it should build an object containing dedup error messages per event.dataset', () => {
      // One 'beat' module bucket with a single 'state' dataset; the first two
      // docs share the same message and should collapse into a single entry.
      const metricbeatErrors = [
        {
          key: 'beat',
          errors_by_dataset: {
            buckets: [
              {
                key: 'state',
                latest_docs: {
                  hits: {
                    hits: [
                      {
                        _source: {
                          '@timestamp': '2022-07-26T08:43:32.625Z',
                          error: {
                            message:
                              'error making http request: Get "http://host.docker.internal:5067/state": dial tcp 192.168.65.2:5067: connect: connection refused',
                          },
                        },
                      },
                      {
                        _source: {
                          '@timestamp': '2022-07-26T08:42:32.625Z',
                          error: {
                            message:
                              'error making http request: Get "http://host.docker.internal:5067/state": dial tcp 192.168.65.2:5067: connect: connection refused',
                          },
                        },
                      },
                      {
                        _source: {
                          '@timestamp': '2022-07-26T08:41:32.625Z',
                          error: {
                            message: 'Generic random error',
                          },
                        },
                      },
                    ],
                  },
                },
              },
            ],
          },
        },
      ];
      const monitoredClusters = buildMetricbeatErrors(metricbeatErrors);
      // Duplicate messages are removed; each entry keeps the timestamp of its
      // first occurrence in the hit list (docs arrive sorted newest-first).
      assert.deepEqual(monitoredClusters, {
        beat: {
          state: [
            {
              lastSeen: '2022-07-26T08:43:32.625Z',
              message:
                'error making http request: Get "http://host.docker.internal:5067/state": dial tcp 192.168.65.2:5067: connect: connection refused',
            },
            {
              lastSeen: '2022-07-26T08:41:32.625Z',
              message: 'Generic random error',
            },
          ],
        },
      });
    });
  });
});

View file

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MetricbeatMonitoredProduct } from '../types';
export type MetricbeatProducts = {
  [product in MetricbeatMonitoredProduct]?: ErrorsByMetricset;
};

interface ErrorsByMetricset {
  [dataset: string]: ErrorDetails[];
}

interface ErrorDetails {
  message: string;
  lastSeen: string;
}

/**
 * Normalizes the metricbeat error aggregation buckets into a
 * product -> metricset map of deduplicated error messages, eg
 * {
 *   "logstash": {
 *     "node": [
 *       {
 *         "message": "some error message",
 *         "lastSeen": "2022-05-17T16:56:52.929Z"
 *       }
 *     ]
 *   }
 * }
 * Products and metricsets without any error document are omitted.
 */
export const buildMetricbeatErrors = (modulesBucket: any[]): MetricbeatProducts => {
  const products: MetricbeatProducts = {};
  for (const { key, errors_by_dataset: errorsByDataset } of modulesBucket ?? []) {
    const datasets = buildMetricsets(errorsByDataset.buckets);
    if (Object.keys(datasets).length > 0) {
      (products as Record<string, ErrorsByMetricset>)[key] = datasets;
    }
  }
  return products;
};

// Maps each dataset bucket to its deduplicated error list; empty datasets are dropped.
const buildMetricsets = (errorsByDataset: any[]): ErrorsByMetricset => {
  const datasets: ErrorsByMetricset = {};
  for (const { key, latest_docs: latestDocs } of errorsByDataset ?? []) {
    const errors = buildErrorMessages(latestDocs.hits.hits ?? []);
    if (errors.length > 0) {
      datasets[key] = errors;
    }
  }
  return datasets;
};

// Reads the error message out of a raw search hit (may be undefined).
const getErrorMessage = (doc: any) => doc._source?.error?.message;

// Keeps only the first occurrence of each distinct error message (hits are
// expected newest-first, so that occurrence carries the latest timestamp).
const buildErrorMessages = (errorDocs: any[]): ErrorDetails[] => {
  const seenMessages = new Set<string>();
  const details: ErrorDetails[] = [];
  for (const doc of errorDocs) {
    const message = getErrorMessage(doc);
    if (!seenMessages.has(message)) {
      seenMessages.add(message);
      details.push({
        message,
        lastSeen: doc._source['@timestamp'],
      });
    }
  }
  return details;
};

View file

@ -0,0 +1,177 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import assert from 'assert';
import sinon from 'sinon';
import type { Logger } from '@kbn/core/server';
import { fetchMetricbeatErrors } from './fetch_metricbeat_errors';
// Minimal Logger stub — only warn/error are exercised by fetchMetricbeatErrors.
const getMockLogger = (): Logger => {
  const stub = {
    warn: sinon.spy(),
    error: sinon.spy(),
  };
  return stub as unknown as Logger;
};
// Unit test for fetchMetricbeatErrors: stubs the search function with a canned
// aggregation response and verifies the normalized payload (products + execution).
describe(__filename, () => {
  describe('fetchMetricbeatErrors', () => {
    test('it fetch and build metricbeat errors response', async () => {
      // Canned ES response: one 'logstash' module bucket with two datasets
      // ('node' and 'node_stats'), each holding duplicated error documents.
      const response = {
        aggregations: {
          errors_aggregation: {
            doc_count_error_upper_bound: 0,
            sum_other_doc_count: 0,
            buckets: [
              {
                key: 'logstash',
                doc_count: 180,
                errors_by_dataset: {
                  doc_count_error_upper_bound: 0,
                  sum_other_doc_count: 0,
                  buckets: [
                    {
                      key: 'node',
                      doc_count: 90,
                      latest_docs: {
                        hits: {
                          total: {
                            value: 90,
                            relation: 'eq',
                          },
                          max_score: null,
                          hits: [
                            {
                              _index: '.ds-metricbeat-8.4.0-2022.07.27-000001',
                              _id: 'djXRP4IBU_ii2T3qLwey',
                              _score: null,
                              _source: {
                                '@timestamp': '2022-07-27T13:20:49.070Z',
                                metricset: {
                                  period: 10000,
                                  name: 'node',
                                },
                                error: {
                                  message:
                                    'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
                                },
                              },
                              sort: [1658928049070],
                            },
                            {
                              _index: '.ds-metricbeat-8.4.0-2022.07.27-000001',
                              _id: 'UTXRP4IBU_ii2T3qCAef',
                              _score: null,
                              _source: {
                                '@timestamp': '2022-07-27T13:20:39.066Z',
                                metricset: {
                                  period: 10000,
                                  name: 'node',
                                },
                                error: {
                                  message:
                                    'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
                                },
                              },
                              sort: [1658928039066],
                            },
                          ],
                        },
                      },
                    },
                    {
                      key: 'node_stats',
                      doc_count: 90,
                      latest_docs: {
                        hits: {
                          total: {
                            value: 90,
                            relation: 'eq',
                          },
                          max_score: null,
                          hits: [
                            {
                              _index: '.ds-metricbeat-8.4.0-2022.07.27-000001',
                              _id: 'eTXRP4IBU_ii2T3qLwey',
                              _score: null,
                              _source: {
                                '@timestamp': '2022-07-27T13:20:49.580Z',
                                metricset: {
                                  period: 10000,
                                  name: 'node_stats',
                                },
                                error: {
                                  message:
                                    'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
                                },
                              },
                              sort: [1658928049580],
                            },
                            {
                              _index: '.ds-metricbeat-8.4.0-2022.07.27-000001',
                              _id: 'VDXRP4IBU_ii2T3qCAef',
                              _score: null,
                              _source: {
                                '@timestamp': '2022-07-27T13:20:39.582Z',
                                metricset: {
                                  period: 10000,
                                  name: 'node_stats',
                                },
                                error: {
                                  message:
                                    'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
                                },
                              },
                              sort: [1658928039582],
                            },
                          ],
                        },
                      },
                    },
                  ],
                },
              },
            ],
          },
        },
      };
      const searchFn = jest.fn().mockResolvedValueOnce(response);
      const monitoredClusters = await fetchMetricbeatErrors({
        timeout: 10,
        metricbeatIndex: 'foo',
        timeRange: { min: 1652979091217, max: 11652979091217 },
        search: searchFn,
        logger: getMockLogger(),
      });
      // Duplicate messages within each dataset are collapsed to the newest one,
      // and a successful fetch reports no execution errors.
      assert.deepEqual(monitoredClusters, {
        execution: {
          timedOut: false,
          errors: [],
        },
        products: {
          logstash: {
            node: [
              {
                lastSeen: '2022-07-27T13:20:49.070Z',
                message:
                  'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
              },
            ],
            node_stats: [
              {
                lastSeen: '2022-07-27T13:20:49.580Z',
                message:
                  'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
              },
            ],
          },
        },
      });
    });
  });
});

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FetchParameters, FetchExecution, MonitoredProduct } from '../types';
import type { MetricbeatProducts } from './build_metricbeat_errors';
import { metricbeatErrorsQuery } from './metricbeat_errors_query';
import { buildMetricbeatErrors } from './build_metricbeat_errors';
interface MetricbeatResponse {
products?: MetricbeatProducts;
execution: FetchExecution;
}
export const fetchMetricbeatErrors = async ({
timeout,
metricbeatIndex,
timeRange,
search,
logger,
}: FetchParameters & {
metricbeatIndex: string;
}): Promise<MetricbeatResponse> => {
const getMetricbeatErrors = async () => {
const { aggregations, timed_out: timedOut } = await search({
index: metricbeatIndex,
body: metricbeatErrorsQuery({
timeRange,
timeout,
products: [
MonitoredProduct.Beats,
MonitoredProduct.Elasticsearch,
MonitoredProduct.EnterpriseSearch,
MonitoredProduct.Kibana,
MonitoredProduct.Logstash,
],
}),
size: 0,
ignore_unavailable: true,
});
const buckets = aggregations?.errors_aggregation?.buckets ?? [];
return { products: buildMetricbeatErrors(buckets), timedOut: Boolean(timedOut) };
};
try {
const { products, timedOut } = await getMetricbeatErrors();
return {
products,
execution: {
timedOut,
errors: [],
},
};
} catch (err) {
logger.error(`fetchMetricbeatErrors: failed to fetch:\n${err.stack}`);
return {
execution: {
timedOut: false,
errors: [err.message],
},
};
}
};

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { fetchMetricbeatErrors } from './fetch_metricbeat_errors';

View file

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MetricbeatMonitoredProduct, QueryOptions } from '../types';
// Upper bound on the number of error documents retained per metricset bucket.
const MAX_BUCKET_SIZE = 50;

/**
 * Returns a nested aggregation of error messages per event.datasets.
 * Each module (beats, kibana...) can contain one or multiple metricsets with error messages
 */
interface MetricbeatErrorsQueryOptions extends QueryOptions {
  products: MetricbeatMonitoredProduct[];
}

/**
 * Builds the search body used to collect metricbeat error messages.
 *
 * @param timeRange window to inspect (required — throws when missing)
 * @param timeout search timeout, in seconds
 * @param products `event.module` values to restrict the search to
 * @throws Error when `timeRange` is not provided
 */
export const metricbeatErrorsQuery = ({
  timeRange,
  timeout,
  products,
}: MetricbeatErrorsQueryOptions) => {
  if (!timeRange) throw new Error('metricbeatErrorsQuery: missing timeRange parameter');
  return {
    timeout: `${timeout}s`,
    query: {
      bool: {
        filter: {
          bool: {
            must: [
              {
                exists: {
                  field: 'error.message',
                },
              },
              {
                // `products` is already a plain array of module names; the
                // previous Object.values(products) wrapper was a no-op.
                terms: {
                  'event.module': products,
                },
              },
              {
                range: {
                  // Fix: metricbeat documents are timestamped on `@timestamp`
                  // (the top_hits sort and _source selection below already use
                  // it); the previous filter on `timestamp` targeted a field
                  // these documents don't carry.
                  '@timestamp': {
                    gte: timeRange.min,
                    lte: timeRange.max,
                  },
                },
              },
            ],
          },
        },
      },
    },
    aggs: {
      errors_aggregation: errorsAggregation,
    },
  };
};

// Per-metricset bucket keeping the most recent error documents, newest first.
const errorsByMetricset = {
  terms: {
    field: 'metricset.name',
  },
  aggs: {
    latest_docs: {
      top_hits: {
        sort: [
          {
            '@timestamp': {
              order: 'desc',
            },
          },
        ],
        size: MAX_BUCKET_SIZE,
        _source: {
          includes: ['@timestamp', 'error', 'metricset'],
        },
      },
    },
  },
};

// Top-level bucketing by monitored product (event.module), one sub-aggregation
// per dataset.
const errorsAggregation = {
  terms: {
    field: 'event.module',
  },
  aggs: {
    errors_by_dataset: errorsByMetricset,
  },
};

View file

@ -7,6 +7,7 @@
import type { Logger } from '@kbn/core/server';
import { isEmpty, mapValues, merge, omitBy, reduce } from 'lodash';
import { MonitoredProduct } from '../types';
enum CollectionMode {
Internal = 'internal-monitoring',
@ -15,15 +16,6 @@ enum CollectionMode {
Unknown = 'unknown',
}
enum MonitoredProduct {
Cluster = 'cluster',
Elasticsearch = 'elasticsearch',
Kibana = 'kibana',
Beats = 'beats',
Logstash = 'logstash',
EnterpriseSearch = 'enterpriseSearch',
}
interface MonitoredMetricsets {
[metricset: string]: {
[collectionMode in CollectionMode]: {

View file

@ -5,27 +5,19 @@
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import { merge } from 'lodash';
import { ElasticsearchResponse } from '../../../../../../common/types/es';
import { TimeRange } from '../../../../../../common/http_api/shared';
import { buildMonitoredClusters, MonitoredClusters } from './build_monitored_clusters';
import {
monitoredClustersQuery,
persistentMetricsetsQuery,
enterpriseSearchQuery,
} from './monitored_clusters_query';
type SearchFn = (params: any) => Promise<ElasticsearchResponse>;
import type { FetchParameters, FetchExecution } from '../types';
interface MonitoredClustersResponse {
clusters?: MonitoredClusters;
execution: {
timedOut?: boolean;
errors?: string[];
};
execution: FetchExecution;
}
/**
@ -40,13 +32,9 @@ export const fetchMonitoredClusters = async ({
timeRange,
search,
logger,
}: {
timeout: number;
timeRange: TimeRange;
}: FetchParameters & {
monitoringIndex: string;
entSearchIndex: string;
search: SearchFn;
logger: Logger;
}): Promise<MonitoredClustersResponse> => {
const getMonitoredClusters = async (
index: string,

View file

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import type { TimeRange } from '../../../../../common/http_api/shared';
import type { ElasticsearchResponse } from '../../../../../common/types/es';
/**
 * Products tracked by the monitoring _health checks. The enum values are the
 * identifiers used in the collected documents (the metricbeat errors query
 * matches them against `event.module`) — note Beats maps to the singular 'beat'.
 */
export enum MonitoredProduct {
  Cluster = 'cluster',
  Elasticsearch = 'elasticsearch',
  Kibana = 'kibana',
  Beats = 'beat',
  Logstash = 'logstash',
  EnterpriseSearch = 'enterpriseSearch',
}

// Products metricbeat collects for; `cluster` is not a metricbeat module.
export type MetricbeatMonitoredProduct = Exclude<MonitoredProduct, MonitoredProduct.Cluster>;

// Search function bound to the current request's Elasticsearch client.
export type SearchFn = (params: any) => Promise<ElasticsearchResponse>;

// Common options for the _health query builders.
export interface QueryOptions {
  timeRange?: TimeRange;
  timeout: number; // in seconds
}

// Arguments shared by the _health fetchers (fetchMonitoredClusters, fetchMetricbeatErrors).
export interface FetchParameters {
  timeout: number; // search timeout, in seconds
  timeRange: TimeRange;
  search: SearchFn;
  logger: Logger;
}

// Execution metadata returned alongside each fetcher's results.
export interface FetchExecution {
  timedOut?: boolean;
  errors?: string[];
}

View file

@ -5,5 +5,12 @@
"timedOut": false,
"errors": []
}
},
"metricbeatErrors": {
"execution": {
"errors": [],
"timedOut": false
},
"products": {}
}
}
}

View file

@ -82,5 +82,45 @@ export const esBeatsResponse = (date = moment().format('YYYY.MM.DD')) => {
errors: [],
},
},
metricbeatErrors: {
execution: {
errors: [],
timedOut: false,
},
products: {
beat: {
stats: [
{
lastSeen: '2022-05-23T22:11:52.985Z',
message:
'error making http request: Get "http://host.docker.internal:5067/stats": dial tcp 192.168.65.2:5067: connect: connection refused',
},
],
state: [
{
lastSeen: '2022-05-23T22:11:51.083Z',
message:
'error making http request: Get "http://host.docker.internal:5067/state": dial tcp 192.168.65.2:5067: connect: connection refused',
},
],
},
logstash: {
node: [
{
lastSeen: '2022-05-23T22:11:54.563Z',
message:
'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
},
],
node_stats: [
{
lastSeen: '2022-05-23T22:11:54.331Z',
message:
'error making http request: Get "http://host.docker.internal:9600/": dial tcp 192.168.65.2:9600: connect: connection refused',
},
],
},
},
},
};
};

View file

@ -13,6 +13,8 @@ import { getLifecycleMethods } from '../data_stream';
import emptyResponse from './fixtures/response_empty.json';
import { esBeatsResponse } from './fixtures/response_es_beats';
const METRICBEAT_ARCHIVE =
'x-pack/test/api_integration/apis/monitoring/es_archives/_health/metricbeat_8';
export default function ({ getService }) {
const supertest = getService('supertest');
@ -38,15 +40,16 @@ export default function ({ getService }) {
const archives = [
'x-pack/test/api_integration/apis/monitoring/es_archives/_health/monitoring_es_8',
'x-pack/test/api_integration/apis/monitoring/es_archives/_health/monitoring_beats_8',
METRICBEAT_ARCHIVE,
];
const { setup, tearDown } = getLifecycleMethods(getService);
before('load archive', () => {
return Promise.all(archives.map(setup));
return setup(archives);
});
after('unload archive', () => {
return tearDown();
return tearDown([METRICBEAT_ARCHIVE]);
});
it('returns the state of the monitoring documents', async () => {

View file

@ -1,33 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// Shared es_archiver lifecycle helpers for the monitoring API integration tests.
// (This is the pre-refactor JS version; the commit replaces it with data_stream.ts.)
export const getLifecycleMethods = (getService) => {
  const esArchiver = getService('esArchiver');
  const client = getService('es');

  // Data streams must be removed through the _data_stream API; `ignore: [404]`
  // makes the call a no-op when the stream was never created.
  const deleteDataStream = async (index) => {
    await client.transport.request(
      {
        method: 'DELETE',
        path: `_data_stream/${index}`,
      },
      {
        ignore: [404],
      }
    );
  };

  return {
    // Loads a single archive into the cluster.
    async setup(archive) {
      await esArchiver.load(archive, { useCreate: true });
    },
    // Drops all monitoring data streams created during the test run.
    async tearDown() {
      await deleteDataStream('.monitoring-*');
    },
  };
};

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FtrProviderContext } from '../../ftr_provider_context';
/**
 * Shared es_archiver lifecycle helpers for the monitoring API integration
 * tests. Accepts a single archive path or a list in both setup and teardown.
 */
export const getLifecycleMethods = (getService: FtrProviderContext['getService']) => {
  const esArchiver = getService('esArchiver');
  const client = getService('es');

  // Data streams must be removed through the _data_stream API; `ignore: [404]`
  // makes the call a no-op when nothing was created.
  const deleteDataStream = async (index: string) => {
    await client.transport.request(
      { method: 'DELETE', path: `_data_stream/${index}` },
      { ignore: [404] }
    );
  };

  const toArray = (archives: string[] | string): string[] =>
    Array.isArray(archives) ? archives : [archives];

  return {
    // Loads the given archive(s) concurrently.
    async setup(archives: string[] | string) {
      await Promise.all(toArray(archives).map((archive) => esArchiver.load(archive, { useCreate: true })));
    },
    async tearDown(archives: string[] | string = []) {
      await Promise.all(toArray(archives).map((archive) => esArchiver.unload(archive)));
      // Monitoring mappings are already installed by elasticsearch
      // the archiver doesn't have any reference to the template and can't automatically delete it.
      // that's why we manually delete the data stream here to clean up the environment
      await deleteDataStream('.monitoring-*');
    },
  };
};