mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Make test for pipeline flushing more resilient.
Instead of depending on the now deprecated multiline filter we use a dummy filter that just emits events. This simplifies the test and dramatically reduces timing issues. I also increased the max-wait for the timer just in case Fixes #7024 Fixes #7131
This commit is contained in:
parent
0c282105eb
commit
7e9529fe47
1 changed files with 28 additions and 17 deletions
|
@ -81,6 +81,21 @@ class DummySafeFilter < LogStash::Filters::Base
|
||||||
def close() end
|
def close() end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class DummyFlushingFilter < LogStash::Filters::Base
|
||||||
|
config_name "dummyflushingfilter"
|
||||||
|
milestone 2
|
||||||
|
|
||||||
|
def register() end
|
||||||
|
def filter(event) end
|
||||||
|
def periodic_flush
|
||||||
|
true
|
||||||
|
end
|
||||||
|
def flush(options)
|
||||||
|
return [::LogStash::Event.new("message" => "dummy_flush")]
|
||||||
|
end
|
||||||
|
def close() end
|
||||||
|
end
|
||||||
|
|
||||||
class TestPipeline < LogStash::Pipeline
|
class TestPipeline < LogStash::Pipeline
|
||||||
attr_reader :outputs, :settings
|
attr_reader :outputs, :settings
|
||||||
end
|
end
|
||||||
|
@ -565,23 +580,16 @@ describe LogStash::Pipeline do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "Periodic Flush" do
|
context "Periodic Flush" do
|
||||||
let(:number_of_events) { 100 }
|
|
||||||
let(:config) do
|
let(:config) do
|
||||||
<<-EOS
|
<<-EOS
|
||||||
input {
|
input {
|
||||||
generator {
|
dummy_input {}
|
||||||
count => #{number_of_events}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
filter {
|
filter {
|
||||||
multiline {
|
dummy_flushing_filter {}
|
||||||
pattern => "^NeverMatch"
|
|
||||||
negate => true
|
|
||||||
what => "previous"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
output {
|
output {
|
||||||
dummyoutput {}
|
dummy_output {}
|
||||||
}
|
}
|
||||||
EOS
|
EOS
|
||||||
end
|
end
|
||||||
|
@ -589,24 +597,27 @@ describe LogStash::Pipeline do
|
||||||
|
|
||||||
before do
|
before do
|
||||||
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
|
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
|
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(DummyInput)
|
||||||
|
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilter)
|
||||||
|
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
|
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
|
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it "flushes the buffered contents of the filter" do
|
it "flush periodically" do
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
|
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
|
||||||
t = Thread.new { pipeline.run }
|
t = Thread.new { pipeline.run }
|
||||||
sleep(0.1) until pipeline.ready?
|
sleep(0.1) until pipeline.ready?
|
||||||
wait(3).for do
|
wait(10).for do
|
||||||
# give us a bit of time to flush the events
|
# give us a bit of time to flush the events
|
||||||
output.events.empty?
|
output.events.empty?
|
||||||
end.to be_falsey
|
end.to be_falsey
|
||||||
event = output.events.pop
|
|
||||||
expect(event.get("message").count("\n")).to eq(99)
|
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
|
||||||
|
|
||||||
pipeline.shutdown
|
pipeline.shutdown
|
||||||
|
|
||||||
t.join
|
t.join
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue