Mirror of https://github.com/elastic/logstash.git (synced 2025-04-23 22:27:21 -04:00)
PERFORMANCE: filter_func can optionally take an array of events to make batched filters much faster
Fixes #8428 Fixes #8444
This commit is contained in:
parent 25e2b000d6
commit 840439722d

5 changed files with 21 additions and 13 deletions
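
In short, the generated filter_func now receives an array of events rather than a single event, so a worker can push an entire batch through the compiled filter chain in one call instead of once per event; the presumed speedup in the title comes from amortizing that per-call dispatch. A minimal, hypothetical sketch of the new calling convention (plain Ruby, not the generated Logstash code):

    # Hypothetical stand-in for the compiled filter chain; the real filter_func
    # is assembled from the pipeline config at compile time.
    def filter_func(events)
      events.flat_map { |e| [e] } # each filter stage may also emit generated events
    end

    # Single-event callers now wrap the event in a one-element array:
    filter_func([{ "message" => "hello" }])

    # Batched callers hand over the whole batch at once:
    filter_func([{ "message" => "a" }, { "message" => "b" }])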
@@ -77,7 +77,7 @@ module LogStash; module Config; module AST
   # of the output/filter function
   definitions << "define_singleton_method :#{type}_func do |event|"
   definitions << " targeted_outputs = []" if type == "output"
-  definitions << " events = [event]" if type == "filter"
+  definitions << " events = event" if type == "filter"
   definitions << " @logger.debug? && @logger.debug(\"#{type} received\", \"event\" => event.to_hash)"

   sections.select { |s| s.plugin_type.text_value == type }.each do |s|
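
The one-line AST change above means the generated method no longer wraps its argument (events = [event]) but uses it as the batch directly (events = event). As a rough illustration only, the string-assembled method ends up shaped something like the sketch below (the per-filter sections and output handling are elided):

    require "logger"

    # Illustration only; the real body is concatenated from the `definitions`
    # strings and one compiled section per configured filter.
    class ToyPipeline
      def initialize
        @logger = Logger.new($stdout)
        define_singleton_method :filter_func do |event|
          events = event # `event` is already the Array handed in by the caller
          @logger.debug? && @logger.debug("filter received #{events.size} event(s)")
          # ...compiled filter stages would run here, appending generated events...
          events
        end
      end
    end

    ToyPipeline.new.filter_func([{ "message" => "a" }])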
@@ -498,13 +498,9 @@ module LogStash; class Pipeline < BasePipeline
   end

   def filter_batch(batch)
-    batch.each do |event|
-      return if @force_shutdown.true?
-
-      filter_func(event).each do |e|
-        #these are both original and generated events
-        batch.merge(e) unless e.cancelled?
-      end
+    filter_func(batch.to_a).each do |e|
+      #these are both original and generated events
+      batch.merge(e) unless e.cancelled?
     end
     @filter_queue_client.add_filtered_metrics(batch)
     @events_filtered.increment(batch.size)
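
With the per-event loop removed, filter_batch makes a single call into the compiled filter chain per batch, and the per-event @force_shutdown check goes away with it. A self-contained sketch of the resulting shape, using simplified stand-ins rather than the Logstash batch class or generated code:

    # Simplified stand-ins for illustration only.
    class ToyBatch
      def initialize(events)
        @events = events.dup
      end

      def to_a
        @events.dup
      end

      def merge(event)
        @events << event unless @events.include?(event)
      end

      def size
        @events.size
      end
    end

    # Pretend compiled filter chain: returns originals plus generated events.
    def filter_func(events)
      events.flat_map { |e| [e, e.merge("generated" => true)] }
    end

    def filter_batch(batch)
      # One dispatch for the whole batch instead of one per event.
      filter_func(batch.to_a).each do |e|
        batch.merge(e) # the real code also skips cancelled events here
      end
      batch.size
    end

    filter_batch(ToyBatch.new([{ "message" => "a" }, { "message" => "b" }])) # => 4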
@@ -662,7 +658,7 @@ module LogStash; class Pipeline < BasePipeline
   def filter(event, &block)
     maybe_setup_out_plugins
     # filter_func returns all filtered events, including cancelled ones
-    filter_func(event).each {|e| block.call(e)}
+    filter_func([event]).each {|e| block.call(e)}
   end

   # perform filters flush and yield flushed event to the passed block
@@ -287,6 +287,12 @@ module LogStash; module Util
       # @cancelled[event] = true
     end

+    def to_a
+      events = []
+      each {|e| events << e}
+      events
+    end
+
     def each(&blk)
       # take care not to cause @originals or @generated to change during iteration
       @iterating = true
@@ -218,6 +218,12 @@ module LogStash; module Util
       # @cancelled[event] = true
     end

+    def to_a
+      events = []
+      each {|e| events << e}
+      events
+    end
+
     def each(&blk)
       # take care not to cause @originals or @generated to change during iteration
       @iterating = true
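
Both batch implementations gain the same to_a helper so filter_batch can hand the whole batch to filter_func as a plain array; it is layered on the existing each, which already knows how to walk @originals and @generated without mutating them mid-iteration. A minimal sketch of the same pattern in isolation (the class name here is illustrative, not the Logstash one):

    # Minimal illustration of building to_a on top of an existing each.
    class EventCollection
      def initialize(originals, generated)
        @originals = originals
        @generated = generated
      end

      def each(&blk)
        # take care not to change @originals or @generated during iteration
        @originals.each(&blk)
        @generated.each(&blk)
      end

      def to_a
        events = []
        each { |e| events << e }
        events
      end
    end

    EventCollection.new([{ "id" => 1 }], [{ "id" => 2 }]).to_a # => two hashes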
@@ -618,9 +618,9 @@ describe LogStash::Pipeline do

     it "should handle evaluating different config" do
       expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
-      expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
+      expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
       expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
-      expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
+      expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
     end
   end
@@ -700,9 +700,9 @@ describe LogStash::Pipeline do
       # in the current instance and was returning an array containing nil values for
       # the match.
       expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
-      expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
+      expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
       expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
-      expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
+      expect(pipeline1.filter_func([LogStash::Event.new])).not_to include(nil)
     end
   end