Fix pipeline shutdown ordering

Fixes #10872
This commit is contained in:
Dan Hermann 2019-06-10 13:20:36 -05:00
parent ec07b43a53
commit 7d206b78d1
3 changed files with 31 additions and 6 deletions

View file

@ -48,9 +48,10 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input
end end
def stop def stop
# We stop receiving events before we unlisten to prevent races
@running.set(false) if @running # If register wasn't yet called, no @running!
pipeline_bus.unlisten(self, address) pipeline_bus.unlisten(self, address)
# We stop receiving events _after_ we unlisten to pick up any events sent by upstream outputs that
# have not yet stopped
@running.set(false) if @running # If register wasn't yet called, no @running!
end end
def isRunning def isRunning

View file

@ -101,6 +101,27 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
output.do_close output.do_close
end end
end end
it "stopped input should process events until upstream outputs stop" do
start_input
output.register
pipeline_bus.setBlockOnUnlisten(true)
output.multi_receive([event])
expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
@thread = Thread.new do
begin
input.do_stop
end
end
sleep 1
output.multi_receive([event])
expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
output.do_close
end
end end
describe "one output to multiple inputs" do describe "one output to multiple inputs" do

View file

@ -10,10 +10,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each * This class is the communication bus for the `pipeline` inputs and outputs to talk to each other.
* other. This class is threadsafe. *
* This class is threadsafe.
*/ */
public class PipelineBus { public class PipelineBus {
final ConcurrentHashMap<String, AddressState> addressStates = new ConcurrentHashMap<>(); final ConcurrentHashMap<String, AddressState> addressStates = new ConcurrentHashMap<>();
final ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>(); final ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
volatile boolean blockOnUnlisten = false; volatile boolean blockOnUnlisten = false;
@ -159,9 +161,10 @@ public class PipelineBus {
} }
/** /**
* Stop listening on the given address with the given listener * Stop listening on the given address with the given listener. Blocks until upstream outputs have
* stopped.
* *
* @param input Input that should stop listening * @param input Input that should stop listening
* @param address Address on which to stop listening * @param address Address on which to stop listening
* @throws InterruptedException if interrupted while attempting to stop listening * @throws InterruptedException if interrupted while attempting to stop listening
*/ */