fix for cancelled events issue #6055

update comments

preserve abort_on_exception state
This commit is contained in:
Colin Surprenant 2016-10-14 16:46:01 +02:00
parent 35c3f0bd55
commit 3fe1bfc935
5 changed files with 103 additions and 23 deletions

View file

@ -257,14 +257,9 @@ module LogStash; class Pipeline
def filter_batch(batch)
batch.each do |event|
if event.is_a?(Event)
filtered = filter_func(event)
filtered.each do |e|
#these are both original and generated events
if e.cancelled?
batch.cancel(e)
else
batch.merge(e)
end
filter_func(event).each do |e|
# these are both original and generated events
batch.merge(e) unless e.cancelled?
end
end
end
@ -492,9 +487,7 @@ module LogStash; class Pipeline
def flush_filters_to_batch(batch, options = {})
options[:final] = batch.shutdown_signal_received?
flush_filters(options) do |event|
if event.cancelled?
batch.cancel(event)
else
unless event.cancelled?
@logger.debug? and @logger.debug("Pushing flushed events", :event => event)
batch.merge(event)
end

View file

@ -148,7 +148,10 @@ module LogStash; module Util
@shutdown_signal_received = false
@flush_signal_received = false
@originals = Hash.new
@cancelled = Hash.new
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
# @cancelled = Hash.new
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@ -168,17 +171,22 @@ module LogStash; module Util
end
def cancel(event)
@cancelled[event] = true
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
raise("cancel is unsupported")
# @cancelled[event] = true
end
def each(&blk)
# take care not to cause @originals or @generated to change during iteration
@iterating = true
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@originals.each do |e, _|
blk.call(e) unless @cancelled.include?(e)
blk.call(e) unless e.cancelled?
end
@generated.each do |e, _|
blk.call(e) unless @cancelled.include?(e)
blk.call(e) unless e.cancelled?
end
@iterating = false
update_generated
@ -197,7 +205,9 @@ module LogStash; module Util
end
def cancelled_size
@cancelled.size
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
raise("cancelled_size is unsupported ")
# @cancelled.size
end
def shutdown_signal_received?

View file

@ -100,6 +100,60 @@ describe LogStash::Pipeline do
pipeline_settings_obj.reset
end
describe "event cancellation" do
# test harness for https://github.com/elastic/logstash/issues/6055
let(:output) { DummyOutputWithEventsArray.new }
before do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputwitheventsarray").and_return(DummyOutputWithEventsArray)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "drop").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("filter", "mutate").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original
allow(DummyOutputWithEventsArray).to receive(:new).with(any_args).and_return(output)
end
let(:config) do
<<-CONFIG
input {
generator {
lines => ["1", "2", "END"]
count => 1
}
}
filter {
if [message] == "1" {
drop {}
}
mutate { add_tag => ["notdropped"] }
}
output { dummyoutputwitheventsarray {} }
CONFIG
end
it "should not propage cancelled events from filter to output" do
abort_on_exception_state = Thread.abort_on_exception
Thread.abort_on_exception = true
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
wait(3).for do
# give us a bit of time to flush the events
# puts("*****" + output.events.map{|e| e.message}.to_s)
output.events.map{|e| e.get("message")}.include?("END")
end.to be_truthy
expect(output.events.size).to eq(2)
expect(output.events[0].get("tags")).to eq(["notdropped"])
expect(output.events[1].get("tags")).to eq(["notdropped"])
pipeline.shutdown
Thread.abort_on_exception = abort_on_exception_state
end
end
describe "defaulting the pipeline workers based on thread safety" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)

View file

@ -94,22 +94,23 @@ describe LogStash::Util::WrappedSynchronousQueue do
it "appends batches to the queue" do
batch = write_client.get_new_batch
5.times {|i| batch.push("value-#{i}")}
5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))}
write_client.push_batch(batch)
read_batch = read_client.take_batch
expect(read_batch.size).to eq(5)
i = 0
read_batch.each do |data|
expect(data).to eq("value-#{i}")
read_batch.cancel("value-#{i}") if i > 2
read_batch.merge("generated-#{i}") if i > 2
expect(data.get("message")).to eq("value-#{i}")
# read_batch.cancel("value-#{i}") if i > 2 # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
data.cancel if i > 2
read_batch.merge(LogStash::Event.new({"message" => "generated-#{i}"})) if i > 2
i += 1
end
expect(read_batch.cancelled_size).to eq(2)
# expect(read_batch.cancelled_size).to eq(2) # disabled for https://github.com/elastic/logstash/issues/6055
i = 0
read_batch.each do |data|
expect(data).to eq("value-#{i}") if i < 3
expect(data).to eq("generated-#{i}") if i > 2
expect(data.get("message")).to eq("value-#{i}") if i < 3
expect(data.get("message")).to eq("generated-#{i}") if i > 2
i += 1
end
end

View file

@ -25,3 +25,25 @@ class DummyOutput < LogStash::Outputs::Base
@num_closes = 1
end
end
class DummyOutputWithEventsArray < LogStash::Outputs::Base
config_name "dummyoutput2"
milestone 2
attr_reader :events
def initialize(params={})
super
@events = []
end
def register
end
def receive(event)
@events << event
end
def close
end
end