PERFORMANCE: Avoid redundant (and high overhead) size calls to Java HashMap

Fixes #8448
This commit is contained in:
Armin 2017-10-06 17:25:02 +02:00 committed by Armin Braun
parent b86f89335f
commit a0e0ef7861
5 changed files with 19 additions and 24 deletions

View file

@@ -702,9 +702,10 @@ module LogStash; class JavaPipeline < JavaBasePipeline
def execute_batch(batched_execution, batch, flush)
batched_execution.compute(batch, flush, false)
@filter_queue_client.add_output_metrics(batch)
@filter_queue_client.add_filtered_metrics(batch)
@events_filtered.increment(batch.size)
filtered_size = batch.filtered_size
@filter_queue_client.add_output_metrics(filtered_size)
@filter_queue_client.add_filtered_metrics(filtered_size)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
# but if an exception is raised up to the worker thread they are considered

View file

@@ -500,7 +500,7 @@ module LogStash; class Pipeline < BasePipeline
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch)
@filter_queue_client.add_filtered_metrics(batch.filtered_size)
@events_filtered.increment(batch.size)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
@@ -532,7 +532,7 @@ module LogStash; class Pipeline < BasePipeline
events.clear
end
@filter_queue_client.add_output_metrics(batch)
@filter_queue_client.add_output_metrics(batch.filtered_size)
end
def wait_inputs

View file

@@ -220,20 +220,14 @@ module LogStash; module Util
end
end
def add_starting_metrics(batch)
return if @event_metric.nil? || @pipeline_metric.nil?
@event_metric.increment(:in, batch.starting_size)
@pipeline_metric.increment(:in, batch.starting_size)
def add_filtered_metrics(filtered_size)
@event_metric.increment(:filtered, filtered_size)
@pipeline_metric.increment(:filtered, filtered_size)
end
def add_filtered_metrics(batch)
@event_metric.increment(:filtered, batch.filtered_size)
@pipeline_metric.increment(:filtered, batch.filtered_size)
end
def add_output_metrics(batch)
@event_metric.increment(:out, batch.filtered_size)
@pipeline_metric.increment(:out, batch.filtered_size)
def add_output_metrics(filtered_size)
@event_metric.increment(:out, filtered_size)
@pipeline_metric.increment(:out, filtered_size)
end
end

View file

@@ -143,14 +143,14 @@ module LogStash; module Util
end
end
def add_filtered_metrics(batch)
@event_metric_filtered.increment(batch.filtered_size)
@pipeline_metric_filtered.increment(batch.filtered_size)
def add_filtered_metrics(filtered_size)
@event_metric_filtered.increment(filtered_size)
@pipeline_metric_filtered.increment(filtered_size)
end
def add_output_metrics(batch)
@event_metric_out.increment(batch.filtered_size)
@pipeline_metric_out.increment(batch.filtered_size)
def add_output_metrics(filtered_size)
@event_metric_out.increment(filtered_size)
@pipeline_metric_out.increment(filtered_size)
end
end

View file

@@ -68,8 +68,8 @@ describe LogStash::Util::WrappedSynchronousQueue do
sleep(0.1) # simulate some work for the `duration_in_millis`
# TODO: this interaction should be cleaned in an upcoming PR,
# This is what the current pipeline does.
read_client.add_filtered_metrics(read_batch)
read_client.add_output_metrics(read_batch)
read_client.add_filtered_metrics(read_batch.filtered_size)
read_client.add_output_metrics(read_batch.filtered_size)
read_client.close_batch(read_batch)
store = collector.snapshot_metric.metric_store