update pipeline to call teardown of output workers when :workers > 1

Fixes #2180
This commit is contained in:
Tal Levy 2014-12-04 16:25:48 -08:00 committed by Jordan Sissel
parent bbb4fbb204
commit d6cb066e03
3 changed files with 138 additions and 15 deletions

View file

@ -33,6 +33,8 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1
attr_reader :worker_plugins
public
def workers_not_supported(message=nil)
return if @workers == 1
@ -62,20 +64,20 @@ class LogStash::Outputs::Base < LogStash::Plugin
public
def worker_setup
return unless @workers > 1
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_threads = @workers.times do |i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
worker_params = params.merge("workers" => 1, "codec" => @codec.clone)
worker_plugin = self.class.new(worker_params)
worker_plugin.register
while true
event = queue.pop
worker_plugin.handle(event)
if @workers == 1
@worker_plugins = [self]
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(params.merge("workers" => 1, "codec" => @codec.clone)) }
@worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end
end
end

View file

@ -225,12 +225,16 @@ class LogStash::Pipeline
def outputworker
LogStash::Util::set_thread_name(">output")
@outputs.each(&:worker_setup)
while true
event = @filter_to_output.pop
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
end # while true
@outputs.each(&:teardown)
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
end # def outputworker
# Shutdown this pipeline.

117
spec/core/pipeline_spec.rb Normal file
View file

@ -0,0 +1,117 @@
require "logstash/devutils/rspec/spec_helper"
class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"
milestone 2
def register
end
def run(queue)
end
def teardown
end
end
class DummyCodec < LogStash::Codecs::Base
config_name "dummycodec"
milestone 2
def decode(data)
data
end
def encode(event)
event
end
def teardown
end
end
class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
milestone 2
attr_reader :num_teardowns
def initialize(params={})
super
@num_teardowns = 0
end
def register
end
def receive(event)
end
def teardown
@num_teardowns += 1
end
end
class TestPipeline < LogStash::Pipeline
attr_reader :outputs
end
describe LogStash::Pipeline do
before(:each) do
LogStash::Plugin.stub(:lookup)
.with("input", "dummyinput").and_return(DummyInput)
LogStash::Plugin.stub(:lookup)
.with("codec", "plain").and_return(DummyCodec)
LogStash::Plugin.stub(:lookup)
.with("output", "dummyoutput").and_return(DummyOutput)
end
let(:test_config_without_output_workers) {
<<-eos
input {
dummyinput {}
}
output {
dummyoutput {}
}
eos
}
let(:test_config_with_output_workers) {
<<-eos
input {
dummyinput {}
}
output {
dummyoutput {
workers => 2
}
}
eos
}
context "output teardown" do
it "should call teardown of output without output-workers" do
pipeline = TestPipeline.new(test_config_without_output_workers)
pipeline.run
insist { pipeline.outputs.size } == 1
insist { pipeline.outputs.first.worker_plugins.size } == 1
insist { pipeline.outputs.first.worker_plugins.first.num_teardowns } == 1
end
it "should call output teardown correctly with output workers" do
pipeline = TestPipeline.new(test_config_with_output_workers)
pipeline.run
insist { pipeline.outputs.size } == 1
insist { pipeline.outputs.first.num_teardowns } == 0
pipeline.outputs.first.worker_plugins.each do |plugin|
insist { plugin.num_teardowns } == 1
end
end
end
end