mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
fix to support legacy plugins specs monkeypatching pipelines
This commit is contained in:
parent
9477db2768
commit
1cbe279d11
1 changed files with 9 additions and 7 deletions
|
@ -212,7 +212,7 @@ module LogStash; class Pipeline
|
|||
signal = false
|
||||
batch_size.times do |t|
|
||||
event = (t == 0) ? @input_queue.take : @input_queue.poll(batch_delay)
|
||||
|
||||
|
||||
if event.nil?
|
||||
next
|
||||
elsif event == LogStash::SHUTDOWN || event == LogStash::FLUSH
|
||||
|
@ -255,16 +255,18 @@ module LogStash; class Pipeline
|
|||
outputs_events = batch.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, event|
|
||||
# We ask the AST to tell us which outputs to send each event to
|
||||
# Then, we stick it in the correct bin
|
||||
output_func(event).each do |output|
|
||||
acc[output] << event
|
||||
end
|
||||
|
||||
# output_func should never return anything other than an Array but we have lots of legacy specs
|
||||
# that monkeypatch it and return nil. We can deprecate "|| []" after fixing these specs
|
||||
outputs_for_event = output_func(event) || []
|
||||
|
||||
outputs_for_event.each { |output| acc[output] << event }
|
||||
acc
|
||||
end
|
||||
|
||||
# Now that we have our output to event mapping we can just invoke each output
|
||||
# once with its list of events
|
||||
outputs_events.each do |output, events|
|
||||
output.multi_receive(events)
|
||||
end
|
||||
outputs_events.each { |output, events| output.multi_receive(events) }
|
||||
end
|
||||
|
||||
def set_current_thread_inflight_batch(batch)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue