Mirror of https://github.com/elastic/logstash.git
Synced 2025-04-24 22:57:16 -04:00
Add test for ordered pipeline flushing fix
commit ad631e464e
parent 713fbfbf14

1 changed file with 39 additions and 27 deletions
@@ -776,8 +776,9 @@ describe LogStash::JavaPipeline do
     end
 
   context "Periodic Flush" do
-    let(:config) do
-      <<-EOS
+    shared_examples 'it flushes correctly' do
+      let(:config) do
+        <<-EOS
       input {
         dummy_input {}
       }
@@ -787,37 +788,48 @@ describe LogStash::JavaPipeline do
       output {
         dummy_output {}
       }
       EOS
-    end
-    let(:output) { ::LogStash::Outputs::DummyOutput.new }
-
-    before do
-      allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
-      allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput)
-      allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
-      allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
-      allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
-    end
-
-    it "flush periodically" do
-      Thread.abort_on_exception = true
-      pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
-      Timeout.timeout(timeout) do
-        pipeline.start
-      end
-      Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
-        wait(10).for do
-          # give us a bit of time to flush the events
-          output.events.empty?
-        end.to be_falsey
-      end
-
-      expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
-
-      pipeline.shutdown
-    end
+      end
+      let(:output) { ::LogStash::Outputs::DummyOutput.new }
+
+      before do
+        allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
+        allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput)
+        allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
+        allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
+        allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
+      end
+
+      it "flush periodically" do
+        Thread.abort_on_exception = true
+        pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
+        Timeout.timeout(timeout) do
+          pipeline.start
+        end
+        Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
+          wait(10).for do
+            # give us a bit of time to flush the events
+            output.events.empty?
+          end.to be_falsey
+        end
+
+        expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
+
+        pipeline.shutdown
+      end
+    end
+
+    it_behaves_like 'it flushes correctly'
+
+    context 'with pipeline ordered' do
+      before do
+        pipeline_settings_obj.set("pipeline.workers", 1)
+        pipeline_settings_obj.set("pipeline.ordered", true)
+      end
+      it_behaves_like 'it flushes correctly'
+    end
   end
 
   context "Periodic Flush that intermittently returns nil" do
     let(:config) do
       <<-EOS
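The new `context 'with pipeline ordered'` block reruns the same flush expectations with `pipeline.workers` pinned to 1, since ordered execution only applies to single-worker pipelines. As a rough, self-contained sketch of the RSpec pattern the spec relies on (the `Settings` struct and the example body below are illustrative stand-ins, not Logstash code), `shared_examples` defines a group once and `it_behaves_like` replays it under each context's `before` overrides:

# Illustrative only: a stand-in Settings struct plays the role of the
# pipeline settings object; none of these names come from the Logstash repo.
require 'rspec/autorun'

Settings = Struct.new(:workers, :ordered)

RSpec.describe 'a pipeline' do
  let(:settings) { Settings.new(4, false) }

  shared_examples 'it flushes correctly' do
    it 'runs with at least one worker' do
      expect(settings.workers).to be >= 1
    end
  end

  # Same expectations under the default settings...
  it_behaves_like 'it flushes correctly'

  # ...and again with ordered execution, which requires a single worker.
  context 'with pipeline ordered' do
    before do
      settings.workers = 1
      settings.ordered = true
    end

    it_behaves_like 'it flushes correctly'
  end
end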