Improve performance of the Logstash Pipeline Viewer (#33793) (#33849)

Resolves #27513.

_This PR is a combination of #31293 (the code changes) and #33570 (the test updates). Those two PRs were individually reviewed and merged into a feature branch; this combined PR simply merges that feature branch into `master`._

Summary of changes, taken from #31293:

Previously, the Logstash Pipeline Viewer UI made a single Kibana API call to fetch all the information necessary to render the Logstash pipeline. This included the information needed by the detail drawer that opens when a user clicks an individual vertex in the pipeline.

Naturally, this single API call fetched _a lot_ of data, not just from the Kibana server but, in turn, from Elasticsearch as well. The "pro" of this approach was that the user saw instantaneous results when clicking a vertex in a pipeline to open its detail drawer. The "cons" were the amount of computation Elasticsearch had to perform and the amount of data transferred over the wire, both between Elasticsearch and the Kibana server and between the Kibana server and the browser.

This PR makes the Kibana API call that fetches the data needed to **initially** render the pipeline (that is, with the detail drawer closed) much lighter. When the user clicks a vertex in the pipeline, a second API call is made to fetch the data needed by the detail drawer.
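For illustration, here is a rough sketch of the new request flow, condensed from the controller and route changes in this diff. The endpoint, payload, and response shape (`versions`, `pipeline`, `vertex`) come from the code below; the vertex id is illustrative and the `console.log` calls stand in for the real rendering.

```js
// Sketch only: Angular $http wiring and error handling omitted.
async function loadPipeline($http, clusterUuid, pipelineId, pipelineHash, ccs) {
  const url = `../api/monitoring/v1/clusters/${clusterUuid}/logstash/pipeline/${pipelineId}/${pipelineHash}`;

  // Initial render: no detailVertexId in the payload, so the server returns
  // only the lightweight pipeline document.
  const initial = (await $http.post(url, { ccs })).data;
  console.log('initial pipeline doc', initial.pipeline);

  // When the user clicks a vertex, the same endpoint is called again with
  // detailVertexId set; this response also carries the timeseries stats for
  // just that vertex, to feed the detail drawer.
  const detail = (await $http.post(url, { ccs, detailVertexId: 'mystdout' })).data;
  console.log('detail drawer stats', detail.vertex);
}
```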

The measurements below are based on a simple pipeline with one input, one filter, and one output plugin.
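For instance, a hypothetical pipeline of that shape might look like this (the actual config used for these measurements is not included in the PR):

```
input {
  stdin {}
}
filter {
  mutate {
    add_tag => ["example"] # any trivial filter would do
  }
}
output {
  stdout {}
}
```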

* Before this PR, the Elasticsearch `logstash_stats` API responses (multiple calls were made, paging a `composite` aggregation over a `date_histogram` aggregation) generated a total of 1228 aggregation buckets across all `composite` "pages", before any `filter_path`s were applied. With this PR, the single `logstash_stats` API response (again, just for the initial rendering of the pipeline, with the detail drawer closed) generated 12 buckets, also before any `filter_path`s were applied. That's a **99.02% reduction** in the number of buckets; the before/after aggregation shapes are sketched after this list.

* Before this PR, the Elasticsearch `logstash_stats` API responses added up to 70319 bytes. With this PR, the single `logstash_stats` API response for the same pipeline is 746 bytes. That's a **98.93% reduction** in size.

* Before this PR, the Elasticsearch `logstash_state` API response was 7718 bytes. With this PR, the API response for the same pipeline is 2328 bytes. That's a **69.83% reduction** in size.

* Before this PR, the Kibana API response was 51777 bytes. With this PR, the API response for the same pipeline is 2567 bytes (again, just for the initial rendering of the pipeline, with the detail drawer closed). That's a **95.04% reduction** in size.
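The aggregation sketch referenced above: condensed shapes of the request bodies before and after, based on the `createTimeseriesAggs` function this PR deletes and the `createScopedAgg` call that replaces it. The per-vertex sub-aggregation is abbreviated to a placeholder; the field name and interval follow the diff.

```js
// Placeholder for the scoped per-vertex stats aggs built by createScopedAgg()
// in the diff below; its contents are essentially the same before and after.
const scopedVertexAggs = { pipelines: { /* nested, scoped per-vertex stats */ } };

// Before: one date_histogram bucket per interval, paged with a composite
// aggregation, so the per-vertex sub-buckets repeat in every time bucket.
const beforeAggs = {
  by_time: {
    composite: {
      sources: [
        { time_bucket: { date_histogram: { field: 'logstash_stats.timestamp', interval: '30s' } } },
      ],
      after: { time_bucket: 0 }, // paging cursor, advanced until no buckets remain
    },
    aggs: scopedVertexAggs,
  },
};

// After: the same scoped aggregation issued once, with the query window
// narrowed to the latest interval, so the per-vertex buckets appear only once.
const afterAggs = scopedVertexAggs;
```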
Commit 30dd4e1b69 (parent 9cb066376c), authored by Shaunak Kashyap on 2019-03-26 08:03:34 -07:00 and committed via GitHub.
14 changed files with 547 additions and 534 deletions


@@ -25,30 +25,10 @@ describe('PluginVertex', () => {
vertexJson = {
config_name: 'some-name',
stats: {
millis_per_event: {
data: [
[ 1516667383000, 10 ],
[ 1516667386000, 50 ]
]
},
percent_of_total_processor_duration: {
data: [
[ 1516667383000, 0.25 ],
[ 1516667386000, 0.3 ]
]
},
events_in_per_millisecond: {
data: [
[ 1516667383000, 0.01 ],
[ 1516667386000, 0.02 ]
]
},
events_out_per_millisecond: {
data: [
[ 1516667383000, 0.01 ],
[ 1516667386000, 0.03 ]
]
}
millis_per_event: 50,
percent_of_total_processor_duration: 0.3,
events_in_per_millisecond: 0.01,
events_out_per_millisecond: 0.01
}
};
});
@@ -80,7 +60,7 @@ describe('PluginVertex', () => {
it('should have the correct events-per-second stat', () => {
const pluginVertex = new PluginVertex(graph, vertexJson);
expect(pluginVertex.latestEventsPerSecond).to.be(20);
expect(pluginVertex.latestEventsPerSecond).to.be(10);
});
describe("isTimeConsuming", () => {
@@ -91,13 +71,13 @@ describe('PluginVertex', () => {
});
it('should have a false isTimeConsuming result when the plugin consumes an average amount of execution time', () => {
vertexJson.stats.percent_of_total_processor_duration.data[1][1] = percentExecution;
vertexJson.stats.percent_of_total_processor_duration = percentExecution;
const pluginVertex = new PluginVertex(graph, vertexJson);
expect(pluginVertex.isTimeConsuming()).to.be(false);
});
it("should have a true isTimeConsuming result when the plugin consumes a large amount of execution time", () => {
vertexJson.stats.percent_of_total_processor_duration.data[1][1] = 0.1 +
it('should have a true isTimeConsuming result when the plugin consumes a large amount of execution time', () => {
vertexJson.stats.percent_of_total_processor_duration = 0.1 +
(percentExecution * (TIME_CONSUMING_PROCESSOR_THRESHOLD_COEFFICIENT));
const pluginVertex = new PluginVertex(graph, vertexJson);
expect(pluginVertex.isTimeConsuming()).to.be(true);
@@ -111,13 +91,13 @@ describe('PluginVertex', () => {
});
it('should have a true isSlow result when the plugin\'s seconds per event is 2 standard deviations above the mean', () => {
vertexJson.stats.millis_per_event.data[1][1] = 999999999999999999;
vertexJson.stats.millis_per_event = 999999999999999999;
const pluginVertex = new PluginVertex(graph, vertexJson);
expect(pluginVertex.isSlow()).to.be(true);
});
it('should have a false isSlow result when the plugin\'s seconds per event is not 2 standard deviations above the mean', () => {
vertexJson.stats.millis_per_event.data[1][1] = 1;
vertexJson.stats.millis_per_event = 1;
const pluginVertex = new PluginVertex(graph, vertexJson);
expect(pluginVertex.isSlow()).to.be(false);
});


@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { last, get, omit } from 'lodash';
import { get } from 'lodash';
import { Vertex } from './vertex';
export const TIME_CONSUMING_PROCESSOR_THRESHOLD_COEFFICIENT = 2;
@@ -44,28 +44,36 @@ export class PluginVertex extends Vertex {
}
get latestMillisPerEvent() {
const latestMillisPerEventBucket = last(get(this.stats, 'millis_per_event.data', [])) || [];
return latestMillisPerEventBucket[1];
return get(this.stats, 'millis_per_event');
}
get percentOfTotalProcessorTime() {
const latestPercentOfTotalProcessorDurationBucket = last(get(this.stats, 'percent_of_total_processor_duration.data', [])) || [];
return latestPercentOfTotalProcessorDurationBucket[1];
return get(this.stats, 'percent_of_total_processor_duration');
}
get eventsPerMillisecond() {
return this.isInput
? this.stats.events_out_per_millisecond
: this.stats.events_in_per_millisecond;
}
get eventsPerSecond() {
const eventsPerMillisecond = this.isInput
? this.stats.events_out_per_millisecond
: this.stats.events_in_per_millisecond;
return {
...omit(eventsPerMillisecond, 'data'),
data: get(eventsPerMillisecond, 'data', []).map(([x, y]) => [x, y * 1000])
};
if (!this.eventsPerMillisecond.hasOwnProperty('data')) {
return this.eventsPerMillisecond * 1000;
}
const eps = { ...this.eventsPerMillisecond }; // Clone the object so we don't modify the original one
eps.data = this.eventsPerMillisecond.data.map(([timestamp, value]) => [ timestamp, value * 1000]);
return eps;
}
get latestEventsPerSecond() {
const latestBucket = last(get(this.eventsPerSecond, 'data', [])) || [];
return latestBucket[1];
if (!this.eventsPerSecond.hasOwnProperty('data')) {
return this.eventsPerSecond;
}
const numTimeseriesBuckets = this.eventsPerSecond.data.length;
return this.eventsPerSecond.data[numTimeseriesBuckets - 1][1];
}
isTimeConsuming() {


@@ -14,7 +14,6 @@ exports[`PipelineViewer component passes expected props 1`] = `
verticalPosition="center"
>
<StatementSection
detailVertex={null}
elements={
Array [
Object {
@@ -26,7 +25,7 @@ exports[`PipelineViewer component passes expected props 1`] = `
}
headingText="Inputs"
iconType="logstashInput"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
<EuiSpacer
size="l"
@@ -45,7 +44,6 @@ exports[`PipelineViewer component passes expected props 1`] = `
size="l"
/>
<StatementSection
detailVertex={null}
elements={
Array [
Object {
@@ -57,13 +55,12 @@ exports[`PipelineViewer component passes expected props 1`] = `
}
headingText="Filters"
iconType="logstashFilter"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
<EuiSpacer
size="l"
/>
<StatementSection
detailVertex={null}
elements={
Array [
Object {
@@ -75,7 +72,7 @@ exports[`PipelineViewer component passes expected props 1`] = `
}
headingText="Outputs"
iconType="logstashOutput"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
</EuiPageContent>
</EuiPageBody>
@@ -96,11 +93,6 @@ exports[`PipelineViewer component renders DetailDrawer when selected vertex is n
verticalPosition="center"
>
<StatementSection
detailVertex={
Object {
"id": "stdin",
}
}
elements={
Array [
Object {
@@ -112,7 +104,7 @@ exports[`PipelineViewer component renders DetailDrawer when selected vertex is n
}
headingText="Inputs"
iconType="logstashInput"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
<EuiSpacer
size="l"
@@ -131,11 +123,6 @@ exports[`PipelineViewer component renders DetailDrawer when selected vertex is n
size="l"
/>
<StatementSection
detailVertex={
Object {
"id": "stdin",
}
}
elements={
Array [
Object {
@@ -147,17 +134,12 @@ exports[`PipelineViewer component renders DetailDrawer when selected vertex is n
}
headingText="Filters"
iconType="logstashFilter"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
<EuiSpacer
size="l"
/>
<StatementSection
detailVertex={
Object {
"id": "stdin",
}
}
elements={
Array [
Object {
@@ -169,7 +151,7 @@ exports[`PipelineViewer component renders DetailDrawer when selected vertex is n
}
headingText="Outputs"
iconType="logstashOutput"
onShowVertexDetails={[Function]}
onShowVertexDetails={[MockFunction]}
/>
<DetailDrawer
onHide={[Function]}


@@ -7,7 +7,6 @@
import React from 'react';
import { PipelineViewer } from '../pipeline_viewer';
import { shallowWithIntl } from '../../../../../../../../test_utils/enzyme_helpers';
import { get } from 'lodash';
describe('PipelineViewer component', () => {
let pipeline;
@@ -44,7 +43,7 @@ describe('PipelineViewer component', () => {
},
};
component = <PipelineViewer.WrappedComponent pipeline={pipeline} />;
component = <PipelineViewer.WrappedComponent pipeline={pipeline} setDetailVertexId={jest.fn()} />;
});
it('passes expected props', () => {
@@ -53,33 +52,12 @@ describe('PipelineViewer component', () => {
expect(renderedComponent).toMatchSnapshot();
});
it('changes selected vertex', () => {
const vertex = { id: 'stdin' };
const instance = shallowWithIntl(component).instance();
instance.onShowVertexDetails(vertex);
expect(get(instance, 'state.detailDrawer.vertex')).toBe(vertex);
});
it('toggles selected vertex on second pass', () => {
const vertex = { id: 'stdin' };
const instance = shallowWithIntl(component).instance();
instance.onShowVertexDetails(vertex);
instance.onShowVertexDetails(vertex);
expect(get(instance, 'state.detailDrawer.vertex')).toBeNull();
});
it('renders DetailDrawer when selected vertex is not null', () => {
const vertex = { id: 'stdin' };
component = <PipelineViewer.WrappedComponent pipeline={pipeline} setDetailVertexId={jest.fn()} detailVertex={vertex} />;
const wrapper = shallowWithIntl(component);
const instance = wrapper.instance();
instance.onShowVertexDetails(vertex);
wrapper.update();
const renderedComponent = shallowWithIntl(component);
expect(wrapper).toMatchSnapshot();
expect(renderedComponent).toMatchSnapshot();
});
});


@@ -27,36 +27,15 @@ class PipelineViewerUi extends React.Component {
};
}
onShowVertexDetails = (vertex) => {
if (vertex === this.state.detailDrawer.vertex) {
this.onHideVertexDetails();
}
else {
this.setState({
detailDrawer: {
vertex
}
});
}
}
onHideVertexDetails = () => {
this.setState({
detailDrawer: {
vertex: null
}
});
}
renderDetailDrawer = () => {
if (!this.state.detailDrawer.vertex) {
if (!this.props.detailVertex) {
return null;
}
return (
<DetailDrawer
vertex={this.state.detailDrawer.vertex}
onHide={this.onHideVertexDetails}
vertex={this.props.detailVertex}
onHide={() => this.props.setDetailVertexId(undefined)}
timeseriesTooltipXValueFormatter={this.props.timeseriesTooltipXValueFormatter}
/>
);
@@ -79,8 +58,7 @@ class PipelineViewerUi extends React.Component {
iconType="logstashInput"
headingText={intl.formatMessage({ id: 'xpack.monitoring.logstash.pipelineViewer.inputsTitle', defaultMessage: 'Inputs' })}
elements={inputs}
onShowVertexDetails={this.onShowVertexDetails}
detailVertex={this.state.detailDrawer.vertex}
onShowVertexDetails={this.props.setDetailVertexId}
/>
<EuiSpacer />
<Queue queue={queue} />
@@ -89,16 +67,14 @@ class PipelineViewerUi extends React.Component {
iconType="logstashFilter"
headingText={intl.formatMessage({ id: 'xpack.monitoring.logstash.pipelineViewer.filtersTitle', defaultMessage: 'Filters' })}
elements={filters}
onShowVertexDetails={this.onShowVertexDetails}
detailVertex={this.state.detailDrawer.vertex}
onShowVertexDetails={this.props.setDetailVertexId}
/>
<EuiSpacer />
<StatementSection
iconType="logstashOutput"
headingText={intl.formatMessage({ id: 'xpack.monitoring.logstash.pipelineViewer.outputsTitle', defaultMessage: 'Outputs' })}
elements={outputs}
onShowVertexDetails={this.onShowVertexDetails}
detailVertex={this.state.detailDrawer.vertex}
onShowVertexDetails={this.props.setDetailVertexId}
/>
{ this.renderDetailDrawer() }
</EuiPageContent>


@@ -20,6 +20,7 @@ import { List } from 'plugins/monitoring/components/logstash/pipeline_viewer/mod
import { PipelineState } from 'plugins/monitoring/components/logstash/pipeline_viewer/models/pipeline_state';
import { PipelineViewer } from 'plugins/monitoring/components/logstash/pipeline_viewer';
import { Pipeline } from 'plugins/monitoring/components/logstash/pipeline_viewer/models/pipeline';
import { vertexFactory } from 'plugins/monitoring/components/logstash/pipeline_viewer/models/graph/vertex_factory';
import { MonitoringViewBaseController } from '../../base_controller';
import { I18nContext } from 'ui/i18n';
import {
@@ -28,6 +29,9 @@ import {
EuiPageContent,
} from '@elastic/eui';
let previousPipelineHash = undefined;
let detailVertexId = undefined;
function getPageData($injector) {
const $route = $injector.get('$route');
const $http = $injector.get('$http');
@@ -38,11 +42,20 @@ function getPageData($injector) {
const { ccs, cluster_uuid: clusterUuid } = globalState;
const pipelineId = $route.current.params.id;
const pipelineHash = $route.current.params.hash || '';
// Pipeline version was changed, so clear out detailVertexId since that vertex won't
// exist in the updated pipeline version
if (pipelineHash !== previousPipelineHash) {
previousPipelineHash = pipelineHash;
detailVertexId = undefined;
}
const url = pipelineHash
? `../api/monitoring/v1/clusters/${clusterUuid}/logstash/pipeline/${pipelineId}/${pipelineHash}`
: `../api/monitoring/v1/clusters/${clusterUuid}/logstash/pipeline/${pipelineId}`;
return $http.post(url, {
ccs
ccs,
detailVertexId
})
.then(response => response.data)
.then(data => {
@@ -107,11 +120,22 @@ uiRoutes.when('/logstash/pipelines/:id/:hash?', {
const timeseriesTooltipXValueFormatter = xValue =>
moment(xValue).format(dateFormat);
const setDetailVertexId = vertex => {
if (!vertex) {
detailVertexId = undefined;
} else {
detailVertexId = vertex.id;
}
return this.updateData();
};
$scope.$watch(() => this.data, data => {
if (!data || !data.pipeline) {
return;
}
this.pipelineState = new PipelineState(data.pipeline);
this.detailVertex = data.vertex ? vertexFactory(null, data.vertex) : null;
this.renderReact(
<I18nContext>
<EuiPage>
@@ -122,6 +146,8 @@ uiRoutes.when('/logstash/pipelines/:id/:hash?', {
Pipeline.fromPipelineGraph(this.pipelineState.config.graph)
)}
timeseriesTooltipXValueFormatter={timeseriesTooltipXValueFormatter}
setDetailVertexId={setDetailVertexId}
detailVertex={this.detailVertex}
/>
</EuiPageContent>
</EuiPageBody>
@@ -129,6 +155,11 @@ uiRoutes.when('/logstash/pipelines/:id/:hash?', {
</I18nContext>
);
});
$scope.$on('$destroy', () => {
previousPipelineHash = undefined;
detailVertexId = undefined;
});
}
}
});


@@ -37,13 +37,8 @@ describe('get_pipeline', () => {
it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({
events_in: 10000,
events_out: 9000,
duration_in_millis: 18000,
events_out_per_millisecond: 0.01,
millis_per_event: 2,
queue_push_duration_in_millis: 100000,
queue_push_duration_in_millis_per_event: 11.11111111111111
});
});
@@ -57,9 +52,6 @@ describe('get_pipeline', () => {
it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({
events_in: 10000,
events_out: 9000,
duration_in_millis: 18000,
events_in_per_millisecond: 0.011111111111111112,
events_out_per_millisecond: 0.01,
millis_per_event: 1.8,
@@ -78,9 +70,6 @@ describe('get_pipeline', () => {
it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({
events_in: 10000,
events_out: 9000,
duration_in_millis: 18000,
events_in_per_millisecond: 0.011111111111111112,
events_out_per_millisecond: 0.01,
millis_per_event: 1.8,
@@ -93,7 +82,6 @@ describe('get_pipeline', () => {
describe('_enrichStateWithStatsAggregation function', () => {
let stateDocument;
let statsAggregation;
let version;
let timeseriesInterval;
beforeEach(() => {
@@ -160,278 +148,96 @@ describe('get_pipeline', () => {
};
statsAggregation = {
timeseriesStats: [
{
key: { time_bucket: 1516131120000 },
pipelines: {
scoped: {
vertices: {
vertex_id: {
buckets: [
{
key: 'mystdout',
events_in_total: { value: 1000 },
events_out_total: { value: 1000 },
duration_in_millis_total: { value: 15 },
queue_push_duration_in_millis_total: { value: 0 }
},
{
key: 'mystdin',
events_in_total: { value: 0 },
events_out_total: { value: 1000 },
duration_in_millis_total: { value: 0 },
queue_push_duration_in_millis_total: { value: 13547 }
}
]
}
},
total_processor_duration_stats: {
count: 276,
min: 0,
max: 15904756,
avg: 6591773.384057971,
sum: 1819329454
}
}
}
},
{
key: { time_bucket: 1516131180000 },
pipelines: {
scoped: {
vertices: {
vertex_id: {
buckets: [
{
key: 'mystdout',
events_in_total: { value: 2000 },
events_out_total: { value: 2000 },
duration_in_millis_total: { value: 20 },
queue_push_duration_in_millis_total: { value: 0 }
},
{
key: 'mystdin',
events_in_total: { value: 0 },
events_out_total: { value: 2000 },
duration_in_millis_total: { value: 0 },
queue_push_duration_in_millis_total: { value: 25073 }
}
]
}
},
total_processor_duration_stats: {
count: 276,
min: 0,
max: 15953756,
avg: 6591773.384057971,
sum: 1819329454
aggregations: {
pipelines: {
scoped: {
vertices: {
vertex_id: {
buckets: [
{
key: 'mystdout',
events_in_total: { value: 1000 },
events_out_total: { value: 1000 },
duration_in_millis_total: { value: 15 },
},
{
key: 'mystdin',
events_in_total: { value: 0 },
events_out_total: { value: 1000 },
duration_in_millis_total: { value: 0 },
}
]
}
},
total_processor_duration_stats: {
count: 276,
min: 0,
max: 15904756,
avg: 6591773.384057971,
sum: 1819329454
}
}
}
]
}
};
version = {
hash: 'eada8baceee81726f6be9d0a071beefad3d9a2fd1b5f5d916011dca9fa66d081',
firstSeen: 1516131138639,
lastSeen: 1516135440463
};
timeseriesInterval = 30;
});
it('enriches the state document correctly with stats', () => {
const enrichedStateDocument = _enrichStateWithStatsAggregation(stateDocument, statsAggregation, version, timeseriesInterval);
const enrichedStateDocument = _enrichStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesInterval);
expect(enrichedStateDocument).to.eql({
pipeline: {
batch_size: 125,
ephemeral_id: '2c53e689-62e8-4ef3-bc57-ea968531a848',
batch_size: 125,
ephemeral_id: '2c53e689-62e8-4ef3-bc57-ea968531a848',
hash: 'eada8baceee81726f6be9d0a071beefad3d9a2fd1b5f5d916011dca9fa66d081',
id: 'main',
representation: {
type: 'lir',
version: '0.0.0',
hash: 'eada8baceee81726f6be9d0a071beefad3d9a2fd1b5f5d916011dca9fa66d081',
id: 'main',
representation: {
type: 'lir',
version: '0.0.0',
hash: 'eada8baceee81726f6be9d0a071beefad3d9a2fd1b5f5d916011dca9fa66d081',
graph: {
vertices: [
{
config_name: 'stdin',
id: 'mystdin',
type: 'plugin',
plugin_type: 'input',
stats: {
duration_in_millis: {
data: [
[ 1516131120000, 0 ],
[ 1516131180000, 0 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_in: {
data: [
[ 1516131120000, 0 ],
[ 1516131180000, 0 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_out: {
data: [
[ 1516131120000, 1000 ],
[ 1516131180000, 2000 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_out_per_millisecond: {
data: [
[ 1516131120000, 0.03333333333333333 ],
[ 1516131180000, 0.06666666666666667 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
millis_per_event: {
data: [
[ 1516131120000, 0 ],
[ 1516131180000, 0 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
queue_push_duration_in_millis: {
data: [
[ 1516131120000, 13547 ],
[ 1516131180000, 25073 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
queue_push_duration_in_millis_per_event: {
data: [
[ 1516131120000, 13.547 ],
[ 1516131180000, 12.5365 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
}
}
},
{
config_name: 'stdout',
id: 'mystdout',
type: 'plugin',
plugin_type: 'output',
stats: {
duration_in_millis: {
data: [
[ 1516131120000, 15 ],
[ 1516131180000, 20 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_in: {
data: [
[ 1516131120000, 1000 ],
[ 1516131180000, 2000 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_out: {
data: [
[ 1516131120000, 1000 ],
[ 1516131180000, 2000 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_in_per_millisecond: {
data: [
[1516131120000, 0.03333333333333333],
[1516131180000, 0.06666666666666667]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_out_per_millisecond: {
data: [
[ 1516131120000, 0.03333333333333333 ],
[ 1516131180000, 0.06666666666666667 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
millis_per_event: {
data: [
[ 1516131120000, 0.015 ],
[ 1516131180000, 0.01 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
percent_of_total_processor_duration: {
data: [
[ 1516131120000, 0.0000009431141225932671 ],
[ 1516131180000, 0.0000012536232846986 ]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
}
}
graph: {
vertices: [
{
config_name: 'stdin',
id: 'mystdin',
type: 'plugin',
plugin_type: 'input',
stats: {
events_out_per_millisecond: 0.03333333333333333,
millis_per_event: 0
}
],
edges: [
{
id: 'c56369ba2e160c8add43e8f105ca17c374b27f4b4627ea4566f066b0ead0bcc7',
from: 'mystdin',
to: '__QUEUE__',
type: 'plain'
},
{
id: '8a5222282b023399a14195011f2a14aa54a4d97810cd9e0a63c5cd98856bb70f',
from: '__QUEUE__',
to: 'mystdout',
type: 'plain'
},
{
config_name: 'stdout',
id: 'mystdout',
type: 'plugin',
plugin_type: 'output',
stats: {
events_in_per_millisecond: 0.03333333333333333,
events_out_per_millisecond: 0.03333333333333333,
millis_per_event: 0.015,
percent_of_total_processor_duration: 0.0000009431141225932671
}
]
},
plugins: []
}
],
edges: [
{
id: 'c56369ba2e160c8add43e8f105ca17c374b27f4b4627ea4566f066b0ead0bcc7',
from: 'mystdin',
to: '__QUEUE__',
type: 'plain'
},
{
id: '8a5222282b023399a14195011f2a14aa54a4d97810cd9e0a63c5cd98856bb70f',
from: '__QUEUE__',
to: 'mystdout',
type: 'plain'
}
]
},
workers: 1
}
plugins: []
},
workers: 1
});
});
});


@@ -9,7 +9,6 @@ import { get } from 'lodash';
import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document';
import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation';
import { getPipelineVersions } from './get_pipeline_versions';
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds) {
@@ -24,7 +23,6 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
const inputStats = {};
const processorStats = {};
const eventsProcessedStats = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis
@@ -34,8 +32,6 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
if (isInput) {
eventsTotal = eventsOutTotal;
inputStats.queue_push_duration_in_millis = vertexStatsBucket.queue_push_duration_in_millis_total.value;
inputStats.queue_push_duration_in_millis_per_event = inputStats.queue_push_duration_in_millis / eventsTotal;
}
if (isProcessor) {
@@ -45,11 +41,7 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
}
return {
events_in: eventsInTotal,
events_out: eventsOutTotal,
duration_in_millis: durationInMillis,
millis_per_event: durationInMillis / eventsTotal,
...inputStats,
...processorStats,
...eventsProcessedStats
};
@@ -57,15 +49,15 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
/**
* The UI needs a list of all vertices for the requested pipeline version, with each vertex in the list having its timeseries metrics associated with it. The
* stateDocument object provides the list of vertices while the statsAggregation object provides timeseries metrics. This function stitches the two together
* and returns the modified stateDocument object.
* stateDocument object provides the list of vertices while the statsAggregation object provides the latest metrics for each of these vertices.
* This function stitches the two together and returns the modified stateDocument object.
*
* @param {Object} stateDocument
* @param {Object} statsAggregation
* @param {Object} First and last seen timestamps for pipeline version we're getting data for
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/
export function _enrichStateWithStatsAggregation(stateDocument, statsAggregation, { firstSeen, lastSeen }, timeseriesIntervalInSeconds) {
export function _enrichStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesIntervalInSeconds) {
const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices;
@@ -75,67 +67,27 @@ export function _enrichStateWithStatsAggregation(stateDocument, statsAggregation
vertex.stats = {};
});
// The statsAggregation object buckets by time first, then by vertex ID. However, the logstashState object (which is part of the
// stateDocument object) buckets by vertex ID first. The UI desires the latter structure so it can look up stats by vertex. So we
// transpose statsAggregation to bucket by vertex ID first, then by time. This then allows us to stitch the per-vertex timeseries stats
// from the transposed statsAggregation object onto the logstashState object.
const timeseriesBuckets = statsAggregation.timeseriesStats;
timeseriesBuckets.forEach(timeseriesBucket => {
// each bucket calculates stats for total pipeline CPU time for the associated timeseries
const totalDurationStats = timeseriesBucket.pipelines.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
const totalDurationStats = statsAggregation.aggregations.pipelines.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
// Each timeseriesBucket contains a list of vertices and their stats for a single timeseries interval
const timestamp = timeseriesBucket.key.time_bucket;
const vertexStatsByIdBuckets = get(timeseriesBucket, 'pipelines.scoped.vertices.vertex_id.buckets', []);
const verticesWithStatsBuckets = statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets;
verticesWithStatsBuckets.forEach(vertexStatsBucket => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key;
const vertex = verticesById[vertexId];
vertexStatsByIdBuckets.forEach(vertexStatsBucket => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key;
const vertex = verticesById[vertexId];
if (vertex !== undefined) {
// We extract this vertex's stats from vertexStatsBucket
const vertexStats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
// For each stat (metric), we add it to the stats property of the vertex object in logstashState
const metrics = Object.keys(vertexStats);
metrics.forEach(metric => {
// Create metric object if it doesn't already exist
if (!vertex.stats.hasOwnProperty(metric)) {
vertex.stats[metric] = {
timeRange: {
min: firstSeen,
max: lastSeen
},
data: []
};
}
vertex.stats[metric].data.push([ timestamp, vertexStats[metric]]);
});
}
});
if (vertex !== undefined) {
// We extract this vertex's stats from vertexStatsBucket
vertex.stats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
}
});
return stateDocument.logstash_state;
return stateDocument.logstash_state.pipeline;
}
export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, pipelineHash) {
export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const versions = await getPipelineVersions(callWithRequest, req, config, lsIndexPattern, clusterUuid, pipelineId);
let version;
if (pipelineHash) {
// Find version corresponding to given hash
version = versions.find(({ hash }) => hash === pipelineHash);
} else {
// Go with latest version
version = versions[0];
}
const options = {
clusterUuid,
pipelineId,
@@ -147,17 +99,13 @@ export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipe
const timeseriesInterval = calculateTimeseriesInterval(version.firstSeen, version.lastSeen, minIntervalSeconds);
const [ stateDocument, statsAggregation ] = await Promise.all([
getPipelineStateDocument(callWithRequest, req, lsIndexPattern, options),
getPipelineStatsAggregation(callWithRequest, req, lsIndexPattern, timeseriesInterval, options)
getPipelineStateDocument(req, lsIndexPattern, options),
getPipelineStatsAggregation(req, lsIndexPattern, timeseriesInterval, options),
]);
if (stateDocument === null) {
return boom.notFound(`Pipeline [${pipelineId} @ ${version.hash}] not found in the selected time range for cluster [${clusterUuid}].`);
}
const result = {
..._enrichStateWithStatsAggregation(stateDocument, statsAggregation, version, timeseriesInterval),
versions
};
return result;
return _enrichStateWithStatsAggregation(stateDocument, statsAggregation, timeseriesInterval);
}


@@ -8,8 +8,10 @@ import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';
import { get } from 'lodash';
export async function getPipelineStateDocument(callWithRequest, req, logstashIndexPattern,
export async function getPipelineStateDocument(req, logstashIndexPattern,
{ clusterUuid, pipelineId, version }) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{ term: { 'logstash_state.pipeline.id': pipelineId } },
{ term: { 'logstash_state.pipeline.hash': version.hash } }
@@ -32,6 +34,7 @@ export async function getPipelineStateDocument(callWithRequest, req, logstashInd
size: 1,
ignoreUnavailable: true,
body: {
_source: { excludes: 'logstash_state.pipeline.representation.plugins' },
sort: { timestamp: { order: 'desc' } },
query,
terminate_after: 1 // Safe to do because all these documents are functionally identical


@@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { get, last } from 'lodash';
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';
@@ -62,7 +61,6 @@ function nestedVertices(maxBucketSize) {
...scalarCounterAggregation('events_in', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('events_out', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('duration_in_millis', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('queue_push_duration_in_millis', fieldPath, ephemeralIdField, maxBucketSize)
}
}
}
@@ -90,62 +88,37 @@ function createScopedAgg(pipelineId, pipelineHash, agg) {
};
}
function createTimeseriesAggs(pipelineId, pipelineHash, maxBucketSize, timeseriesInterval, lastTimeBucket) {
return {
by_time: {
composite: {
sources: [
{
time_bucket: {
date_histogram: {
field: 'logstash_stats.timestamp',
interval: timeseriesInterval + 's'
}
}
}
],
after: {
time_bucket: lastTimeBucket
}
},
aggs: createScopedAgg(pipelineId, pipelineHash, {
vertices: nestedVertices(maxBucketSize),
total_processor_duration_stats: {
stats: {
field: "logstash_stats.pipelines.events.duration_in_millis"
}
}
})
}
};
}
function fetchPipelineTimeseriesStats(query, logstashIndexPattern, pipelineId, version,
maxBucketSize, timeseriesInterval, callWithRequest, req, lastTimeBucket = 0) {
function fetchPipelineLatestStats(query, logstashIndexPattern, pipelineId, version, maxBucketSize, callWithRequest, req) {
const params = {
index: logstashIndexPattern,
size: 0,
ignoreUnavailable: true,
filterPath: [
'aggregations.by_time.buckets.key.time_bucket',
'aggregations.by_time.buckets.pipelines.scoped.vertices.vertex_id.buckets.key',
'aggregations.by_time.buckets.pipelines.scoped.vertices.vertex_id.buckets.events_in_total',
'aggregations.by_time.buckets.pipelines.scoped.vertices.vertex_id.buckets.events_out_total',
'aggregations.by_time.buckets.pipelines.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
'aggregations.by_time.buckets.pipelines.scoped.vertices.vertex_id.buckets.queue_push_duration_in_millis_total',
'aggregations.by_time.buckets.pipelines.scoped.total_processor_duration_stats'
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.key',
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.events_in_total',
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.events_out_total',
'aggregations.pipelines.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
'aggregations.pipelines.scoped.total_processor_duration_stats'
],
body: {
query: query,
aggs: createTimeseriesAggs(pipelineId, version.hash, maxBucketSize, timeseriesInterval, lastTimeBucket)
aggs: createScopedAgg(pipelineId, version.hash, {
vertices: nestedVertices(maxBucketSize),
total_processor_duration_stats: {
stats: {
field: 'logstash_stats.pipelines.events.duration_in_millis'
}
}
})
}
};
return callWithRequest(req, 'search', params);
}
export async function getPipelineStatsAggregation(callWithRequest, req, logstashIndexPattern, timeseriesInterval,
export function getPipelineStatsAggregation(req, logstashIndexPattern, timeseriesInterval,
{ clusterUuid, start, end, pipelineId, version }) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{
nested: {
@@ -162,6 +135,9 @@ export async function getPipelineStatsAggregation(callWithRequest, req, logstash
}
];
start = version.lastSeen - (timeseriesInterval * 1000);
end = version.lastSeen;
const query = createQuery({
type: 'logstash_stats',
start,
@@ -173,24 +149,6 @@ export async function getPipelineStatsAggregation(callWithRequest, req, logstash
const config = req.server.config();
const timeBuckets = [];
let paginatedTimeBuckets;
do {
const lastTimeBucket = get(last(paginatedTimeBuckets), 'key.time_bucket', 0);
const paginatedResponse = await fetchPipelineTimeseriesStats(query, logstashIndexPattern, pipelineId, version,
config.get('xpack.monitoring.max_bucket_size'), timeseriesInterval, callWithRequest, req, lastTimeBucket);
paginatedTimeBuckets = get(paginatedResponse, 'aggregations.by_time.buckets', []);
timeBuckets.push(...paginatedTimeBuckets);
} while (paginatedTimeBuckets.length > 0);
// Drop the last bucket if it is partial (spoiler alert: this will be the case most of the time)
const lastTimeBucket = last(timeBuckets);
if (version.lastSeen - lastTimeBucket.key.time_bucket < timeseriesInterval * 1000) {
timeBuckets.pop();
}
return {
timeseriesStats: timeBuckets
};
return fetchPipelineLatestStats(query, logstashIndexPattern, pipelineId, version,
config.get('xpack.monitoring.max_bucket_size'), callWithRequest, req);
}


@@ -10,8 +10,9 @@ import { get } from 'lodash';
import { checkParam } from '../error_missing_required';
function fetchPipelineVersions(...args) {
const [ callWithRequest, req, config, logstashIndexPattern, clusterUuid, pipelineId ] = args;
const [ req, config, logstashIndexPattern, clusterUuid, pipelineId ] = args;
checkParam(logstashIndexPattern, 'logstashIndexPattern in getPipelineVersions');
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{


@@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import boom from 'boom';
import { get } from 'lodash';
import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document';
import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation';
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds) {
const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
const timeseriesIntervalInMillis = timeseriesIntervalInSeconds * 1000;
const eventsInTotal = vertexStatsBucket.events_in_total.value;
const eventsOutTotal = get(vertexStatsBucket, 'events_out_total.value', null);
const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
const inputStats = {};
const processorStats = {};
const eventsProcessedStats = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis
};
let eventsTotal;
if (isInput) {
eventsTotal = eventsOutTotal;
inputStats.queue_push_duration_in_millis = vertexStatsBucket.queue_push_duration_in_millis_total.value;
inputStats.queue_push_duration_in_millis_per_event = inputStats.queue_push_duration_in_millis / eventsTotal;
}
if (isProcessor) {
eventsTotal = eventsInTotal;
processorStats.percent_of_total_processor_duration = durationInMillis / totalProcessorsDurationInMillis;
eventsProcessedStats.events_in_per_millisecond = eventsInTotal / timeseriesIntervalInMillis;
}
return {
events_in: eventsInTotal,
events_out: eventsOutTotal,
duration_in_millis: durationInMillis,
millis_per_event: durationInMillis / eventsTotal,
...inputStats,
...processorStats,
...eventsProcessedStats
};
}
/**
* The UI needs the requested vertex from the requested pipeline version, with its timeseries metrics associated with it. The
* stateDocument object provides the vertex while the vertexStatsAggregation object provides its timeseries metrics.
* This function stitches the two together and returns the enriched vertex.
*
* @param {Object} stateDocument
* @param {Object} vertexStatsAggregation
* @param {String} vertexId The ID of the vertex to fetch detailed stats for
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/
export function _enrichVertexStateWithStatsAggregation(stateDocument, vertexStatsAggregation, vertexId, timeseriesIntervalInSeconds) {
const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices;
// First, find the vertex we care about
const vertex = vertices.find(v => v.id === vertexId);
vertex.stats = {};
// Next, iterate over timeseries metrics and attach them to vertex
const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
timeSeriesBuckets.forEach(timeSeriesBucket => {
// each bucket calculates stats for total pipeline CPU time for the associated timeseries
const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
const timestamp = timeSeriesBucket.key;
const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
const vertexStats = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
Object.keys(vertexStats).forEach(stat => {
if (!vertex.stats.hasOwnProperty(stat)) {
vertex.stats[stat] = { data: [] };
}
vertex.stats[stat].data.push([ timestamp, vertexStats[stat] ]);
});
});
return vertex;
}
export async function getPipelineVertex(req, config, lsIndexPattern, clusterUuid, pipelineId, version, vertexId) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
const options = {
clusterUuid,
pipelineId,
version,
vertexId
};
// Determine metrics' timeseries interval based on version's timespan
const minIntervalSeconds = config.get('xpack.monitoring.min_interval_seconds');
const timeseriesInterval = calculateTimeseriesInterval(version.firstSeen, version.lastSeen, minIntervalSeconds);
const [ stateDocument, statsAggregation ] = await Promise.all([
getPipelineStateDocument(req, lsIndexPattern, options),
getPipelineVertexStatsAggregation(req, lsIndexPattern, timeseriesInterval, options),
]);
if (stateDocument === null) {
return boom.notFound(`Pipeline [${pipelineId} @ ${version.hash}] not found in the selected time range for cluster [${clusterUuid}].`);
}
return _enrichVertexStateWithStatsAggregation(stateDocument, statsAggregation, vertexId, timeseriesInterval);
}


@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';
function scalarCounterAggregation(field, fieldPath, ephemeralIdField, maxBucketSize) {
const fullPath = `${fieldPath}.${field}`;
const byEphemeralIdName = `${field}_temp_by_ephemeral_id`;
const sumName = `${field}_total`;
const aggs = {};
aggs[byEphemeralIdName] = {
terms: {
field: ephemeralIdField,
size: maxBucketSize,
},
aggs: {
stats: {
stats: { field: fullPath }
},
difference: {
bucket_script: {
script: 'params.max - params.min',
buckets_path: {
min: 'stats.min',
max: 'stats.max'
}
}
}
}
};
aggs[sumName] = {
sum_bucket: {
buckets_path: `${byEphemeralIdName}>difference`
}
};
return aggs;
}
function createAggsObjectFromAggsList(aggsList) {
return aggsList.reduce((aggsSoFar, agg) => ({ ...aggsSoFar, ...agg }), {});
}
function createNestedVertexAgg(vertexId, maxBucketSize) {
const fieldPath = 'logstash_stats.pipelines.vertices';
const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id';
return {
vertices: {
nested: { path: 'logstash_stats.pipelines.vertices' },
aggs: {
vertex_id: {
filter: {
term: {
'logstash_stats.pipelines.vertices.id': vertexId
}
},
aggs: {
...scalarCounterAggregation('events_in', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('events_out', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('duration_in_millis', fieldPath, ephemeralIdField, maxBucketSize),
...scalarCounterAggregation('queue_push_duration_in_millis', fieldPath, ephemeralIdField, maxBucketSize)
}
}
}
}
};
}
function createTotalProcessorDurationStatsAgg() {
return {
total_processor_duration_stats: {
stats: {
field: 'logstash_stats.pipelines.events.duration_in_millis'
}
}
};
}
function createScopedAgg(pipelineId, pipelineHash, ...aggsList) {
return {
pipelines: {
nested: { path: 'logstash_stats.pipelines' },
aggs: {
scoped: {
filter: {
bool: {
filter: [
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
{ term: { 'logstash_stats.pipelines.hash': pipelineHash } }
]
}
},
aggs: createAggsObjectFromAggsList(aggsList)
}
}
}
};
}
function createTimeSeriesAgg(timeSeriesIntervalInSeconds, ...aggsList) {
return {
timeseries: {
date_histogram: {
field: 'timestamp',
interval: timeSeriesIntervalInSeconds + 's'
},
aggs: createAggsObjectFromAggsList(aggsList)
}
};
}
function fetchPipelineVertexTimeSeriesStats(query, logstashIndexPattern, pipelineId, version, vertexId,
timeSeriesIntervalInSeconds, maxBucketSize, callWithRequest, req) {
const aggs = {
...createTimeSeriesAgg(timeSeriesIntervalInSeconds,
createScopedAgg(pipelineId, version.hash,
createNestedVertexAgg(vertexId, maxBucketSize),
createTotalProcessorDurationStatsAgg()
)
)
};
const params = {
index: logstashIndexPattern,
size: 0,
ignoreUnavailable: true,
filterPath: [
'aggregations.timeseries.buckets.key',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_in_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.events_out_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines.scoped.vertices.vertex_id.queue_push_duration_in_millis_total',
'aggregations.timeseries.buckets.pipelines.scoped.total_processor_duration_stats'
],
body: {
query: query,
aggs
}
};
return callWithRequest(req, 'search', params);
}
export function getPipelineVertexStatsAggregation(req, logstashIndexPattern, timeSeriesIntervalInSeconds,
{ clusterUuid, start, end, pipelineId, version, vertexId }) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const filters = [
{
nested: {
path: 'logstash_stats.pipelines',
query: {
bool: {
must: [
{ term: { 'logstash_stats.pipelines.hash': version.hash } },
{ term: { 'logstash_stats.pipelines.id': pipelineId } },
]
}
}
}
}
];
start = version.firstSeen;
end = version.lastSeen;
const query = createQuery({
type: 'logstash_stats',
start,
end,
metric: LogstashMetric.getMetricFields(),
clusterUuid,
filters
});
const config = req.server.config();
return fetchPipelineVertexTimeSeriesStats(query, logstashIndexPattern, pipelineId, version, vertexId,
timeSeriesIntervalInSeconds, config.get('xpack.monitoring.max_bucket_size'), callWithRequest, req);
}


@@ -6,9 +6,17 @@
import Joi from 'joi';
import { handleError } from '../../../../lib/errors';
import { getPipelineVersions } from '../../../../lib/logstash/get_pipeline_versions';
import { getPipeline } from '../../../../lib/logstash/get_pipeline';
import { getPipelineVertex } from '../../../../lib/logstash/get_pipeline_vertex';
import { prefixIndexPattern } from '../../../../lib/ccs_utils';
function getPipelineVersion(versions, pipelineHash) {
return pipelineHash
? versions.find(({ hash }) => hash === pipelineHash)
: versions[0];
}
/*
* Logstash Pipeline route.
*/
@@ -33,22 +41,46 @@ export function logstashPipelineRoute(server) {
pipelineHash: Joi.string().optional()
}),
payload: Joi.object({
ccs: Joi.string().optional()
ccs: Joi.string().optional(),
detailVertexId: Joi.string().optional()
})
}
},
handler: (req) => {
handler: async (req) => {
const config = server.config();
const ccs = req.payload.ccs;
const clusterUuid = req.params.clusterUuid;
const detailVertexId = req.payload.detailVertexId;
const lsIndexPattern = prefixIndexPattern(config, 'xpack.monitoring.logstash.index_pattern', ccs);
const pipelineId = req.params.pipelineId;
// Optional params default to empty string, set to null to be more explicit.
const pipelineHash = req.params.pipelineHash || null;
return getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, pipelineHash)
.catch(err => handleError(err, req));
// Figure out which version of the pipeline we want to show
let versions;
try {
versions = await getPipelineVersions(req, config, lsIndexPattern, clusterUuid, pipelineId);
} catch (err) {
return handleError(err, req);
}
const version = getPipelineVersion(versions, pipelineHash);
const promises = [ getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) ];
if (detailVertexId) {
promises.push(getPipelineVertex(req, config, lsIndexPattern, clusterUuid, pipelineId, version, detailVertexId));
}
try {
const [ pipeline, vertex ] = await Promise.all(promises);
return {
versions,
pipeline,
vertex
};
} catch (err) {
return handleError(err, req);
}
}
});
}