Resolved conflicts (#59467)

This commit is contained in:
igoristic 2020-03-05 16:39:01 -05:00 committed by GitHub
parent 47094cdd00
commit 542a0d62f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 185 additions and 239 deletions

View file

@ -143,6 +143,10 @@ export class PipelineListing extends Component {
className,
} = this.props;
const sortingOptions = sorting || { field: 'id', direction: 'asc' };
if (sortingOptions.field === 'name') {
sortingOptions.field = 'id';
}
const columns = this.getColumns();
return (
@ -155,7 +159,7 @@ export class PipelineListing extends Component {
className={className || 'logstashNodesTable'}
rows={data}
columns={columns}
sorting={sorting}
sorting={sortingOptions}
message={upgradeMessage}
pagination={pagination}
fetchMoreData={fetchMoreData}

View file

@ -11,7 +11,7 @@ import { flagSupportedClusters } from './flag_supported_clusters';
import { getMlJobsForCluster } from '../elasticsearch';
import { getKibanasForClusters } from '../kibana';
import { getLogstashForClusters } from '../logstash';
import { getPipelines } from '../logstash/get_pipelines';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';
import { getBeatsForClusters } from '../beats';
import { alertsClustersAggregation } from '../../cluster_alerts/alerts_clusters_aggregation';
import { alertsClusterSearch } from '../../cluster_alerts/alerts_cluster_search';
@ -34,7 +34,6 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
import { getLogTypes } from '../logs';
import { isInCodePath } from './is_in_code_path';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';
/**
* Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
@ -55,7 +54,6 @@ export async function getClustersFromRequest(
} = indexPatterns;
const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;
let clusters = [];
@ -163,22 +161,14 @@ export async function getClustersFromRequest(
// add logstash data
if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
'logstash_cluster_pipeline_nodes_count',
]);
// add the logstash data to each cluster
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, 1);
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });
// withhold LS overview stats until pipeline metrics have at least one full bucket
if (
logstash.clusterUuid === req.params.clusterUuid &&
clusterPipelineNodesCount.length === 0
) {
// withhold LS overview stats until there is at least 1 pipeline
if (logstash.clusterUuid === clusterUuid && !pipelines.length) {
logstash.stats = {};
}
set(clusters[clusterIndex], 'logstash', logstash.stats);
});
}

View file

@ -68,7 +68,7 @@ function createMetricAggs(metric) {
return metric.aggs;
}
function fetchSeries(
async function fetchSeries(
req,
indexPattern,
metric,
@ -142,7 +142,7 @@ function fetchSeries(
}
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return callWithRequest(req, 'search', params);
return await callWithRequest(req, 'search', params);
}
/**

View file

@ -1,59 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { processPipelinesAPIResponse } from '../get_pipelines';
describe('processPipelinesAPIResponse', () => {
let response;
beforeEach(() => {
response = {
pipelines: [
{
id: 1,
metrics: {
throughput_for_cluster: {
data: [
[1513721903, 17],
[1513722162, 23],
],
},
nodes_count_for_cluster: {
data: [
[1513721903, { 1: 5 }],
[1513722162, { 1: 10 }],
],
},
},
},
],
};
});
it('normalizes the metric keys', async () => {
const processedResponse = await processPipelinesAPIResponse(
response,
'throughput_for_cluster',
'nodes_count_for_cluster'
);
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
response.pipelines[0].metrics.throughput_for_cluster
);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][0]).to.eql(1513721903);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][1]).to.eql(5);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][0]).to.eql(1513722162);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][1]).to.eql(10);
});
it('computes the latest metrics', () => {
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
processedResponse => {
expect(processedResponse.pipelines[0].latestThroughput).to.eql(23);
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(10);
}
);
});
});

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import { get, cloneDeep, last } from 'lodash';
import { filter } from '../pagination/filter';
import { getLogstashPipelineIds } from './get_pipeline_ids';
import { sortPipelines } from './sort_pipelines';
@ -31,11 +31,12 @@ export async function getPaginatedPipelines(
req,
lsIndexPattern,
{ clusterUuid, logstashUuid },
metricSet,
{ throughputMetric, nodesCountMetric },
pagination,
sort,
queryText
) {
const sortField = sort.field;
const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
const pipelines = await getLogstashPipelineIds(
@ -45,50 +46,12 @@ export async function getPaginatedPipelines(
size
);
// `metricSet` defines a list of metrics that are sortable in the UI
// but we don't need to fetch all the data for these metrics to perform
// the necessary sort - we only need the last bucket of data so we
// fetch the last two buckets of data (to ensure we have a single full bucekt),
// then return the value from that last bucket
const metricSeriesData = Object.values(
await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(
req,
lsIndexPattern,
metricSet,
[],
{
pipeline,
},
2
);
resolve({
id: pipeline.id,
metrics: Object.keys(data).reduce((accum, metricName) => {
accum[metricName] = data[metricName][0];
return accum;
}, {}),
});
});
})
)
);
for (const pipelineAggregationData of metricSeriesData) {
for (const pipeline of pipelines) {
if (pipelineAggregationData.id === pipeline.id) {
for (const metric of metricSet) {
const dataSeries = get(pipelineAggregationData, `metrics.${metric}.data`, [[]]);
pipeline[metric] = dataSeries[dataSeries.length - 1][1];
}
}
}
if (sortField === throughputMetric) {
await getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric);
} else if (sortField === nodesCountMetric) {
await getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric);
}
// Manually apply pagination/sorting/filtering concerns
// Filtering
const filteredPipelines = filter(pipelines, queryText, ['id']); // We only support filtering by id right now
@ -98,8 +61,150 @@ export async function getPaginatedPipelines(
// Pagination
const pageOfPipelines = paginate(pagination, sortedPipelines);
return {
pageOfPipelines,
const response = {
pipelines: await getPipelines(
req,
lsIndexPattern,
pageOfPipelines,
throughputMetric,
nodesCountMetric
),
totalPipelineCount: filteredPipelines.length,
};
return processPipelinesAPIResponse(response, throughputMetric, nodesCountMetric);
}
function processPipelinesAPIResponse(response, throughputMetricKey, nodesCountMetricKey) {
// Clone to avoid mutating original response
const processedResponse = cloneDeep(response);
// Normalize metric names for shared component code
// Calculate latest throughput and node count for each pipeline
processedResponse.pipelines.forEach(pipeline => {
pipeline.metrics = {
throughput: pipeline.metrics[throughputMetricKey],
nodesCount: pipeline.metrics[nodesCountMetricKey],
};
pipeline.latestThroughput = (last(pipeline.metrics.throughput.data) || [])[1];
pipeline.latestNodesCount = (last(pipeline.metrics.nodesCount.data) || [])[1];
});
return processedResponse;
}
async function getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric) {
const metricSeriesData = Object.values(
await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(
req,
lsIndexPattern,
[throughputMetric],
[],
{
pipeline,
},
2
);
resolve(reduceData(pipeline, data));
});
})
)
);
for (const pipelineAggregationData of metricSeriesData) {
for (const pipeline of pipelines) {
if (pipelineAggregationData.id === pipeline.id) {
const dataSeries = get(pipelineAggregationData, `metrics.${throughputMetric}.data`, [[]]);
pipeline[throughputMetric] = dataSeries.pop()[1];
}
}
}
}
async function getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric) {
const metricSeriesData = await getMetrics(
req,
lsIndexPattern,
[nodesCountMetric],
[],
{ pageOfPipelines: pipelines },
2
);
const { data } = metricSeriesData[nodesCountMetric][0] || [[]];
const pipelinesMap = (data.pop() || [])[1] || {};
if (!Object.keys(pipelinesMap).length) {
return;
}
pipelines.forEach(pipeline => void (pipeline[nodesCountMetric] = pipelinesMap[pipeline.id]));
}
async function getPipelines(req, lsIndexPattern, pipelines, throughputMetric, nodesCountMetric) {
const throughputPipelines = await getThroughputPipelines(
req,
lsIndexPattern,
pipelines,
throughputMetric
);
const nodePipelines = await getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric);
const finalPipelines = pipelines.map(({ id }) => {
const pipeline = {
id,
metrics: {
[throughputMetric]: throughputPipelines.find(p => p.id === id).metrics[throughputMetric],
[nodesCountMetric]: nodePipelines.find(p => p.id === id).metrics[nodesCountMetric],
},
};
return pipeline;
});
return finalPipelines;
}
async function getThroughputPipelines(req, lsIndexPattern, pipelines, throughputMetric) {
const metricsResponse = await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(req, lsIndexPattern, [throughputMetric], [], {
pipeline,
});
resolve(reduceData(pipeline, data));
});
})
);
return Object.values(metricsResponse);
}
async function getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric) {
const metricData = await getMetrics(req, lsIndexPattern, [nodesCountMetric], [], {
pageOfPipelines: pipelines,
});
const metricObject = metricData[nodesCountMetric][0];
const pipelinesData = pipelines.map(({ id }) => {
return {
id,
metrics: {
[nodesCountMetric]: {
...metricObject,
data: metricObject.data.map(([timestamp, valueMap]) => [timestamp, valueMap[id]]),
},
},
};
});
return pipelinesData;
}
function reduceData({ id }, data) {
return {
id,
metrics: Object.keys(data).reduce((accum, metricName) => {
accum[metricName] = data[metricName][0];
return accum;
}, {}),
};
}

View file

@ -1,64 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { cloneDeep, last } from 'lodash';
import { checkParam } from '../error_missing_required';
import { getMetrics } from '../details/get_metrics';
export async function processPipelinesAPIResponse(
response,
throughputMetricKey,
nodesCountMetricKey
) {
// Clone to avoid mutating original response
const processedResponse = cloneDeep(response);
// Normalize metric names for shared component code
// Calculate latest throughput and node count for each pipeline
processedResponse.pipelines.forEach(pipeline => {
pipeline.metrics = {
throughput: pipeline.metrics[throughputMetricKey],
nodesCount: {
...pipeline.metrics[nodesCountMetricKey],
data: pipeline.metrics[nodesCountMetricKey].data.map(item => [
item[0],
item[1][pipeline.id],
]),
},
};
pipeline.latestThroughput = last(pipeline.metrics.throughput.data)[1];
pipeline.latestNodesCount = last(pipeline.metrics.nodesCount.data)[1];
});
return processedResponse;
}
export async function getPipelines(req, logstashIndexPattern, pipelines, metricSet) {
checkParam(logstashIndexPattern, 'logstashIndexPattern in logstash/getPipelines');
checkParam(metricSet, 'metricSet in logstash/getPipelines');
const filters = [];
const metricsResponse = await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(req, logstashIndexPattern, metricSet, filters, {
pipeline,
});
resolve({
id: pipeline.id,
metrics: Object.keys(data).reduce((accum, metricName) => {
accum[metricName] = data[metricName][0];
return accum;
}, {}),
});
});
})
);
return Object.values(metricsResponse);
}

View file

@ -6,5 +6,5 @@
export function paginate({ size, index }, data) {
const start = index * size;
return data.slice(start, Math.min(data.length, start + size));
return data.slice(start, start + size);
}

View file

@ -6,10 +6,6 @@
import Joi from 'joi';
import { getClusterStatus } from '../../../../../lib/logstash/get_cluster_status';
import {
getPipelines,
processPipelinesAPIResponse,
} from '../../../../../lib/logstash/get_pipelines';
import { handleError } from '../../../../../lib/errors';
import { prefixIndexPattern } from '../../../../../lib/ccs_utils';
import { INDEX_PATTERN_LOGSTASH } from '../../../../../../common/constants';
@ -57,10 +53,7 @@ export function logstashClusterPipelinesRoute(server) {
const throughputMetric = 'logstash_cluster_pipeline_throughput';
const nodesCountMetric = 'logstash_cluster_pipeline_nodes_count';
const metricSet = [throughputMetric, nodesCountMetric];
// The client side fields do not match the server side metric names
// so adjust that here. See processPipelinesAPIResponse
// Mapping client and server metric keys together
const sortMetricSetMap = {
latestThroughput: throughputMetric,
latestNodesCount: nodesCountMetric,
@ -69,29 +62,20 @@ export function logstashClusterPipelinesRoute(server) {
sort.field = sortMetricSetMap[sort.field] || sort.field;
}
const { pageOfPipelines, totalPipelineCount } = await getPaginatedPipelines(
req,
lsIndexPattern,
{ clusterUuid },
metricSet,
pagination,
sort,
queryText
);
try {
const pipelineData = await getPipelines(req, lsIndexPattern, pageOfPipelines, metricSet);
const response = await processPipelinesAPIResponse(
{
pipelines: pipelineData,
clusterStatus: await getClusterStatus(req, lsIndexPattern, { clusterUuid }),
},
throughputMetric,
nodesCountMetric
const response = await getPaginatedPipelines(
req,
lsIndexPattern,
{ clusterUuid },
{ throughputMetric, nodesCountMetric },
pagination,
sort,
queryText
);
return {
...response,
totalPipelineCount,
clusterStatus: await getClusterStatus(req, lsIndexPattern, { clusterUuid }),
};
} catch (err) {
throw handleError(err, req);

View file

@ -6,10 +6,6 @@
import Joi from 'joi';
import { getNodeInfo } from '../../../../../lib/logstash/get_node_info';
import {
getPipelines,
processPipelinesAPIResponse,
} from '../../../../../lib/logstash/get_pipelines';
import { handleError } from '../../../../../lib/errors';
import { prefixIndexPattern } from '../../../../../lib/ccs_utils';
import { INDEX_PATTERN_LOGSTASH } from '../../../../../../common/constants';
@ -57,10 +53,8 @@ export function logstashNodePipelinesRoute(server) {
const throughputMetric = 'logstash_node_pipeline_throughput';
const nodesCountMetric = 'logstash_node_pipeline_nodes_count';
const metricSet = [throughputMetric, nodesCountMetric];
// The client side fields do not match the server side metric names
// so adjust that here. See processPipelinesAPIResponse
// Mapping client and server metric keys together
const sortMetricSetMap = {
latestThroughput: throughputMetric,
latestNodesCount: nodesCountMetric,
@ -69,28 +63,20 @@ export function logstashNodePipelinesRoute(server) {
sort.field = sortMetricSetMap[sort.field] || sort.field;
}
const { pageOfPipelines, totalPipelineCount } = await getPaginatedPipelines(
req,
lsIndexPattern,
{ clusterUuid, logstashUuid },
metricSet,
pagination,
sort,
queryText
);
try {
const pipelineData = await getPipelines(req, lsIndexPattern, pageOfPipelines, metricSet);
const response = await processPipelinesAPIResponse(
{
pipelines: pipelineData,
nodeSummary: await getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }),
},
throughputMetric,
nodesCountMetric
const response = await getPaginatedPipelines(
req,
lsIndexPattern,
{ clusterUuid, logstashUuid },
{ throughputMetric, nodesCountMetric },
pagination,
sort,
queryText
);
return {
...response,
totalPipelineCount,
nodeSummary: await getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }),
};
} catch (err) {
throw handleError(err, req);