mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
Provide events in & out per millisecond for Logstash pipelines (#19446)
* Port changes to dedicated branch to isolate PR. * Rename variable in test to match function name.
This commit is contained in:
parent
c2f7b13b56
commit
88c2b44a79
4 changed files with 40 additions and 14 deletions
|
@ -40,11 +40,17 @@ describe('PluginVertex', () => {
|
|||
[ 1516667386000, 0.3 ]
|
||||
]
|
||||
},
|
||||
events_per_millisecond: {
|
||||
events_in_per_millisecond: {
|
||||
data: [
|
||||
[ 1516667383000, 0.01 ],
|
||||
[ 1516667386000, 0.02 ]
|
||||
]
|
||||
},
|
||||
events_out_per_millisecond: {
|
||||
data: [
|
||||
[ 1516667383000, 0.01 ],
|
||||
[ 1516667386000, 0.03 ]
|
||||
]
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -57,7 +57,9 @@ export class PluginVertex extends Vertex {
|
|||
}
|
||||
|
||||
get eventsPerSecond() {
|
||||
const eventsPerMillisecond = this.stats.events_per_millisecond;
|
||||
const eventsPerMillisecond = this.isInput
|
||||
? this.stats.events_out_per_millisecond
|
||||
: this.stats.events_in_per_millisecond;
|
||||
return {
|
||||
...omit(eventsPerMillisecond, 'data'),
|
||||
data: get(eventsPerMillisecond, 'data', []).map(([x, y]) => [x, y * 1000])
|
||||
|
|
|
@ -15,7 +15,7 @@ describe('get_pipeline', () => {
|
|||
let vertex;
|
||||
let vertexStatsBucket;
|
||||
let totalProcessorsDurationInMillis;
|
||||
let timeboundsInMillis;
|
||||
let timeseriesIntervalInSeconds;
|
||||
|
||||
beforeEach(() => {
|
||||
vertex = {
|
||||
|
@ -31,16 +31,16 @@ describe('get_pipeline', () => {
|
|||
};
|
||||
|
||||
totalProcessorsDurationInMillis = 24000;
|
||||
timeboundsInMillis = 15 * 60 * 1000;
|
||||
timeseriesIntervalInSeconds = 15 * 60;
|
||||
});
|
||||
|
||||
it('returns correct stats', () => {
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis);
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
|
||||
expect(result).to.eql({
|
||||
events_in: 10000,
|
||||
events_out: 9000,
|
||||
duration_in_millis: 18000,
|
||||
events_per_millisecond: 0.00001,
|
||||
events_out_per_millisecond: 0.01,
|
||||
millis_per_event: 2,
|
||||
queue_push_duration_in_millis: 100000,
|
||||
queue_push_duration_in_millis_per_event: 11.11111111111111
|
||||
|
@ -55,12 +55,13 @@ describe('get_pipeline', () => {
|
|||
});
|
||||
|
||||
it('returns correct stats', () => {
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis);
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
|
||||
expect(result).to.eql({
|
||||
events_in: 10000,
|
||||
events_out: 9000,
|
||||
duration_in_millis: 18000,
|
||||
events_per_millisecond: 0.000011111111111111112,
|
||||
events_in_per_millisecond: 0.011111111111111112,
|
||||
events_out_per_millisecond: 0.01,
|
||||
millis_per_event: 1.8,
|
||||
percent_of_total_processor_duration: 0.75
|
||||
});
|
||||
|
@ -75,12 +76,13 @@ describe('get_pipeline', () => {
|
|||
});
|
||||
|
||||
it('returns correct stats', () => {
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis);
|
||||
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
|
||||
expect(result).to.eql({
|
||||
events_in: 10000,
|
||||
events_out: 9000,
|
||||
duration_in_millis: 18000,
|
||||
events_per_millisecond: 0.000011111111111111112,
|
||||
events_in_per_millisecond: 0.011111111111111112,
|
||||
events_out_per_millisecond: 0.01,
|
||||
millis_per_event: 1.8,
|
||||
percent_of_total_processor_duration: 0.75
|
||||
});
|
||||
|
@ -289,7 +291,7 @@ describe('get_pipeline', () => {
|
|||
max: 1516135440463
|
||||
}
|
||||
},
|
||||
events_per_millisecond: {
|
||||
events_out_per_millisecond: {
|
||||
data: [
|
||||
[ 1516131120000, 0.03333333333333333 ],
|
||||
[ 1516131180000, 0.06666666666666667 ]
|
||||
|
@ -367,7 +369,17 @@ describe('get_pipeline', () => {
|
|||
max: 1516135440463
|
||||
}
|
||||
},
|
||||
events_per_millisecond: {
|
||||
events_in_per_millisecond: {
|
||||
data: [
|
||||
[1516131120000, 0.03333333333333333],
|
||||
[1516131180000, 0.06666666666666667]
|
||||
],
|
||||
timeRange: {
|
||||
min: 1516131138639,
|
||||
max: 1516135440463
|
||||
}
|
||||
},
|
||||
events_out_per_millisecond: {
|
||||
data: [
|
||||
[ 1516131120000, 0.03333333333333333 ],
|
||||
[ 1516131180000, 0.06666666666666667 ]
|
||||
|
|
|
@ -17,6 +17,8 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
|
|||
const isInput = vertex.plugin_type === 'input';
|
||||
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
|
||||
|
||||
const timeseriesIntervalInMillis = timeseriesIntervalInSeconds * 1000;
|
||||
|
||||
const eventsInTotal = vertexStatsBucket.events_in_total.value;
|
||||
const eventsOutTotal = get(vertexStatsBucket, 'events_out_total.value', null);
|
||||
|
||||
|
@ -24,6 +26,9 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
|
|||
|
||||
const inputStats = {};
|
||||
const processorStats = {};
|
||||
const eventsProcessedStats = {
|
||||
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis
|
||||
};
|
||||
|
||||
let eventsTotal;
|
||||
|
||||
|
@ -36,16 +41,17 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
|
|||
if (isProcessor) {
|
||||
eventsTotal = eventsInTotal;
|
||||
processorStats.percent_of_total_processor_duration = durationInMillis / totalProcessorsDurationInMillis;
|
||||
eventsProcessedStats.events_in_per_millisecond = eventsInTotal / timeseriesIntervalInMillis;
|
||||
}
|
||||
|
||||
return {
|
||||
events_in: eventsInTotal,
|
||||
events_out: eventsOutTotal,
|
||||
duration_in_millis: durationInMillis,
|
||||
events_per_millisecond: eventsTotal / (timeseriesIntervalInSeconds * 1000),
|
||||
millis_per_event: durationInMillis / eventsTotal,
|
||||
...inputStats,
|
||||
...processorStats
|
||||
...processorStats,
|
||||
...eventsProcessedStats
|
||||
};
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue