#7754 ensure we actually test periodic flushing in specs + make flush work without data input data

Fixes #7863
This commit is contained in:
Armin 2017-08-01 12:22:15 +02:00 committed by Andrew Cholakian
parent 563d5720bd
commit ee7e6fb932
2 changed files with 28 additions and 9 deletions

View file

@ -231,6 +231,7 @@ module LogStash; class Pipeline < BasePipeline
@running = Concurrent::AtomicBoolean.new(false)
@flushing = Concurrent::AtomicReference.new(false)
@force_shutdown = Concurrent::AtomicBoolean.new(false)
@outputs_registered = Concurrent::AtomicBoolean.new(false)
end # def initialize
def ready?
@ -392,9 +393,9 @@ module LogStash; class Pipeline < BasePipeline
def start_workers
@worker_threads.clear # In case we're restarting the pipeline
@outputs_registered.make_false
begin
register_plugins(@outputs)
register_plugins(@filters)
maybe_setup_out_plugins
pipeline_workers = safe_pipeline_worker_count
batch_size = @settings.get("pipeline.batch.size")
@ -460,16 +461,17 @@ module LogStash; class Pipeline < BasePipeline
shutdown_requested |= signal.shutdown? # latch on shutdown signal
batch = @filter_queue_client.read_batch # metrics are started in read_batch
if (batch.size > 0)
if batch.size > 0
@events_consumed.increment(batch.size)
filter_batch(batch)
flush_filters_to_batch(batch, :final => false) if signal.flush?
end
flush_filters_to_batch(batch, :final => false) if signal.flush?
if batch.size > 0
output_batch(batch)
unless @force_shutdown.true? # ack the current batch
@filter_queue_client.close_batch(batch)
end
end
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
break if (shutdown_requested && !draining_queue?) || @force_shutdown.true?
end
@ -652,11 +654,11 @@ module LogStash; class Pipeline < BasePipeline
# for backward compatibility in devutils for the rspec helpers, this method is not used
# in the pipeline anymore.
def filter(event, &block)
maybe_setup_out_plugins
# filter_func returns all filtered events, including cancelled ones
filter_func(event).each { |e| block.call(e) }
filter_func(event).each {|e| block.call(e)}
end
# perform filters flush and yield flushed event to the passed block
# @param options [Hash]
# @option options [Boolean] :final => true to signal a final shutdown flush
@ -791,6 +793,13 @@ module LogStash; class Pipeline < BasePipeline
private
def maybe_setup_out_plugins
if @outputs_registered.make_true
register_plugins(@outputs)
register_plugins(@filters)
end
end
def default_logging_keys(other_keys = {})
keys = super
keys[:thread] = thread.inspect if thread

View file

@ -93,11 +93,21 @@ class DummyFlushingFilter < LogStash::Filters::Base
true
end
def flush(options)
return [::LogStash::Event.new("message" => "dummy_flush")]
[::LogStash::Event.new("message" => "dummy_flush")]
end
def close() end
end
class DummyFlushingFilterPeriodic < DummyFlushingFilter
config_name "dummyflushingfilterperiodic"
def flush(options)
# Don't generate events on the shutdown flush to make sure we actually test the
# periodic flush.
options[:final] ? [] : [::LogStash::Event.new("message" => "dummy_flush")]
end
end
class TestPipeline < LogStash::Pipeline
attr_reader :outputs, :settings
end
@ -627,7 +637,7 @@ describe LogStash::Pipeline do
before do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilter)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
end