mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
8a048ccc27
commit
d03c0f14a4
5 changed files with 40 additions and 47 deletions
|
@ -44,10 +44,7 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input
|
||||||
@queue << event
|
@queue << event
|
||||||
end
|
end
|
||||||
|
|
||||||
return true
|
true
|
||||||
rescue => e
|
|
||||||
require 'pry'; binding.pry
|
|
||||||
return true
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
|
|
|
@ -4,16 +4,14 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class for representing the state of an internal address.
|
* Represents the state of an internal address.
|
||||||
*/
|
*/
|
||||||
public class AddressState {
|
public class AddressState {
|
||||||
private final String address;
|
|
||||||
private final Set<PipelineOutput> outputs = ConcurrentHashMap.newKeySet();
|
private final Set<PipelineOutput> outputs = ConcurrentHashMap.newKeySet();
|
||||||
private volatile PipelineInput input = null;
|
private volatile PipelineInput input = null;
|
||||||
|
|
||||||
AddressState(String address) {
|
AddressState(String address) {}
|
||||||
this.address = address;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add the given output and ensure associated input's receivers are updated
|
* Add the given output and ensure associated input's receivers are updated
|
||||||
|
@ -38,14 +36,13 @@ public class AddressState {
|
||||||
* @return true if successful, false if another input is listening
|
* @return true if successful, false if another input is listening
|
||||||
*/
|
*/
|
||||||
public synchronized boolean assignInputIfMissing(PipelineInput newInput) {
|
public synchronized boolean assignInputIfMissing(PipelineInput newInput) {
|
||||||
|
// We aren't changing anything
|
||||||
if (input == null) {
|
if (input == null) {
|
||||||
input = newInput;
|
input = newInput;
|
||||||
return true;
|
return true;
|
||||||
} else if (input == newInput) {
|
} else {
|
||||||
return true; // We aren't changing anything
|
return input == newInput;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -5,37 +5,35 @@ import org.apache.logging.log4j.Logger;
|
||||||
import org.logstash.RubyUtil;
|
import org.logstash.RubyUtil;
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
import org.logstash.ext.JrubyEventExtLibrary;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.Collection;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each
|
* This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each
|
||||||
* other.
|
* other. This class is threadsafe.
|
||||||
*
|
|
||||||
* This class is threadsafe.
|
|
||||||
*/
|
*/
|
||||||
public class PipelineBus {
|
public class PipelineBus {
|
||||||
final ConcurrentHashMap<String, AddressState> addressStates = new ConcurrentHashMap<>();
|
final ConcurrentHashMap<String, AddressState> addressStates = new ConcurrentHashMap<>();
|
||||||
final ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
|
final ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
|
||||||
volatile boolean blockOnUnlisten = false;
|
volatile boolean blockOnUnlisten = false;
|
||||||
private static final Logger logger = LogManager.getLogger(PipelineBus.class);
|
private static final Logger logger = LogManager.getLogger(PipelineBus.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends events from the provided output.
|
* Sends events from the provided output.
|
||||||
* @param sender The output sending the events.
|
*
|
||||||
* @param events A collection of JRuby events
|
* @param sender The output sending the events.
|
||||||
|
* @param events A collection of JRuby events
|
||||||
* @param ensureDelivery set to true if you want this to retry indefinitely in the event an event send fails
|
* @param ensureDelivery set to true if you want this to retry indefinitely in the event an event send fails
|
||||||
*/
|
*/
|
||||||
public void sendEvents(final PipelineOutput sender,
|
public void sendEvents(final PipelineOutput sender,
|
||||||
final Collection<JrubyEventExtLibrary.RubyEvent> events,
|
final Collection<JrubyEventExtLibrary.RubyEvent> events,
|
||||||
final boolean ensureDelivery) {
|
final boolean ensureDelivery) {
|
||||||
if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations
|
if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations
|
||||||
|
|
||||||
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
|
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
|
||||||
|
|
||||||
addressesToInputs.forEach( (address, addressState) -> {
|
addressesToInputs.forEach((address, addressState) -> {
|
||||||
final Stream<JrubyEventExtLibrary.RubyEvent> clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY));
|
final Stream<JrubyEventExtLibrary.RubyEvent> clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY));
|
||||||
|
|
||||||
PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
|
PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
|
||||||
|
@ -61,16 +59,17 @@ public class PipelineBus {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should be called by an output on register
|
* Should be called by an output on register
|
||||||
* @param output output to be registered
|
*
|
||||||
|
* @param output output to be registered
|
||||||
* @param addresses collection of addresses on which to register this sender
|
* @param addresses collection of addresses on which to register this sender
|
||||||
*/
|
*/
|
||||||
public void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
|
public void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
|
||||||
addresses.forEach((String address) -> {
|
addresses.forEach((String address) -> {
|
||||||
addressStates.compute(address, (k, value) -> {
|
addressStates.compute(address, (k, value) -> {
|
||||||
final AddressState state = value != null ? value : new AddressState(address);
|
final AddressState state = value != null ? value : new AddressState(address);
|
||||||
state.addOutput(output);
|
state.addOutput(output);
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -79,7 +78,8 @@ public class PipelineBus {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should be called by an output on close
|
* Should be called by an output on close
|
||||||
* @param output output that will be unregistered
|
*
|
||||||
|
* @param output output that will be unregistered
|
||||||
* @param addresses collection of addresses this sender was registered with
|
* @param addresses collection of addresses this sender was registered with
|
||||||
*/
|
*/
|
||||||
public void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
|
public void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
|
||||||
|
@ -99,13 +99,14 @@ public class PipelineBus {
|
||||||
/**
|
/**
|
||||||
* Updates the internal state for this output to reflect the fact that there may be a change
|
* Updates the internal state for this output to reflect the fact that there may be a change
|
||||||
* in the inputs receiving events from it.
|
* in the inputs receiving events from it.
|
||||||
|
*
|
||||||
* @param output output to update
|
* @param output output to update
|
||||||
*/
|
*/
|
||||||
private void updateOutputReceivers(final PipelineOutput output) {
|
private void updateOutputReceivers(final PipelineOutput output) {
|
||||||
outputsToAddressStates.compute(output, (k, value) -> {
|
outputsToAddressStates.compute(output, (k, value) -> {
|
||||||
ConcurrentHashMap<String, AddressState> outputAddressToInputMapping = value != null ? value : new ConcurrentHashMap<>();
|
ConcurrentHashMap<String, AddressState> outputAddressToInputMapping = value != null ? value : new ConcurrentHashMap<>();
|
||||||
|
|
||||||
addressStates.forEach( (address, state) -> {
|
addressStates.forEach((address, state) -> {
|
||||||
if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state);
|
if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -116,7 +117,8 @@ public class PipelineBus {
|
||||||
/**
|
/**
|
||||||
* Listens to a given address with the provided listener
|
* Listens to a given address with the provided listener
|
||||||
* Only one listener can listen on an address at a time
|
* Only one listener can listen on an address at a time
|
||||||
* @param input Input to register as listener
|
*
|
||||||
|
* @param input Input to register as listener
|
||||||
* @param address Address on which to listen
|
* @param address Address on which to listen
|
||||||
* @return true if the listener successfully subscribed
|
* @return true if the listener successfully subscribed
|
||||||
*/
|
*/
|
||||||
|
@ -124,7 +126,7 @@ public class PipelineBus {
|
||||||
final boolean[] result = new boolean[1];
|
final boolean[] result = new boolean[1];
|
||||||
|
|
||||||
addressStates.compute(address, (k, value) -> {
|
addressStates.compute(address, (k, value) -> {
|
||||||
AddressState state = value != null ? value : new AddressState(address);
|
AddressState state = value != null ? value : new AddressState(address);
|
||||||
|
|
||||||
if (state.assignInputIfMissing(input)) {
|
if (state.assignInputIfMissing(input)) {
|
||||||
state.getOutputs().forEach(this::updateOutputReceivers);
|
state.getOutputs().forEach(this::updateOutputReceivers);
|
||||||
|
@ -143,7 +145,8 @@ public class PipelineBus {
|
||||||
* Stop listening on the given address with the given listener
|
* Stop listening on the given address with the given listener
|
||||||
* Will change behavior depending on whether {@link #isBlockOnUnlisten()} is true or not.
|
* Will change behavior depending on whether {@link #isBlockOnUnlisten()} is true or not.
|
||||||
* Will call a blocking method if it is, a non-blocking one if it isn't
|
* Will call a blocking method if it is, a non-blocking one if it isn't
|
||||||
* @param input Input that should stop listening
|
*
|
||||||
|
* @param input Input that should stop listening
|
||||||
* @param address Address on which the input should stop listening
|
* @param address Address on which the input should stop listening
|
||||||
* @throws InterruptedException if interrupted while attempting to stop listening
|
* @throws InterruptedException if interrupted while attempting to stop listening
|
||||||
*/
|
*/
|
||||||
|
@ -157,7 +160,8 @@ public class PipelineBus {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop listening on the given address with the given listener
|
* Stop listening on the given address with the given listener
|
||||||
* @param input Input that should stop listening
|
*
|
||||||
|
* @param input Input that should stop listening
|
||||||
* @param address Address on which to stop listening
|
* @param address Address on which to stop listening
|
||||||
* @throws InterruptedException if interrupted while attempting to stop listening
|
* @throws InterruptedException if interrupted while attempting to stop listening
|
||||||
*/
|
*/
|
||||||
|
@ -185,7 +189,7 @@ public class PipelineBus {
|
||||||
return state;
|
return state;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (waiting[0] == false) {
|
if (!waiting[0]) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
@ -195,14 +199,15 @@ public class PipelineBus {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unlisten to use during reloads. This lets upstream outputs block while this input is missing
|
* Unlisten to use during reloads. This lets upstream outputs block while this input is missing
|
||||||
* @param input Input that should stop listening
|
*
|
||||||
|
* @param input Input that should stop listening
|
||||||
* @param address Address on which to stop listening
|
* @param address Address on which to stop listening
|
||||||
*/
|
*/
|
||||||
public void unlistenNonblock(final PipelineInput input, final String address) {
|
public void unlistenNonblock(final PipelineInput input, final String address) {
|
||||||
addressStates.computeIfPresent(address, (k, state) -> {
|
addressStates.computeIfPresent(address, (k, state) -> {
|
||||||
state.unassignInput(input);
|
state.unassignInput(input);
|
||||||
state.getOutputs().forEach(this::updateOutputReceivers);
|
state.getOutputs().forEach(this::updateOutputReceivers);
|
||||||
return state.isEmpty() ? null : state;
|
return state.isEmpty() ? null : state;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,15 +6,14 @@ import java.util.stream.Stream;
|
||||||
|
|
||||||
public interface PipelineInput {
|
public interface PipelineInput {
|
||||||
/**
|
/**
|
||||||
* Accepts an event
|
* Accepts an event. It might be rejected if the input is stopping.
|
||||||
* It might be rejected if the input is stopping
|
*
|
||||||
* @param events a collection of events
|
* @param events a collection of events
|
||||||
* @return true if the event was successfully received
|
* @return true if the event was successfully received
|
||||||
*/
|
*/
|
||||||
boolean internalReceive(Stream<JrubyEventExtLibrary.RubyEvent> events);
|
boolean internalReceive(Stream<JrubyEventExtLibrary.RubyEvent> events);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @return true if the input is running
|
* @return true if the input is running
|
||||||
*/
|
*/
|
||||||
boolean isRunning();
|
boolean isRunning();
|
||||||
|
|
|
@ -1,9 +1,4 @@
|
||||||
package org.logstash.plugins.pipeline;
|
package org.logstash.plugins.pipeline;
|
||||||
|
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
public interface PipelineOutput {
|
public interface PipelineOutput {
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue