From d03c0f14a439e8674ec7d47ee9ca539d64e055b0 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 6 May 2019 16:52:12 -0500 Subject: [PATCH] Remove debug code for p2p plus formatting Fixes #10840 --- .../plugins/builtin/pipeline/input.rb | 5 +- .../plugins/pipeline/AddressState.java | 15 ++--- .../plugins/pipeline/PipelineBus.java | 57 ++++++++++--------- .../plugins/pipeline/PipelineInput.java | 5 +- .../plugins/pipeline/PipelineOutput.java | 5 -- 5 files changed, 40 insertions(+), 47 deletions(-) diff --git a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb index 340bd1ef5..e08b8a85e 100644 --- a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb +++ b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb @@ -44,10 +44,7 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input @queue << event end - return true - rescue => e - require 'pry'; binding.pry - return true + true end def stop diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/AddressState.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/AddressState.java index 2b90453d8..0266e59d5 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/AddressState.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/AddressState.java @@ -4,16 +4,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** - * Class for representing the state of an internal address. + * Represents the state of an internal address. */ public class AddressState { - private final String address; + private final Set outputs = ConcurrentHashMap.newKeySet(); private volatile PipelineInput input = null; - AddressState(String address) { - this.address = address; - } + AddressState(String address) {} /** * Add the given output and ensure associated input's receivers are updated @@ -38,14 +36,13 @@ public class AddressState { * @return true if successful, false if another input is listening */ public synchronized boolean assignInputIfMissing(PipelineInput newInput) { + // We aren't changing anything if (input == null) { input = newInput; return true; - } else if (input == newInput) { - return true; // We aren't changing anything + } else { + return input == newInput; } - - return false; } /** diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java index 1d5f92f00..11886569a 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java @@ -5,37 +5,35 @@ import org.apache.logging.log4j.Logger; import org.logstash.RubyUtil; import org.logstash.ext.JrubyEventExtLibrary; -import java.util.*; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; /** * This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each - * other. - * - * This class is threadsafe. + * other. This class is threadsafe. */ public class PipelineBus { final ConcurrentHashMap addressStates = new ConcurrentHashMap<>(); final ConcurrentHashMap> outputsToAddressStates = new ConcurrentHashMap<>(); volatile boolean blockOnUnlisten = false; - private static final Logger logger = LogManager.getLogger(PipelineBus.class); + private static final Logger logger = LogManager.getLogger(PipelineBus.class); /** * Sends events from the provided output. - * @param sender The output sending the events. - * @param events A collection of JRuby events + * + * @param sender The output sending the events. + * @param events A collection of JRuby events * @param ensureDelivery set to true if you want this to retry indefinitely in the event an event send fails */ public void sendEvents(final PipelineOutput sender, - final Collection events, - final boolean ensureDelivery) { + final Collection events, + final boolean ensureDelivery) { if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations final ConcurrentHashMap addressesToInputs = outputsToAddressStates.get(sender); - addressesToInputs.forEach( (address, addressState) -> { + addressesToInputs.forEach((address, addressState) -> { final Stream clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY)); PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile @@ -61,16 +59,17 @@ public class PipelineBus { /** * Should be called by an output on register - * @param output output to be registered + * + * @param output output to be registered * @param addresses collection of addresses on which to register this sender */ public void registerSender(final PipelineOutput output, final Iterable addresses) { addresses.forEach((String address) -> { addressStates.compute(address, (k, value) -> { - final AddressState state = value != null ? value : new AddressState(address); - state.addOutput(output); + final AddressState state = value != null ? value : new AddressState(address); + state.addOutput(output); - return state; + return state; }); }); @@ -79,7 +78,8 @@ public class PipelineBus { /** * Should be called by an output on close - * @param output output that will be unregistered + * + * @param output output that will be unregistered * @param addresses collection of addresses this sender was registered with */ public void unregisterSender(final PipelineOutput output, final Iterable addresses) { @@ -99,13 +99,14 @@ public class PipelineBus { /** * Updates the internal state for this output to reflect the fact that there may be a change * in the inputs receiving events from it. + * * @param output output to update */ private void updateOutputReceivers(final PipelineOutput output) { outputsToAddressStates.compute(output, (k, value) -> { ConcurrentHashMap outputAddressToInputMapping = value != null ? value : new ConcurrentHashMap<>(); - addressStates.forEach( (address, state) -> { + addressStates.forEach((address, state) -> { if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state); }); @@ -116,7 +117,8 @@ public class PipelineBus { /** * Listens to a given address with the provided listener * Only one listener can listen on an address at a time - * @param input Input to register as listener + * + * @param input Input to register as listener * @param address Address on which to listen * @return true if the listener successfully subscribed */ @@ -124,7 +126,7 @@ public class PipelineBus { final boolean[] result = new boolean[1]; addressStates.compute(address, (k, value) -> { - AddressState state = value != null ? value : new AddressState(address); + AddressState state = value != null ? value : new AddressState(address); if (state.assignInputIfMissing(input)) { state.getOutputs().forEach(this::updateOutputReceivers); @@ -143,7 +145,8 @@ public class PipelineBus { * Stop listening on the given address with the given listener * Will change behavior depending on whether {@link #isBlockOnUnlisten()} is true or not. * Will call a blocking method if it is, a non-blocking one if it isn't - * @param input Input that should stop listening + * + * @param input Input that should stop listening * @param address Address on which the input should stop listening * @throws InterruptedException if interrupted while attempting to stop listening */ @@ -157,7 +160,8 @@ public class PipelineBus { /** * Stop listening on the given address with the given listener - * @param input Input that should stop listening + * + * @param input Input that should stop listening * @param address Address on which to stop listening * @throws InterruptedException if interrupted while attempting to stop listening */ @@ -185,7 +189,7 @@ public class PipelineBus { return state; }); - if (waiting[0] == false) { + if (!waiting[0]) { break; } else { Thread.sleep(100); @@ -195,14 +199,15 @@ public class PipelineBus { /** * Unlisten to use during reloads. This lets upstream outputs block while this input is missing - * @param input Input that should stop listening + * + * @param input Input that should stop listening * @param address Address on which to stop listening */ public void unlistenNonblock(final PipelineInput input, final String address) { addressStates.computeIfPresent(address, (k, state) -> { - state.unassignInput(input); - state.getOutputs().forEach(this::updateOutputReceivers); - return state.isEmpty() ? null : state; + state.unassignInput(input); + state.getOutputs().forEach(this::updateOutputReceivers); + return state.isEmpty() ? null : state; }); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java index 0e63864a4..204958a8e 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineInput.java @@ -6,15 +6,14 @@ import java.util.stream.Stream; public interface PipelineInput { /** - * Accepts an event - * It might be rejected if the input is stopping + * Accepts an event. It might be rejected if the input is stopping. + * * @param events a collection of events * @return true if the event was successfully received */ boolean internalReceive(Stream events); /** - * * @return true if the input is running */ boolean isRunning(); diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java index d80609496..0a110f379 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineOutput.java @@ -1,9 +1,4 @@ package org.logstash.plugins.pipeline; -import org.logstash.ext.JrubyEventExtLibrary; - -import java.util.Map; -import java.util.function.Function; - public interface PipelineOutput { }