The initial implementation of inter-pipeline comms doesn't handle inter-pipeline dependencies correctly.

It just blocks and doesn't handle the concurrency situation. One can think of the network of connected pipelines as a DAG (We explicitly ask users not to create cycles in our docs). In fact there are two different correct answers to the pipeline shutdown and reload problem.

When reloading pipelines we should assume the user understands whatever changes they're making to the topology. If a downstream pipeline is to be removed, we can assume that's intentional. We don't lose any data from upstream pipelines since those block until that downstream pipeline is restored. To accomplish this none of the `PipelineBus` methods block by default.

When shutting down Logstash we must: 1.) not lose in-flight data, and 2.) not deadlock. The only way to do both is to shutdown the pipelines in order. All state changes happen simultaneously on all piping via multithreading. As a result we don't need to implement a Topological sort or other algorithm to order dependencies, we simply issue a shutdown to all pipelines, and have ones that are dependent block waiting for upstream pipelines.

This patch also correctly handles the case where multiple signals cause pipeline actions to be created simultaneously. If we see two concurrent creates or stops, one of those actions becomes a noop.

Currently the Logstash plugin API has lifecycle methods for starting and stopping, but not reloading. We need to call a different method when a `pipeline` input is stopped due to a pipeline reload vs an agent shutdown. Ideally, we would enrich our plugin API. In the interest of expedience here, however, I added a flag to the `PipelineBus` that changes the shutdown mode of `pipeline` inputs, to be either blocking or non-blocking. It is switched to blocking if a shutdown is triggered.

This also reverts b78d32dede in favor of a better more concurrent approach without mutexes

This is a forward port of https://github.com/elastic/logstash/pull/9650 to master / 6.x

Fixes #9656
This commit is contained in:
Andrew Cholakian 2018-05-24 07:56:07 -05:00
parent f439124f81
commit 9311f8b8d0
7 changed files with 203 additions and 79 deletions

View file

@ -1,5 +1,5 @@
[[pipeline-to-pipeline]]
=== Pipeline-to-Pipeline Communication
=== Pipeline-to-Pipeline Communication (Beta)
When using the multiple pipeline feature of Logstash, you may want to connect multiple pipelines within the same Logstash instance. This configuration can be useful to isolate the execution of these pipelines, as well as to help break-up the logic of complex pipelines. The `pipeline` input/output enables a number of advanced architectural patterns discussed later in this document.
@ -41,6 +41,8 @@ By default, the `ensure_delivery` option on the `pipeline` output is set to `tru
A blocked downstream pipeline blocks the sending output/pipeline regardless of the value of the `ensure_delivery` flag.
These delivery guarantees also inform the shutdown behavior of this feature. When performing a pipeline reload, changes will be made immediately as the user requests, even if that means removing a downstream pipeline an upstream pipeline sends to. This will cause the upstream pipeline to block. You must restore the downstream pipeline to cleanly shutdown Logstash. You may issue a force kill, but inflight events may be lost, unless the persistent queue is in use.
[[avoid-cycles]]
===== Avoid cycles

View file

@ -38,7 +38,6 @@ class LogStash::Agent
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
@pipelines = java.util.concurrent.ConcurrentHashMap.new();
@converge_state_mutex = Mutex.new
@name = setting("node.name")
@http_host = setting("http.host")
@ -131,41 +130,35 @@ class LogStash::Agent
end
def converge_state_and_update
# We want to enforce that only one state converge event can happen at a time
# This is especially important in tests, where code will invoke the agent, then
# call this method directly
# TODO: Switch to an explicit queue for pending changes?
@converge_state_mutex.synchronize do
results = @source_loader.fetch
results = @source_loader.fetch
unless results.success?
if auto_reload?
logger.debug("Could not fetch the configuration to converge, will retry", :message => results.error, :retrying_in => @reload_interval)
return
else
raise "Could not fetch the configuration, message: #{results.error}"
end
unless results.success?
if auto_reload?
logger.debug("Could not fetch the configuration to converge, will retry", :message => results.error, :retrying_in => @reload_interval)
return
else
raise "Could not fetch the configuration, message: #{results.error}"
end
# We Lock any access on the pipelines, since the actions will modify the
# content of it.
converge_result = nil
pipeline_actions = resolve_actions(results.response)
converge_result = converge_state(pipeline_actions)
update_metrics(converge_result)
logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
) if converge_result.success? && converge_result.total > 0
dispatch_events(converge_result)
converge_result
end
# We Lock any access on the pipelines, since the actions will modify the
# content of it.
converge_result = nil
pipeline_actions = resolve_actions(results.response)
converge_result = converge_state(pipeline_actions)
update_metrics(converge_result)
logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
) if converge_result.success? && converge_result.total > 0
dispatch_events(converge_result)
converge_result
rescue => e
logger.error("An exception happened when converging configuration", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
end
@ -178,6 +171,10 @@ class LogStash::Agent
end
def shutdown
# Since we're shutting down we need to shutdown the DAG of pipelines that are talking to each other
# in order of dependency.
pipeline_bus.setBlockOnUnlisten(true)
stop_collecting_metrics
stop_webserver
transition_to_stopped

View file

@ -42,9 +42,7 @@ module LogStash module PipelineAction
status = nil
pipelines.compute(pipeline_id) do |id,value|
if value
message = "Attempted to create a pipeline that already exists! This shouldn't be possible"
logger.error(message, :pipeline_id => id, :pipelines => pipelines)
raise message
LogStash::ConvergeResult::ActionResult.create(self, true)
end
status = pipeline.start # block until the pipeline is correctly started or crashed
pipeline # The pipeline is successfully started we can add it to the map

View file

@ -20,10 +20,6 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Output
pipeline_bus.sendEvents(self, events, ensure_delivery)
end
def pipeline_shutting_down?
execution_context.pipeline.inputs.all? {|input| input.stop?}
end
def close
pipeline_bus.unregisterSender(self, @send_to)
end

View file

@ -39,9 +39,14 @@ public class AddressState {
* @return true if successful, false if another input is listening
*/
public synchronized boolean assignInputIfMissing(PipelineInput newInput) {
if (input != newInput && input != null) return false;
this.input = newInput;
return true;
if (input == null) {
input = newInput;
return true;
} else if (input == newInput) {
return true; // We aren't changing anything
}
return false;
}
/**

View file

@ -7,20 +7,20 @@ import org.logstash.ext.JrubyEventExtLibrary;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
/**
* This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each
* other.
*
* This class is threadsafe. Most method locking is coarse grained with `synchronized` since contention for all these methods
* shouldn't matter
* This class is threadsafe.
*/
public class PipelineBus {
final HashMap<String, AddressState> addressStates = new HashMap<>();
ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
private static final Logger logger = LogManager.getLogger(PipelineBus.class);
final ConcurrentHashMap<String, AddressState> addressStates = new ConcurrentHashMap<>();
final ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
volatile boolean blockOnUnlisten = false;
private static final Logger logger = LogManager.getLogger(PipelineBus.class);
/**
* Sends events from the provided output.
@ -31,6 +31,8 @@ public class PipelineBus {
public void sendEvents(final PipelineOutput sender,
final Collection<JrubyEventExtLibrary.RubyEvent> events,
final boolean ensureDelivery) {
if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
addressesToInputs.forEach( (address, addressState) -> {
@ -62,10 +64,14 @@ public class PipelineBus {
* @param output
* @param addresses
*/
public synchronized void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
public void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach((String address) -> {
final AddressState state = addressStates.computeIfAbsent(address, AddressState::new);
state.addOutput(output);
addressStates.compute(address, (k, value) -> {
final AddressState state = value != null ? value : new AddressState(address);
state.addOutput(output);
return state;
});
});
updateOutputReceivers(output);
@ -76,13 +82,15 @@ public class PipelineBus {
* @param output output that will be unregistered
* @param addresses collection of addresses this sender was registered with
*/
public synchronized void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
public void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach(address -> {
final AddressState state = addressStates.get(address);
if (state != null) {
addressStates.computeIfPresent(address, (k, state) -> {
state.removeOutput(output);
if (state.isEmpty()) addressStates.remove(address);
}
if (state.isEmpty()) return null;
return state;
});
});
outputsToAddressStates.remove(output);
@ -93,12 +101,15 @@ public class PipelineBus {
* in the inputs receiving events from it.
* @param output
*/
private synchronized void updateOutputReceivers(final PipelineOutput output) {
ConcurrentHashMap<String, AddressState> outputAddressToInputMapping =
outputsToAddressStates.computeIfAbsent(output, o -> new ConcurrentHashMap<>());
private void updateOutputReceivers(final PipelineOutput output) {
outputsToAddressStates.compute(output, (k, value) -> {
ConcurrentHashMap<String, AddressState> outputAddressToInputMapping = value != null ? value : new ConcurrentHashMap<>();
addressStates.forEach( (address, state) -> {
if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state);
addressStates.forEach( (address, state) -> {
if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state);
});
return outputAddressToInputMapping;
});
}
@ -109,13 +120,38 @@ public class PipelineBus {
* @param input
* @return true if the listener successfully subscribed
*/
public synchronized boolean listen(final PipelineInput input, final String address) {
final AddressState state = addressStates.computeIfAbsent(address, AddressState::new);
if (state.assignInputIfMissing(input)) {
state.getOutputs().forEach(this::updateOutputReceivers);
return true;
public boolean listen(final PipelineInput input, final String address) {
final boolean[] result = new boolean[1];
addressStates.compute(address, (k, value) -> {
AddressState state = value != null ? value : new AddressState(address);
if (state.assignInputIfMissing(input)) {
state.getOutputs().forEach(this::updateOutputReceivers);
result[0] = true;
} else {
result[0] = false;
}
return state;
});
return result[0];
}
/**
* Stop listening on the given address with the given listener
* Will change behavior depending on whether {@link #isBlockOnUnlisten()} is true or not.
* Will call a blocking method if it is, a non-blocking one if it isn't
* @param input
* @param address
*/
public void unlisten(final PipelineInput input, final String address) throws InterruptedException {
if (isBlockOnUnlisten()) {
unlistenBlock(input, address);
} else {
unlistenNonblock(input, address);
}
return false;
}
/**
@ -123,12 +159,58 @@ public class PipelineBus {
* @param address
* @param input
*/
public synchronized void unlisten(final PipelineInput input, final String address) {
final AddressState state = addressStates.get(address);
if (state != null) {
state.unassignInput(input);
if (state.isEmpty()) addressStates.remove(address);
state.getOutputs().forEach(this::updateOutputReceivers);
public void unlistenBlock(final PipelineInput input, final String address) throws InterruptedException {
final boolean[] waiting = {true};
// Block until all senders are done
// Outputs shutdown before their connected inputs
while (true) {
addressStates.compute(address, (k, state) -> {
// If this happens the pipeline was asked to shutdown
// twice, so there's no work to do
if (state == null) {
waiting[0] = false;
return null;
}
if (state.getOutputs().isEmpty()) {
state.unassignInput(input);
waiting[0] = false;
return null;
}
return state;
});
if (waiting[0] == false) {
break;
} else {
Thread.sleep(100);
}
}
}
/**
* Unlisten to use during reloads. This lets upstream outputs block while this input is missing
* @param input
* @param address
*/
public void unlistenNonblock(final PipelineInput input, final String address) {
addressStates.computeIfPresent(address, (k, state) -> {
state.unassignInput(input);
state.getOutputs().forEach(this::updateOutputReceivers);
return state.isEmpty() ? null : state;
});
}
public boolean isBlockOnUnlisten() {
return blockOnUnlisten;
}
public void setBlockOnUnlisten(boolean blockOnUnlisten) {
this.blockOnUnlisten = blockOnUnlisten;
}
}

View file

@ -2,7 +2,10 @@ package org.logstash.plugins.pipeline;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.TestCase.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.core.Is.is;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
@ -30,7 +33,7 @@ public class PipelineBusTest {
}
@Test
public void subscribeUnsubscribe() {
public void subscribeUnsubscribe() throws InterruptedException {
assertThat(bus.listen(input, address)).isTrue();
assertThat(bus.addressStates.get(address).getInput()).isSameAs(input);
@ -56,7 +59,7 @@ public class PipelineBusTest {
public void activeSenderPreventsPrune() {
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.unlisten(input, address);
bus.unlistenNonblock(input, address);
assertThat(bus.addressStates.containsKey(address)).isTrue();
bus.unregisterSender(output, addresses);
@ -65,7 +68,7 @@ public class PipelineBusTest {
@Test
public void activeListenerPreventsPrune() {
public void activeListenerPreventsPrune() throws InterruptedException {
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.unregisterSender(output, addresses);
@ -92,7 +95,7 @@ public class PipelineBusTest {
}
@Test
public void listenUnlistenUpdatesOutputReceivers() {
public void listenUnlistenUpdatesOutputReceivers() throws InterruptedException {
bus.registerSender(output, addresses);
bus.listen(input, address);
@ -112,6 +115,12 @@ public class PipelineBusTest {
assertThat(input.eventCount.longValue()).isEqualTo(1L);
}
@Test
public void sendingEmptyListToNowhereStillReturns() {
bus.registerSender(output, Arrays.asList("not_an_address"));
bus.sendEvents(output, Collections.emptyList(), true);
}
@Test
public void missingInputEventuallySucceeds() throws InterruptedException {
bus.registerSender(output, addresses);
@ -141,6 +150,41 @@ public class PipelineBusTest {
assertThat(input.eventCount.longValue()).isEqualTo(1L);
}
@Test
public void whenInDefaultNonBlockingModeInputsShutdownInstantly() throws InterruptedException {
// Test confirms the default. If we decide to change the default we should change this test
assertThat(bus.isBlockOnUnlisten()).isFalse();
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.unlisten(input, address); // This test would block forever if this is not non-block
bus.unregisterSender(output, addresses);
}
@Test
public void whenInBlockingModeInputsShutdownLast() throws InterruptedException {
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.setBlockOnUnlisten(true);
Thread unlistenThread = new Thread( () -> {
try {
bus.unlisten(input, address);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
unlistenThread.start();
// This should unblock the listener thread
bus.unregisterSender(output, addresses);
unlistenThread.join();
assertThat(bus.addressStates).isEmpty();
}
private JrubyEventExtLibrary.RubyEvent rubyEvent() {
return JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY);