serialize access to PipelineBus methods on a per-plugin basis, code cleanup in AddressState

Fixes #10872
This commit is contained in:
Dan Hermann 2019-06-27 11:09:02 -05:00
parent 7d206b78d1
commit d1e92862c3
2 changed files with 66 additions and 55 deletions

View file

@ -11,8 +11,6 @@ public class AddressState {
private final Set<PipelineOutput> outputs = ConcurrentHashMap.newKeySet();
private volatile PipelineInput input = null;
AddressState(String address) {}
/**
* Add the given output and ensure associated input's receivers are updated
* @param output output to be added

View file

@ -1,5 +1,6 @@
package org.logstash.plugins.pipeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.RubyUtil;
@ -33,30 +34,32 @@ public class PipelineBus {
final boolean ensureDelivery) {
if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
synchronized (sender) {
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
addressesToInputs.forEach((address, addressState) -> {
final Stream<JrubyEventExtLibrary.RubyEvent> clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY));
addressesToInputs.forEach((address, addressState) -> {
final Stream<JrubyEventExtLibrary.RubyEvent> clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY));
PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
boolean sendWasSuccess = input != null && input.internalReceive(clones);
PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
boolean sendWasSuccess = input != null && input.internalReceive(clones);
// Retry send if the initial one failed
while (ensureDelivery && !sendWasSuccess) {
// We need to refresh the input in case the mapping has updated between loops
String message = String.format("Attempted to send event to '%s' but that address was unavailable. " +
"Maybe the destination pipeline is down or stopping? Will Retry.", address);
logger.warn(message);
input = addressState.getInput();
sendWasSuccess = input != null && input.internalReceive(clones);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Sleep unexpectedly interrupted in bus retry loop", e);
// Retry send if the initial one failed
while (ensureDelivery && !sendWasSuccess) {
// We need to refresh the input in case the mapping has updated between loops
String message = String.format("Attempted to send event to '%s' but that address was unavailable. " +
"Maybe the destination pipeline is down or stopping? Will Retry.", address);
logger.warn(message);
input = addressState.getInput();
sendWasSuccess = input != null && input.internalReceive(clones);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Sleep unexpectedly interrupted in bus retry loop", e);
}
}
}
});
});
}
}
/**
@ -66,16 +69,18 @@ public class PipelineBus {
* @param addresses collection of addresses on which to register this sender
*/
public void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach((String address) -> {
addressStates.compute(address, (k, value) -> {
final AddressState state = value != null ? value : new AddressState(address);
state.addOutput(output);
synchronized (output) {
addresses.forEach((String address) -> {
addressStates.compute(address, (k, value) -> {
final AddressState state = value != null ? value : new AddressState();
state.addOutput(output);
return state;
return state;
});
});
});
updateOutputReceivers(output);
updateOutputReceivers(output);
}
}
/**
@ -85,17 +90,19 @@ public class PipelineBus {
* @param addresses collection of addresses this sender was registered with
*/
public void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach(address -> {
addressStates.computeIfPresent(address, (k, state) -> {
state.removeOutput(output);
synchronized (output) {
addresses.forEach(address -> {
addressStates.computeIfPresent(address, (k, state) -> {
state.removeOutput(output);
if (state.isEmpty()) return null;
if (state.isEmpty()) return null;
return state;
return state;
});
});
});
outputsToAddressStates.remove(output);
outputsToAddressStates.remove(output);
}
}
/**
@ -125,22 +132,24 @@ public class PipelineBus {
* @return true if the listener successfully subscribed
*/
public boolean listen(final PipelineInput input, final String address) {
final boolean[] result = new boolean[1];
synchronized (input) {
final boolean[] result = new boolean[1];
addressStates.compute(address, (k, value) -> {
AddressState state = value != null ? value : new AddressState(address);
addressStates.compute(address, (k, value) -> {
AddressState state = value != null ? value : new AddressState();
if (state.assignInputIfMissing(input)) {
state.getOutputs().forEach(this::updateOutputReceivers);
result[0] = true;
} else {
result[0] = false;
}
if (state.assignInputIfMissing(input)) {
state.getOutputs().forEach(this::updateOutputReceivers);
result[0] = true;
} else {
result[0] = false;
}
return state;
});
return state;
});
return result[0];
return result[0];
}
}
/**
@ -153,10 +162,12 @@ public class PipelineBus {
* @throws InterruptedException if interrupted while attempting to stop listening
*/
public void unlisten(final PipelineInput input, final String address) throws InterruptedException {
if (isBlockOnUnlisten()) {
unlistenBlock(input, address);
} else {
unlistenNonblock(input, address);
synchronized (input) {
if (isBlockOnUnlisten()) {
unlistenBlock(input, address);
} else {
unlistenNonblock(input, address);
}
}
}
@ -168,7 +179,7 @@ public class PipelineBus {
* @param address Address on which to stop listening
* @throws InterruptedException if interrupted while attempting to stop listening
*/
public void unlistenBlock(final PipelineInput input, final String address) throws InterruptedException {
private void unlistenBlock(final PipelineInput input, final String address) throws InterruptedException {
final boolean[] waiting = {true};
// Block until all senders are done
@ -206,7 +217,8 @@ public class PipelineBus {
* @param input Input that should stop listening
* @param address Address on which to stop listening
*/
public void unlistenNonblock(final PipelineInput input, final String address) {
@VisibleForTesting
void unlistenNonblock(final PipelineInput input, final String address) {
addressStates.computeIfPresent(address, (k, state) -> {
state.unassignInput(input);
state.getOutputs().forEach(this::updateOutputReceivers);
@ -214,7 +226,8 @@ public class PipelineBus {
});
}
public boolean isBlockOnUnlisten() {
@VisibleForTesting
boolean isBlockOnUnlisten() {
return blockOnUnlisten;
}