mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
#7754 ensure we actually test periodic flushing in specs + make flush work without data input data
Fixes #7863
This commit is contained in:
parent
5d2c803847
commit
46dfcad215
2 changed files with 28 additions and 9 deletions
|
@ -224,6 +224,7 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
@running = Concurrent::AtomicBoolean.new(false)
|
@running = Concurrent::AtomicBoolean.new(false)
|
||||||
@flushing = Concurrent::AtomicReference.new(false)
|
@flushing = Concurrent::AtomicReference.new(false)
|
||||||
@force_shutdown = Concurrent::AtomicBoolean.new(false)
|
@force_shutdown = Concurrent::AtomicBoolean.new(false)
|
||||||
|
@outputs_registered = Concurrent::AtomicBoolean.new(false)
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
def ready?
|
def ready?
|
||||||
|
@ -385,9 +386,9 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
|
|
||||||
def start_workers
|
def start_workers
|
||||||
@worker_threads.clear # In case we're restarting the pipeline
|
@worker_threads.clear # In case we're restarting the pipeline
|
||||||
|
@outputs_registered.make_false
|
||||||
begin
|
begin
|
||||||
register_plugins(@outputs)
|
maybe_setup_out_plugins
|
||||||
register_plugins(@filters)
|
|
||||||
|
|
||||||
pipeline_workers = safe_pipeline_worker_count
|
pipeline_workers = safe_pipeline_worker_count
|
||||||
batch_size = @settings.get("pipeline.batch.size")
|
batch_size = @settings.get("pipeline.batch.size")
|
||||||
|
@ -454,16 +455,17 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
shutdown_requested |= signal.shutdown? # latch on shutdown signal
|
shutdown_requested |= signal.shutdown? # latch on shutdown signal
|
||||||
|
|
||||||
batch = @filter_queue_client.read_batch # metrics are started in read_batch
|
batch = @filter_queue_client.read_batch # metrics are started in read_batch
|
||||||
if (batch.size > 0)
|
if batch.size > 0
|
||||||
@events_consumed.increment(batch.size)
|
@events_consumed.increment(batch.size)
|
||||||
filter_batch(batch)
|
filter_batch(batch)
|
||||||
|
end
|
||||||
flush_filters_to_batch(batch, :final => false) if signal.flush?
|
flush_filters_to_batch(batch, :final => false) if signal.flush?
|
||||||
|
if batch.size > 0
|
||||||
output_batch(batch)
|
output_batch(batch)
|
||||||
unless @force_shutdown.true? # ack the current batch
|
unless @force_shutdown.true? # ack the current batch
|
||||||
@filter_queue_client.close_batch(batch)
|
@filter_queue_client.close_batch(batch)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
|
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
|
||||||
break if (shutdown_requested && !draining_queue?) || @force_shutdown.true?
|
break if (shutdown_requested && !draining_queue?) || @force_shutdown.true?
|
||||||
end
|
end
|
||||||
|
@ -646,11 +648,11 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
# for backward compatibility in devutils for the rspec helpers, this method is not used
|
# for backward compatibility in devutils for the rspec helpers, this method is not used
|
||||||
# in the pipeline anymore.
|
# in the pipeline anymore.
|
||||||
def filter(event, &block)
|
def filter(event, &block)
|
||||||
|
maybe_setup_out_plugins
|
||||||
# filter_func returns all filtered events, including cancelled ones
|
# filter_func returns all filtered events, including cancelled ones
|
||||||
filter_func(event).each { |e| block.call(e) }
|
filter_func(event).each {|e| block.call(e)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
# perform filters flush and yield flushed event to the passed block
|
# perform filters flush and yield flushed event to the passed block
|
||||||
# @param options [Hash]
|
# @param options [Hash]
|
||||||
# @option options [Boolean] :final => true to signal a final shutdown flush
|
# @option options [Boolean] :final => true to signal a final shutdown flush
|
||||||
|
@ -785,6 +787,13 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
def maybe_setup_out_plugins
|
||||||
|
if @outputs_registered.make_true
|
||||||
|
register_plugins(@outputs)
|
||||||
|
register_plugins(@filters)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def default_logging_keys(other_keys = {})
|
def default_logging_keys(other_keys = {})
|
||||||
keys = super
|
keys = super
|
||||||
keys[:thread] = thread.inspect if thread
|
keys[:thread] = thread.inspect if thread
|
||||||
|
|
|
@ -93,11 +93,21 @@ class DummyFlushingFilter < LogStash::Filters::Base
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
def flush(options)
|
def flush(options)
|
||||||
return [::LogStash::Event.new("message" => "dummy_flush")]
|
[::LogStash::Event.new("message" => "dummy_flush")]
|
||||||
end
|
end
|
||||||
def close() end
|
def close() end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class DummyFlushingFilterPeriodic < DummyFlushingFilter
|
||||||
|
config_name "dummyflushingfilterperiodic"
|
||||||
|
|
||||||
|
def flush(options)
|
||||||
|
# Don't generate events on the shutdown flush to make sure we actually test the
|
||||||
|
# periodic flush.
|
||||||
|
options[:final] ? [] : [::LogStash::Event.new("message" => "dummy_flush")]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
class TestPipeline < LogStash::Pipeline
|
class TestPipeline < LogStash::Pipeline
|
||||||
attr_reader :outputs, :settings
|
attr_reader :outputs, :settings
|
||||||
end
|
end
|
||||||
|
@ -627,7 +637,7 @@ describe LogStash::Pipeline do
|
||||||
before do
|
before do
|
||||||
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
|
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(DummyInput)
|
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(DummyInput)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilter)
|
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
|
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
|
||||||
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
|
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue