[8.0] Stack monitoring - logstash 8.0 metricbeat standalone compatibility (#122177) (#122345)

* Stack monitoring - logstash 8.0 metricbeat standalone compatibility (#122177)

* use node_stats metricset filter

* fix pipeline dropdown redirection

* add get_pipeline metricbeat aggregation

* add get_pipeline_vertex metricbeat aggregation

* update logstash metricbeat archived data

* merge_pipeline_versions

* type parameter

* use datastream for mb functional tests

* fix metricbeat apis tests

* lint

* lint

* lint

* fix multicluster api test

* lint

* fix singlecluster_green_gold_mb tests

* lint

* add test case

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
# Conflicts:
#	x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.ts

* remove bluebird import
Kevin Lacabane 2022-01-05 18:07:22 +01:00 committed by GitHub
parent e50b966277
commit e65414950d
38 changed files with 573 additions and 589124 deletions

@@ -586,6 +586,17 @@ export interface ElasticsearchMetricbeatSource {
};
logstash?: {
node?: {
state?: {
pipeline?: {
id: string;
name: string;
representation?: {
graph?: {
vertices: ElasticsearchSourceLogstashPipelineVertex[];
};
};
};
};
stats?: {
timestamp?: string;
logstash?: {
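
For context on the shape added here: the new logstash.node.state branch mirrors the document layout shipped by standalone metricbeat, next to the legacy top-level logstash_state field. A minimal sketch of resolving the pipeline graph from either shape, using the same fallback order as the enrichment functions later in this commit (the sample document is hypothetical):

// Sketch: the metricbeat-shaped document nests state under logstash.node.state.
const stateDocument: any = {
  logstash: {
    node: {
      state: {
        pipeline: {
          id: 'main',
          name: 'main',
          representation: { graph: { vertices: [{ id: 'input-1' }] } },
        },
      },
    },
  },
};

// Same fallback order used by _enrichStateWithStatsAggregation below.
const logstashState = stateDocument.logstash_state || stateDocument.logstash?.node?.state;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];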

@@ -142,11 +142,12 @@ export const LogStashPipelinePage: React.FC<ComponentProps> = ({ clusters }) =>
}
}, [detailVertexId, getPageData]);
const onChangePipelineHash = useCallback(() => {
window.location.hash = getSafeForExternalLink(
`#/logstash/pipelines/${pipelineId}/${pipelineHash}`
);
}, [pipelineId, pipelineHash]);
const onChangePipelineHash = useCallback(
(hash) => {
window.location.hash = getSafeForExternalLink(`#/logstash/pipelines/${pipelineId}/${hash}`);
},
[pipelineId]
);
useEffect(() => {
if (cluster) {

@@ -12,7 +12,7 @@ import React from 'react';
interface Props {
pipelineVersions: any[];
pipelineHash?: string;
onChangePipelineHash: () => void;
onChangePipelineHash: (hash: string) => void;
}
export const PipelineVersions = (props: Props) => {
@@ -23,23 +23,25 @@ export const PipelineVersions = (props: Props) => {
<EuiFlexItem grow={false}>
<EuiSelect
value={pipelineHash}
options={pipelineVersions.map((option) => {
return {
text: i18n.translate(
'xpack.monitoring.logstashNavigation.pipelineVersionDescription',
{
defaultMessage:
'Version active {relativeLastSeen} and first seen {relativeFirstSeen}',
values: {
relativeLastSeen: option.relativeLastSeen,
relativeFirstSeen: option.relativeFirstSeen,
},
}
),
value: option.hash,
};
})}
onChange={onChangePipelineHash}
options={pipelineVersions.map(
(option: { hash: string; relativeLastSeen: number; relativeFirstSeen: number }) => {
return {
text: i18n.translate(
'xpack.monitoring.logstashNavigation.pipelineVersionDescription',
{
defaultMessage:
'Version active {relativeLastSeen} and first seen {relativeFirstSeen}',
values: {
relativeLastSeen: option.relativeLastSeen,
relativeFirstSeen: option.relativeFirstSeen,
},
}
),
value: option.hash,
};
}
)}
onChange={({ target }) => onChangePipelineHash(target.value)}
/>
</EuiFlexItem>
</EuiFlexGroup>
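
This is the dropdown redirection fix from the commit message: the old callback took no argument and rebuilt the URL from the pipelineHash already in scope, so selecting another version navigated back to the current one. A minimal self-contained sketch of the corrected wiring (component and prop names here are illustrative, not from the commit):

import React from 'react';
import { EuiSelect } from '@elastic/eui';

interface VersionSelectProps {
  pipelineHash?: string;
  options: Array<{ text: string; value: string }>;
  onChangePipelineHash: (hash: string) => void;
}

// The select reports the chosen hash; the page builds the route from it.
export const VersionSelect = ({ pipelineHash, options, onChangePipelineHash }: VersionSelectProps) => (
  <EuiSelect
    value={pipelineHash}
    options={options}
    onChange={({ target }) => onChangePipelineHash(target.value)}
  />
);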

@@ -5,7 +5,6 @@
* 2.0.
*/
import Bluebird from 'bluebird';
import { get } from 'lodash';
import { LegacyRequest, Cluster, Bucket } from '../../types';
import { LOGSTASH } from '../../../common/constants';
@@ -48,208 +47,210 @@ export function getLogstashForClusters(
const end = req.payload.timeRange.max;
const config = req.server.config();
return Bluebird.map(clusters, (cluster) => {
const clusterUuid = get(cluster, 'elasticsearch.cluster.id', cluster.cluster_uuid);
const params = {
index: lsIndexPattern,
size: 0,
ignore_unavailable: true,
body: {
query: createQuery({
types: ['stats', 'logstash_stats'],
start,
end,
clusterUuid,
metric: LogstashClusterMetric.getMetricFields(),
}),
aggs: {
logstash_uuids: {
terms: {
field: 'logstash_stats.logstash.uuid',
size: config.get('monitoring.ui.max_bucket_size'),
},
aggs: {
latest_report: {
terms: {
field: 'logstash_stats.timestamp',
size: 1,
order: {
_key: 'desc',
},
},
aggs: {
memory_used: {
max: {
field: 'logstash_stats.jvm.mem.heap_used_in_bytes',
return Promise.all(
clusters.map((cluster) => {
const clusterUuid = get(cluster, 'elasticsearch.cluster.id', cluster.cluster_uuid);
const params = {
index: lsIndexPattern,
size: 0,
ignore_unavailable: true,
body: {
query: createQuery({
types: ['node_stats', 'logstash_stats'],
start,
end,
clusterUuid,
metric: LogstashClusterMetric.getMetricFields(),
}),
aggs: {
logstash_uuids: {
terms: {
field: 'logstash_stats.logstash.uuid',
size: config.get('monitoring.ui.max_bucket_size'),
},
aggs: {
latest_report: {
terms: {
field: 'logstash_stats.timestamp',
size: 1,
order: {
_key: 'desc',
},
},
memory: {
max: {
field: 'logstash_stats.jvm.mem.heap_max_in_bytes',
aggs: {
memory_used: {
max: {
field: 'logstash_stats.jvm.mem.heap_used_in_bytes',
},
},
},
events_in_total: {
max: {
field: 'logstash_stats.events.in',
memory: {
max: {
field: 'logstash_stats.jvm.mem.heap_max_in_bytes',
},
},
},
events_out_total: {
max: {
field: 'logstash_stats.events.out',
events_in_total: {
max: {
field: 'logstash_stats.events.in',
},
},
events_out_total: {
max: {
field: 'logstash_stats.events.out',
},
},
},
},
},
memory_used_per_node: {
max_bucket: {
buckets_path: 'latest_report>memory_used',
memory_used_per_node: {
max_bucket: {
buckets_path: 'latest_report>memory_used',
},
},
},
memory_per_node: {
max_bucket: {
buckets_path: 'latest_report>memory',
memory_per_node: {
max_bucket: {
buckets_path: 'latest_report>memory',
},
},
},
events_in_total_per_node: {
max_bucket: {
buckets_path: 'latest_report>events_in_total',
events_in_total_per_node: {
max_bucket: {
buckets_path: 'latest_report>events_in_total',
},
},
},
events_out_total_per_node: {
max_bucket: {
buckets_path: 'latest_report>events_out_total',
events_out_total_per_node: {
max_bucket: {
buckets_path: 'latest_report>events_out_total',
},
},
},
},
},
logstash_versions: {
terms: {
field: 'logstash_stats.logstash.version',
size: config.get('monitoring.ui.max_bucket_size'),
},
},
pipelines_nested: {
nested: {
path: 'logstash_stats.pipelines',
},
aggs: {
pipelines: {
sum_bucket: {
buckets_path: 'queue_types>num_pipelines',
},
logstash_versions: {
terms: {
field: 'logstash_stats.logstash.version',
size: config.get('monitoring.ui.max_bucket_size'),
},
queue_types: {
terms: {
field: 'logstash_stats.pipelines.queue.type',
size: config.get('monitoring.ui.max_bucket_size'),
},
pipelines_nested: {
nested: {
path: 'logstash_stats.pipelines',
},
aggs: {
pipelines: {
sum_bucket: {
buckets_path: 'queue_types>num_pipelines',
},
},
aggs: {
num_pipelines: {
cardinality: {
field: 'logstash_stats.pipelines.id',
queue_types: {
terms: {
field: 'logstash_stats.pipelines.queue.type',
size: config.get('monitoring.ui.max_bucket_size'),
},
aggs: {
num_pipelines: {
cardinality: {
field: 'logstash_stats.pipelines.id',
},
},
},
},
},
},
},
pipelines_nested_mb: {
nested: {
path: 'logstash.node.stats.pipelines',
},
aggs: {
pipelines: {
sum_bucket: {
buckets_path: 'queue_types>num_pipelines',
},
pipelines_nested_mb: {
nested: {
path: 'logstash.node.stats.pipelines',
},
queue_types: {
terms: {
field: 'logstash.node.stats.pipelines.queue.type',
size: config.get('monitoring.ui.max_bucket_size'),
aggs: {
pipelines: {
sum_bucket: {
buckets_path: 'queue_types>num_pipelines',
},
},
aggs: {
num_pipelines: {
cardinality: {
field: 'logstash.node.stats.pipelines.id',
queue_types: {
terms: {
field: 'logstash.node.stats.pipelines.queue.type',
size: config.get('monitoring.ui.max_bucket_size'),
},
aggs: {
num_pipelines: {
cardinality: {
field: 'logstash.node.stats.pipelines.id',
},
},
},
},
},
},
},
events_in_total: {
sum_bucket: {
buckets_path: 'logstash_uuids>events_in_total_per_node',
events_in_total: {
sum_bucket: {
buckets_path: 'logstash_uuids>events_in_total_per_node',
},
},
events_out_total: {
sum_bucket: {
buckets_path: 'logstash_uuids>events_out_total_per_node',
},
},
memory_used: {
sum_bucket: {
buckets_path: 'logstash_uuids>memory_used_per_node',
},
},
memory: {
sum_bucket: {
buckets_path: 'logstash_uuids>memory_per_node',
},
},
max_uptime: {
max: {
field: 'logstash_stats.jvm.uptime_in_millis',
},
},
},
events_out_total: {
sum_bucket: {
buckets_path: 'logstash_uuids>events_out_total_per_node',
},
},
memory_used: {
sum_bucket: {
buckets_path: 'logstash_uuids>memory_used_per_node',
},
},
memory: {
sum_bucket: {
buckets_path: 'logstash_uuids>memory_per_node',
},
},
max_uptime: {
max: {
field: 'logstash_stats.jvm.uptime_in_millis',
},
},
},
},
};
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return callWithRequest(req, 'search', params).then((result) => {
const aggregations = get(result, 'aggregations', {});
const logstashUuids = get(aggregations, 'logstash_uuids.buckets', []);
const logstashVersions = get(aggregations, 'logstash_versions.buckets', []);
// everything is initialized such that it won't impact any rollup
let eventsInTotal = 0;
let eventsOutTotal = 0;
let memory = 0;
let memoryUsed = 0;
let maxUptime = 0;
// if the cluster has logstash instances at all
if (logstashUuids.length) {
eventsInTotal = get(aggregations, 'events_in_total.value');
eventsOutTotal = get(aggregations, 'events_out_total.value');
memory = get(aggregations, 'memory.value');
memoryUsed = get(aggregations, 'memory_used.value');
maxUptime = get(aggregations, 'max_uptime.value');
}
let types = get(aggregations, 'pipelines_nested_mb.queue_types.buckets', []);
if (!types || types.length === 0) {
types = aggregations.pipelines_nested?.queue_types.buckets ?? [];
}
return {
clusterUuid,
stats: {
node_count: logstashUuids.length,
events_in_total: eventsInTotal,
events_out_total: eventsOutTotal,
avg_memory: memory,
avg_memory_used: memoryUsed,
max_uptime: maxUptime,
pipeline_count:
get(aggregations, 'pipelines_nested_mb.pipelines.value') ||
get(aggregations, 'pipelines_nested.pipelines.value', 0),
queue_types: getQueueTypes(types),
versions: logstashVersions.map((versionBucket: Bucket) => versionBucket.key),
},
};
});
});
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return callWithRequest(req, 'search', params).then((result) => {
const aggregations = get(result, 'aggregations', {});
const logstashUuids = get(aggregations, 'logstash_uuids.buckets', []);
const logstashVersions = get(aggregations, 'logstash_versions.buckets', []);
// everything is initialized such that it won't impact any rollup
let eventsInTotal = 0;
let eventsOutTotal = 0;
let memory = 0;
let memoryUsed = 0;
let maxUptime = 0;
// if the cluster has logstash instances at all
if (logstashUuids.length) {
eventsInTotal = get(aggregations, 'events_in_total.value');
eventsOutTotal = get(aggregations, 'events_out_total.value');
memory = get(aggregations, 'memory.value');
memoryUsed = get(aggregations, 'memory_used.value');
maxUptime = get(aggregations, 'max_uptime.value');
}
let types = get(aggregations, 'pipelines_nested_mb.queue_types.buckets', []);
if (!types || types.length === 0) {
types = aggregations.pipelines_nested?.queue_types.buckets ?? [];
}
return {
clusterUuid,
stats: {
node_count: logstashUuids.length,
events_in_total: eventsInTotal,
events_out_total: eventsOutTotal,
avg_memory: memory,
avg_memory_used: memoryUsed,
max_uptime: maxUptime,
pipeline_count:
get(aggregations, 'pipelines_nested_mb.pipelines.value') ||
get(aggregations, 'pipelines_nested.pipelines.value', 0),
queue_types: getQueueTypes(types),
versions: logstashVersions.map((versionBucket: Bucket) => versionBucket.key),
},
};
});
})
);
}
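
This hunk is also where the "remove bluebird import" change lands: Bluebird.map without a concurrency option behaves like Promise.all over Array#map, so the swap is behavior-preserving. The pattern in isolation (helper name is illustrative):

// Before: return Bluebird.map(items, (item) => fetchStats(item));
// After: native promises, identical behavior when no concurrency limit is set.
async function mapAll<T, R>(items: T[], fn: (item: T) => Promise<R>): Promise<R[]> {
  return Promise.all(items.map((item) => fn(item)));
}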

@@ -85,7 +85,7 @@ export async function getNodes(
end,
clusterUuid,
metric: LogstashMetric.getMetricFields(),
types: ['stats', 'logstash_stats'],
types: ['node_stats', 'logstash_stats'],
}),
collapse: {
field: 'logstash_stats.logstash.uuid',

@@ -154,7 +154,7 @@ async function getPaginatedThroughputData(
},
{
term: {
'metricset.name': 'stats',
'metricset.name': 'node_stats',
},
},
],
@@ -208,7 +208,10 @@ async function getPaginatedNodesData(
[
{
bool: {
should: [{ term: { type: 'logstash_stats' } }, { term: { 'metricset.name': 'stats' } }],
should: [
{ term: { type: 'logstash_stats' } },
{ term: { 'metricset.name': 'node_stats' } },
],
},
},
],
@@ -296,7 +299,7 @@ async function getThroughputPipelines(
},
{
term: {
'metricset.name': 'stats',
'metricset.name': 'node_stats',
},
},
],
@@ -330,7 +333,10 @@ async function getNodePipelines(
[
{
bool: {
should: [{ term: { type: 'logstash_stats' } }, { term: { 'metricset.name': 'stats' } }],
should: [
{ term: { type: 'logstash_stats' } },
{ term: { 'metricset.name': 'node_stats' } },
],
},
},
],
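
Both collection modes must stay queryable side by side, hence the duplicated term filters: internally collected documents carry type: logstash_stats while metricbeat 8.0 documents carry metricset.name: node_stats (previously stats, per the first hunk in this file). The shared clause as a standalone sketch:

// Matches either document flavour; with only `should` clauses in a bool,
// Elasticsearch requires at least one of them to match by default.
const legacyOrMetricbeat = {
  bool: {
    should: [
      { term: { type: 'logstash_stats' } },
      { term: { 'metricset.name': 'node_stats' } },
    ],
  },
};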

@@ -76,7 +76,7 @@ export function _enrichStateWithStatsAggregation(
statsAggregation: any,
timeseriesIntervalInSeconds: number
) {
const logstashState = stateDocument.logstash_state;
const logstashState = stateDocument.logstash_state || stateDocument.logstash?.node?.state;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];
const verticesById: any = {};
@@ -90,7 +90,9 @@ export function _enrichStateWithStatsAggregation(
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
const verticesWithStatsBuckets =
statsAggregation.aggregations?.pipelines.scoped.vertices.vertex_id.buckets ?? [];
statsAggregation.aggregations?.pipelines.scoped.vertices?.vertex_id.buckets ??
statsAggregation.aggregations?.pipelines_mb.scoped.vertices?.vertex_id.buckets ??
[];
verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key;
@@ -107,7 +109,7 @@
}
});
return stateDocument.logstash_state?.pipeline;
return logstashState?.pipeline;
}
export async function getPipeline(
@@ -121,7 +123,7 @@
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
// Determine metrics' timeseries interval based on version's timespan
const minIntervalSeconds = config.get('monitoring.ui.min_interval_seconds');
const minIntervalSeconds = Math.max(Number(config.get('monitoring.ui.min_interval_seconds')), 30);
const timeseriesInterval = calculateTimeseriesInterval(
Number(version.firstSeen),
Number(version.lastSeen),
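
Note the new 30-second floor applied on top of the monitoring.ui.min_interval_seconds setting, presumably to keep each timeseries bucket wide enough to capture metricbeat's collection cadence. The clamp in isolation (helper name is illustrative):

// The configured value still wins once it exceeds the floor.
const clampMinInterval = (configured: string | number): number =>
  Math.max(Number(configured), 30);

clampMinInterval(10); // => 30
clampMinInterval(60); // => 60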

@@ -35,7 +35,7 @@ export async function getPipelineStateDocument({
// This is important because a user may pick a very narrow time picker window. If we were to use a start/end value
// that could result in us being unable to render the graph
// Use the logstash_stats documents to determine whether the instance is up/down
type: 'logstash_state',
types: ['logstash_state', 'node'],
metric: LogstashMetric.getMetricFields(),
clusterUuid,
filters,

@@ -52,16 +52,16 @@ function scalarCounterAggregation(
return aggs;
}
function nestedVertices(maxBucketSize: string) {
const fieldPath = 'logstash_stats.pipelines.vertices';
const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id';
function nestedVertices(statsPath: string, maxBucketSize: string) {
const fieldPath = `${statsPath}.pipelines.vertices`;
const ephemeralIdField = `${statsPath}.pipelines.vertices.pipeline_ephemeral_id`;
return {
nested: { path: 'logstash_stats.pipelines.vertices' },
nested: { path: `${statsPath}.pipelines.vertices` },
aggs: {
vertex_id: {
terms: {
field: 'logstash_stats.pipelines.vertices.id',
field: `${statsPath}.pipelines.vertices.id`,
size: maxBucketSize,
},
aggs: {
@@ -79,24 +79,33 @@ function nestedVertices(maxBucketSize: string) {
};
}
function createScopedAgg(pipelineId: string, pipelineHash: string, agg: { [key: string]: any }) {
return {
pipelines: {
nested: { path: 'logstash_stats.pipelines' },
function createScopedAgg(pipelineId: string, pipelineHash: string, maxBucketSize: string) {
return (statsPath: string) => {
const verticesAgg = {
vertices: nestedVertices(statsPath, maxBucketSize),
total_processor_duration_stats: {
stats: {
field: `${statsPath}.pipelines.events.duration_in_millis`,
},
},
};
return {
nested: { path: `${statsPath}.pipelines` },
aggs: {
scoped: {
filter: {
bool: {
filter: [
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
{ term: { 'logstash_stats.pipelines.hash': pipelineHash } },
{ term: { [`${statsPath}.pipelines.id`]: pipelineId } },
{ term: { [`${statsPath}.pipelines.hash`]: pipelineHash } },
],
},
},
aggs: agg,
aggs: verticesAgg,
},
},
},
};
};
}
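
createScopedAgg is now curried over the stats path, so one aggregation tree definition serves both document flavours. Usage sketch (types simplified; pipelineAggregation stands in for the factory returned above):

type AggFactory = (statsPath: string) => Record<string, unknown>;

declare const pipelineAggregation: AggFactory;

// Instantiated once per collection mode, exactly as in the hunk below.
const aggs = {
  pipelines: pipelineAggregation('logstash_stats'), // legacy collection
  pipelines_mb: pipelineAggregation('logstash.node.stats'), // metricbeat collection
};
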
@@ -109,6 +118,7 @@ function fetchPipelineLatestStats(
callWithRequest: any,
req: LegacyRequest
) {
const pipelineAggregation = createScopedAgg(pipelineId, version.hash, maxBucketSize);
const params = {
index: logstashIndexPattern,
size: 0,
@@ -119,17 +129,18 @@
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.events_out_total',
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
'aggregations.pipelines.scoped.total_processor_duration_stats',
'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.key',
'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.events_in_total',
'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.events_out_total',
'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
'aggregations.pipelines_mb.scoped.total_processor_duration_stats',
],
body: {
query,
aggs: createScopedAgg(pipelineId, version.hash, {
vertices: nestedVertices(maxBucketSize),
total_processor_duration_stats: {
stats: {
field: 'logstash_stats.pipelines.events.duration_in_millis',
},
},
}),
aggs: {
pipelines: pipelineAggregation('logstash_stats'),
pipelines_mb: pipelineAggregation('logstash.node.stats'),
},
},
};
@@ -154,16 +165,31 @@ export function getPipelineStatsAggregation({
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{
nested: {
path: 'logstash_stats.pipelines',
query: {
bool: {
must: [
{ term: { 'logstash_stats.pipelines.hash': version.hash } },
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
],
bool: {
should: [
{
nested: {
path: 'logstash_stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
},
},
},
},
},
{
nested: {
path: 'logstash.node.stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash.node.stats.pipelines.id': pipelineId } }],
},
},
},
},
],
},
},
];
@@ -172,7 +198,7 @@
const end = version.lastSeen;
const query = createQuery({
types: ['stats', 'logstash_stats'],
types: ['node_stats', 'logstash_stats'],
start,
end,
metric: LogstashMetric.getMetricFields(),

@@ -5,11 +5,59 @@
* 2.0.
*/
import { get } from 'lodash';
import { get, orderBy } from 'lodash';
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';
import { checkParam } from '../error_missing_required';
import { LegacyRequest } from '../../types';
import { LegacyRequest, PipelineVersion } from '../../types';
import { mergePipelineVersions } from './merge_pipeline_versions';
const createScopedAgg = (pipelineId: string, maxBucketSize: number) => {
return (statsPath: string) => {
const byPipelineHash = {
by_pipeline_hash: {
terms: {
field: `${statsPath}.pipelines.hash`,
size: maxBucketSize,
order: { 'path_to_root>first_seen': 'desc' },
},
aggs: {
path_to_root: {
reverse_nested: {},
aggs: {
first_seen: {
min: {
field: `${statsPath}.timestamp`,
},
},
last_seen: {
max: {
field: `${statsPath}.timestamp`,
},
},
},
},
},
},
};
return {
nested: {
path: `${statsPath}.pipelines`,
},
aggs: {
scoped: {
filter: {
bool: {
filter: [{ term: { [`${statsPath}.pipelines.id`]: pipelineId } }],
},
},
aggs: byPipelineHash,
},
},
};
};
};
function fetchPipelineVersions({
req,
@@ -28,66 +76,48 @@ function fetchPipelineVersions({
const filters = [
{
nested: {
path: 'logstash_stats.pipelines',
query: {
bool: {
filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
bool: {
should: [
{
nested: {
path: 'logstash_stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
},
},
},
},
},
{
nested: {
path: 'logstash.node.stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash.node.stats.pipelines.id': pipelineId } }],
},
},
},
},
],
},
},
];
const query = createQuery({
types: ['stats', 'logstash_stats'],
types: ['node_stats', 'logstash_stats'],
metric: LogstashMetric.getMetricFields(),
clusterUuid,
filters,
});
const filteredAggs = {
by_pipeline_hash: {
terms: {
field: 'logstash_stats.pipelines.hash',
size: config.get('monitoring.ui.max_bucket_size'),
order: { 'path_to_root>first_seen': 'desc' },
},
aggs: {
path_to_root: {
reverse_nested: {},
aggs: {
first_seen: {
min: {
field: 'logstash_stats.timestamp',
},
},
last_seen: {
max: {
field: 'logstash_stats.timestamp',
},
},
},
},
},
},
};
const pipelineAggregation = createScopedAgg(
pipelineId,
Number(config.get('monitoring.ui.max_bucket_size'))
);
const aggs = {
pipelines: {
nested: {
path: 'logstash_stats.pipelines',
},
aggs: {
scoped: {
filter: {
bool: {
filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
},
},
aggs: filteredAggs,
},
},
},
pipelines: pipelineAggregation('logstash_stats'),
pipelines_mb: pipelineAggregation('logstash.node.stats'),
};
const params = {
@@ -105,16 +135,26 @@
}
export function _handleResponse(response: any) {
const pipelineHashes = get(
const pipelines = get(response, 'aggregations.pipelines.scoped.by_pipeline_hash.buckets', []);
const pipelinesMb = get(
response,
'aggregations.pipelines.scoped.by_pipeline_hash.buckets',
'aggregations.pipelines_mb.scoped.by_pipeline_hash.buckets',
[]
);
return pipelineHashes.map((pipelineHash: any) => ({
hash: pipelineHash.key,
firstSeen: get(pipelineHash, 'path_to_root.first_seen.value'),
lastSeen: get(pipelineHash, 'path_to_root.last_seen.value'),
}));
const versions = pipelines.concat(pipelinesMb).map(
(pipelineHash: any): PipelineVersion => ({
hash: pipelineHash.key,
firstSeen: get(pipelineHash, 'path_to_root.first_seen.value'),
lastSeen: get(pipelineHash, 'path_to_root.last_seen.value'),
})
);
// we could have continuous data about a pipeline version spread across legacy and
// metricbeat indices, make sure to join the start and end dates for these occurrences
const uniqVersions = mergePipelineVersions(versions);
return orderBy(uniqVersions, 'firstSeen', 'desc');
}
export async function getPipelineVersions(args: {

@@ -86,7 +86,7 @@ export function _enrichVertexStateWithStatsAggregation(
vertexId: string,
timeseriesIntervalInSeconds: number
) {
const logstashState = stateDocument.logstash_state;
const logstashState = stateDocument.logstash?.node?.state || stateDocument.logstash_state;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices;
// First, filter out the vertex we care about
@@ -98,13 +98,21 @@
// Next, iterate over timeseries metrics and attach them to vertex
const timeSeriesBuckets = vertexStatsAggregation.aggregations?.timeseries.buckets ?? [];
timeSeriesBuckets.forEach((timeSeriesBucket: any) => {
// each bucket calculates stats for total pipeline CPU time for the associated timeseries
const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats;
// each bucket calculates stats for total pipeline CPU time for the associated timeseries.
// we could have data in both legacy and metricbeat collection, we pick the bucket most filled
const bucketCount = (aggregationKey: string) =>
get(timeSeriesBucket, `${aggregationKey}.scoped.total_processor_duration_stats.count`);
const pipelineBucket =
bucketCount('pipelines_mb') > bucketCount('pipelines')
? timeSeriesBucket.pipelines_mb
: timeSeriesBucket.pipelines;
const totalDurationStats = pipelineBucket.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
const timestamp = timeSeriesBucket.key;
const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
const vertexStatsBucket = pipelineBucket.scoped.vertices.vertex_id;
if (vertex) {
const vertexStats = _vertexStats(
vertex,
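
The bucket selection above relies on the stats aggregation's count, i.e. how many documents contributed on each side; the fuller side wins. One edge case worth spelling out: when the metricbeat count is absent, undefined > n is false, so the legacy bucket is kept. A sketch with hypothetical counts:

import { get } from 'lodash';

const timeSeriesBucket: any = {
  pipelines: { scoped: { total_processor_duration_stats: { count: 12 } } },
  pipelines_mb: { scoped: { total_processor_duration_stats: { count: 30 } } },
};

const bucketCount = (aggregationKey: string) =>
  get(timeSeriesBucket, `${aggregationKey}.scoped.total_processor_duration_stats.count`);

// Picks pipelines_mb here, since 30 > 12.
const pipelineBucket =
  bucketCount('pipelines_mb') > bucketCount('pipelines')
    ? timeSeriesBucket.pipelines_mb // metricbeat-collected data
    : timeSeriesBucket.pipelines; // legacy-collected data
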
@@ -138,7 +146,7 @@ export async function getPipelineVertex(
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
// Determine metrics' timeseries interval based on version's timespan
const minIntervalSeconds = config.get('monitoring.ui.min_interval_seconds');
const minIntervalSeconds = Math.max(Number(config.get('monitoring.ui.min_interval_seconds')), 30);
const timeseriesInterval = calculateTimeseriesInterval(
Number(version.firstSeen),
Number(version.lastSeen),

@@ -56,18 +56,18 @@ function createAggsObjectFromAggsList(aggsList: any) {
return aggsList.reduce((aggsSoFar: object, agg: object) => ({ ...aggsSoFar, ...agg }), {});
}
function createNestedVertexAgg(vertexId: string, maxBucketSize: number) {
const fieldPath = 'logstash_stats.pipelines.vertices';
const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id';
function createNestedVertexAgg(statsPath: string, vertexId: string, maxBucketSize: number) {
const fieldPath = `${statsPath}.pipelines.vertices`;
const ephemeralIdField = `${statsPath}.pipelines.vertices.pipeline_ephemeral_id`;
return {
vertices: {
nested: { path: 'logstash_stats.pipelines.vertices' },
nested: { path: `${statsPath}.pipelines.vertices` },
aggs: {
vertex_id: {
filter: {
term: {
'logstash_stats.pipelines.vertices.id': vertexId,
[`${statsPath}.pipelines.vertices.id`]: vertexId,
},
},
aggs: {
@@ -92,34 +92,44 @@ function createNestedVertexAgg(vertexId: string, maxBucketSize: number) {
};
}
function createTotalProcessorDurationStatsAgg() {
function createTotalProcessorDurationStatsAgg(statsPath: string) {
return {
total_processor_duration_stats: {
stats: {
field: 'logstash_stats.pipelines.events.duration_in_millis',
field: `${statsPath}.pipelines.events.duration_in_millis`,
},
},
};
}
function createScopedAgg(pipelineId: string, pipelineHash: string, ...aggsList: object[]) {
return {
pipelines: {
nested: { path: 'logstash_stats.pipelines' },
function createScopedAgg(
pipelineId: string,
pipelineHash: string,
vertexId: string,
maxBucketSize: number
) {
return (statsPath: string) => {
const aggs = {
...createNestedVertexAgg(statsPath, vertexId, maxBucketSize),
...createTotalProcessorDurationStatsAgg(statsPath),
};
return {
nested: { path: `${statsPath}.pipelines` },
aggs: {
scoped: {
filter: {
bool: {
filter: [
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
{ term: { 'logstash_stats.pipelines.hash': pipelineHash } },
{ term: { [`${statsPath}.pipelines.id`]: pipelineId } },
{ term: { [`${statsPath}.pipelines.hash`]: pipelineHash } },
],
},
},
aggs: createAggsObjectFromAggsList(aggsList),
aggs,
},
},
},
};
};
}
@@ -156,16 +166,12 @@ function fetchPipelineVertexTimeSeriesStats({
callWithRequest: (req: any, endpoint: string, params: any) => Promise<any>;
req: LegacyRequest;
}) {
const pipelineAggregation = createScopedAgg(pipelineId, version.hash, vertexId, maxBucketSize);
const aggs = {
...createTimeSeriesAgg(
timeSeriesIntervalInSeconds,
createScopedAgg(
pipelineId,
version.hash,
createNestedVertexAgg(vertexId, maxBucketSize),
createTotalProcessorDurationStatsAgg()
)
),
...createTimeSeriesAgg(timeSeriesIntervalInSeconds, {
pipelines: pipelineAggregation('logstash_stats'),
pipelines_mb: pipelineAggregation('logstash.node.stats'),
}),
};
const params = {
@@ -179,6 +185,11 @@
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.queue_push_duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines.scoped.total_processor_duration_stats',
'aggregations.timeseries.buckets.pipelines_mb.scoped.vertices.vertex_id.events_in_total',
'aggregations.timeseries.buckets.pipelines_mb.scoped.vertices.vertex_id.events_out_total',
'aggregations.timeseries.buckets.pipelines_mb.scoped.vertices.vertex_id.duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines_mb.scoped.vertices.vertex_id.queue_push_duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines_mb.scoped.total_processor_duration_stats',
],
body: {
query,
@@ -209,16 +220,31 @@ export function getPipelineVertexStatsAggregation({
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{
nested: {
path: 'logstash_stats.pipelines',
query: {
bool: {
must: [
{ term: { 'logstash_stats.pipelines.hash': version.hash } },
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
],
bool: {
should: [
{
nested: {
path: 'logstash_stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
},
},
},
},
},
{
nested: {
path: 'logstash.node.stats.pipelines',
ignore_unmapped: true,
query: {
bool: {
filter: [{ term: { 'logstash.node.stats.pipelines.id': pipelineId } }],
},
},
},
},
],
},
},
];
@@ -227,7 +253,7 @@
const end = version.lastSeen;
const query = createQuery({
types: ['stats', 'logstash_stats'],
types: ['node_stats', 'logstash_stats'],
start,
end,
metric: LogstashMetric.getMetricFields(),

@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { mergePipelineVersions } from './merge_pipeline_versions';
describe('merge_pipeline_versions', () => {
it('merges duplicates', () => {
const versions = [
{ hash: 'foo', firstSeen: 1, lastSeen: 2 },
{ hash: 'bar', firstSeen: 5, lastSeen: 10 },
{ hash: 'foo', firstSeen: 3, lastSeen: 4 },
{ hash: 'bar', firstSeen: 1, lastSeen: 3 },
];
const result = mergePipelineVersions(versions);
expect(result.length).toEqual(2);
expect(result.find((v) => v.hash === 'foo')).toEqual({
hash: 'foo',
firstSeen: 1,
lastSeen: 4,
});
expect(result.find((v) => v.hash === 'bar')).toEqual({
hash: 'bar',
firstSeen: 1,
lastSeen: 10,
});
});
it('is a noop when no duplicates', () => {
const versions = [
{ hash: 'foo', firstSeen: 1, lastSeen: 2 },
{ hash: 'bar', firstSeen: 5, lastSeen: 10 },
];
const result = mergePipelineVersions(versions);
expect(result.length).toEqual(2);
expect(result.find((v) => v.hash === 'foo')).toEqual({
hash: 'foo',
firstSeen: 1,
lastSeen: 2,
});
expect(result.find((v) => v.hash === 'bar')).toEqual({
hash: 'bar',
firstSeen: 5,
lastSeen: 10,
});
});
});

@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PipelineVersion } from '../../types';
export const mergePipelineVersions = (versions: PipelineVersion[]): PipelineVersion[] => {
const versionsByHash = versions.reduce(
(acc: { [key: string]: PipelineVersion }, pipeline: PipelineVersion) => {
const existing = acc[pipeline.hash];
if (!existing) {
return { ...acc, [pipeline.hash]: pipeline };
}
existing.firstSeen = Math.min(existing.firstSeen, pipeline.firstSeen);
existing.lastSeen = Math.max(existing.lastSeen, pipeline.lastSeen);
return acc;
},
{}
);
return Object.values(versionsByHash);
};
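
A usage sketch with hypothetical timestamps: the same hash reported by both legacy and metricbeat indices collapses into one version spanning both ranges, which is why _handleResponse can simply concatenate the two bucket lists before merging. Object.values preserves insertion order rather than time order, hence the orderBy re-sort in the caller:

import { mergePipelineVersions } from './merge_pipeline_versions';

const merged = mergePipelineVersions([
  { hash: 'abc', firstSeen: 1641300000000, lastSeen: 1641303600000 }, // from legacy indices
  { hash: 'abc', firstSeen: 1641302000000, lastSeen: 1641306000000 }, // from metricbeat indices
]);
// => [{ hash: 'abc', firstSeen: 1641300000000, lastSeen: 1641306000000 }]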

@@ -77,7 +77,7 @@ export function logstashNodeRoute(server) {
bool: {
should: [
{ term: { type: 'logstash_stats' } },
{ term: { 'metricset.name': 'stats' } },
{ term: { 'metricset.name': 'node_stats' } },
],
},
},

@@ -57,7 +57,7 @@ export function logstashOverviewRoute(server) {
bool: {
should: [
{ term: { type: 'logstash_stats' } },
{ term: { 'metricset.name': 'stats' } },
{ term: { 'metricset.name': 'node_stats' } },
],
},
},

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import multiclusterFixture from './fixtures/multicluster';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('list mb', () => {
describe('with trial license clusters', () => {
@@ -22,11 +23,11 @@ export default function ({ getService }) {
const codePaths = ['all'];
before('load clusters archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload clusters archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should load multiple clusters', async () => {
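
The data_stream lifecycle helper itself is not part of this diff; judging from its usage here (setup(archive), tearDown()), a plausible sketch follows — everything below except that contract is an assumption:

// Hypothetical implementation sketch of getLifecycleMethods.
export const getLifecycleMethods = (getService: (name: string) => any) => {
  const esArchiver = getService('esArchiver');
  let archive: string;

  return {
    setup(path: string) {
      archive = path;
      // assumption: data streams only accept `create` operations, so the
      // archiver would need to index with op_type create (cf. useCreate below).
      return esArchiver.load(archive, { useCreate: true });
    },
    tearDown() {
      return esArchiver.unload(archive);
    },
  };
};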

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import overviewFixture from './fixtures/overview';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('overview mb', function () {
// TODO: https://github.com/elastic/stack-monitoring/issues/31
@@ -25,11 +26,11 @@ export default function ({ getService }) {
const codePaths = ['all'];
before('load clusters archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload clusters archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should load multiple clusters', async () => {

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import fixture from './fixtures/multicluster_pipelines';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('pipelines listing multicluster mb', () => {
const archive =
@@ -25,11 +26,11 @@ export default function ({ getService }) {
};
before('load archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should get the pipelines', async () => {

@@ -9,10 +9,11 @@ import expect from '@kbn/expect';
import { normalizeDataTypeDifferences } from '../normalize_data_type_differences';
import nodeDetailFixture from './fixtures/node_detail';
import nodeDetailAdvancedFixture from './fixtures/node_detail_advanced';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('node detail mb', () => {
const archive = 'x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb';
@@ -22,11 +23,11 @@ export default function ({ getService }) {
};
before('load archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should summarize the Logstash node with non-advanced chart data metrics', async () => {

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import nodesFixture from './fixtures/nodes';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('node listing mb', () => {
const archive = 'x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb';
@@ -20,11 +21,11 @@ export default function ({ getService }) {
};
before('load archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should summarize the Logstash nodes with stats', async () => {

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import overviewFixture from './fixtures/overview.json';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('overview mb', () => {
const archive = 'x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb';
@@ -20,11 +21,11 @@ export default function ({ getService }) {
};
before('load archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should summarize two Logstash nodes with metrics', async () => {

@@ -7,10 +7,11 @@
import expect from '@kbn/expect';
import pipelinesFixture from './fixtures/pipelines';
import { getLifecycleMethods } from '../data_stream';
export default function ({ getService }) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const { setup, tearDown } = getLifecycleMethods(getService);
describe('pipelines mb', () => {
const archive = 'x-pack/test/functional/es_archives/monitoring/logstash/changing_pipelines_mb';
@@ -28,11 +29,11 @@ export default function ({ getService }) {
};
before('load archive', () => {
return esArchiver.load(archive);
return setup(archive);
});
after('unload archive', () => {
return esArchiver.unload(archive);
return tearDown();
});
it('should return paginated pipelines', async () => {

@@ -34,6 +34,7 @@ export default function ({ getService, getPageObjects }) {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
useCreate: true,
});
await clusterOverview.closeAlertsModal();

@@ -21,6 +21,7 @@ export default function ({ getService, getPageObjects }) {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
useCreate: true,
});
await clusterOverview.closeAlertsModal();

@@ -20,6 +20,7 @@ export default function ({ getService, getPageObjects }) {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
useCreate: true,
});
await clusterOverview.closeAlertsModal();

@@ -22,6 +22,7 @@ export default function ({ getService, getPageObjects }) {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
useCreate: true,
});
await overview.closeAlertsModal();
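
The recurring useCreate: true flag in these functional-test setups ties back to the "use datastream for mb functional tests" item in the commit message: a data stream rejects plain index operations, so archived documents have to be written with the create action. In bulk-request terms (index name and document are hypothetical):

// Allowed on a data stream:
const bulkBody = [
  { create: { _index: 'metricbeat-8.0.0' } },
  { '@timestamp': '2018-01-22T09:10:00.000Z', metricset: { name: 'node_stats' } },
];
// An `{ index: ... }` action against the same target would be rejected.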