mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
PERFORMANCE: filter_func can optionally take an array of events to make batched filters much faster
Fixes #8428
This commit is contained in:
parent
e15a5868ec
commit
179eaff57e
5 changed files with 21 additions and 11 deletions
|
@ -77,7 +77,7 @@ module LogStash; module Config; module AST
|
|||
# of the output/filter function
|
||||
definitions << "define_singleton_method :#{type}_func do |event|"
|
||||
definitions << " targeted_outputs = []" if type == "output"
|
||||
definitions << " events = [event]" if type == "filter"
|
||||
definitions << " events = event" if type == "filter"
|
||||
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", \"event\" => event.to_hash)"
|
||||
|
||||
sections.select { |s| s.plugin_type.text_value == type }.each do |s|
|
||||
|
|
|
@ -495,11 +495,9 @@ module LogStash; class Pipeline < BasePipeline
|
|||
end
|
||||
|
||||
def filter_batch(batch)
|
||||
batch.each do |event|
|
||||
filter_func(event).each do |e|
|
||||
#these are both original and generated events
|
||||
batch.merge(e) unless e.cancelled?
|
||||
end
|
||||
filter_func(batch.to_a).each do |e|
|
||||
#these are both original and generated events
|
||||
batch.merge(e) unless e.cancelled?
|
||||
end
|
||||
@filter_queue_client.add_filtered_metrics(batch)
|
||||
@events_filtered.increment(batch.size)
|
||||
|
@ -652,7 +650,7 @@ module LogStash; class Pipeline < BasePipeline
|
|||
def filter(event, &block)
|
||||
maybe_setup_out_plugins
|
||||
# filter_func returns all filtered events, including cancelled ones
|
||||
filter_func(event).each {|e| block.call(e)}
|
||||
filter_func([event]).each {|e| block.call(e)}
|
||||
end
|
||||
|
||||
# perform filters flush and yield flushed event to the passed block
|
||||
|
|
|
@ -285,6 +285,12 @@ module LogStash; module Util
|
|||
# @cancelled[event] = true
|
||||
end
|
||||
|
||||
def to_a
|
||||
events = []
|
||||
each {|e| events << e}
|
||||
events
|
||||
end
|
||||
|
||||
def each(&blk)
|
||||
# take care not to cause @originals or @generated to change during iteration
|
||||
|
||||
|
|
|
@ -183,6 +183,12 @@ module LogStash; module Util
|
|||
# @cancelled[event] = true
|
||||
end
|
||||
|
||||
def to_a
|
||||
events = []
|
||||
each {|e| events << e}
|
||||
events
|
||||
end
|
||||
|
||||
def each(&blk)
|
||||
# take care not to cause @originals or @generated to change during iteration
|
||||
@is_iterating = true
|
||||
|
|
|
@ -618,9 +618,9 @@ describe LogStash::Pipeline do
|
|||
|
||||
it "should handle evaluating different config" do
|
||||
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
|
||||
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -700,9 +700,9 @@ describe LogStash::Pipeline do
|
|||
# in the current instance and was returning an array containing nil values for
|
||||
# the match.
|
||||
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
|
||||
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue