Support for inter-pipeline comms with a new pipeline input/output

This also makes the load / reload cycle of pipelines threadsafe
and concurrent in the Agent class.

Fixes #9225
This commit is contained in:
Andrew Cholakian 2018-03-01 17:33:45 -08:00
parent 8bc137b437
commit a1c0e417e5
28 changed files with 1062 additions and 120 deletions


@ -0,0 +1,200 @@
[[pipeline-to-pipeline]]
=== Configuring Pipeline-to-Pipeline Communication
When using the multiple pipeline feature of Logstash, you may want to connect pipelines running on the same Logstash instance. This can be useful to isolate the execution of these pipelines, as well as to help break up the logic of complex pipelines. The `pipeline` input / output enables a number of advanced architectural patterns discussed later in this document.
Where communication is needed between Logstash instances, you will need to use either {logstash-ref}/ls-to-ls.html[Logstash-to-Logstash] communication or an intermediary queue, such as Kafka or Redis.
[[pipeline-to-pipeline-overview]]
==== Configuration overview
Use the `pipeline` input and `pipeline` output to connect two Logstash pipelines running within the same instance. These plugins use a client / server approach, where the `pipeline` input registers a virtual address that a `pipeline` output can connect to.
. Create a 'downstream' pipeline that listens for events on a virtual address.
. Create an 'upstream' pipeline that produces events, sending them through a `pipeline` output to one or more virtual addresses.
A simple example of this configuration is shown below.
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: upstream
  config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
  config.string: input { pipeline { address => myVirtualAddress } }
----
[[how-it-works]]
===== How it works
The `pipeline` input acts as a virtual server listening on a single virtual address in the local process. Only `pipeline` outputs running on the same local Logstash instance can send events to this address. A `pipeline` output can send events to a list of virtual addresses. A `pipeline` output will block if a downstream pipeline is either unavailable or blocked.
When events are sent across pipelines, their data is fully copied. Modifications to an event in a downstream pipeline do not affect that event in any other pipeline that uses it.
Copying events does, however, incur a performance cost. While the `pipeline` plugin may be the most efficient way to communicate between pipelines, it still incurs a cost: Logstash must duplicate each event in full on the Java heap for every downstream pipeline a `pipeline` output sends to. Be aware that using this feature may affect the heap memory utilization of Logstash.
[[delivery-guarantees]]
===== Delivery guarantees
In its standard configuration the `pipeline` input/output provides at-least-once delivery guarantees. The output will block if the address is unavailable or blocked.
By default, the `ensure_delivery` option on the `pipeline` output is set to `true`. If the `ensure_delivery` flag is set to `false`, an unavailable downstream pipeline causes the sent message to be discarded. A blocked downstream pipeline blocks the sending output/pipeline regardless of the value of the `ensure_delivery` flag.
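As an illustration, the following sketch (the pipeline IDs and the `metrics` address are hypothetical) discards events rather than blocking when the downstream pipeline is not running:
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: main
  config.string: |
    input { stdin {} }
    output {
      # Drop events instead of blocking if no pipeline is listening at 'metrics'
      pipeline { send_to => [metrics] ensure_delivery => false }
    }
- pipeline.id: metrics
  config.string: input { pipeline { address => metrics } } output { stdout {} }
----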
[[avoid-cycles]]
===== Avoid cycles
It is important when connecting pipelines that data flows in only one direction. Looping data back around, or connecting the pipelines into a cyclic graph, can cause problems. Logstash waits for each pipeline's work to complete before shutting down, so if pipelines loop data between them, Logstash may be unable to shut down cleanly.
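For example, a configuration like the following sketch (pipeline IDs and addresses are hypothetical) forms a cycle between two pipelines and should be avoided:
[source,yaml]
----
# config/pipelines.yml
# Do NOT do this: 'alpha' sends to 'beta' and 'beta' sends back to 'alpha',
# forming a cycle that can prevent Logstash from shutting down cleanly.
- pipeline.id: alpha
  config.string: input { pipeline { address => alpha } } output { pipeline { send_to => [beta] } }
- pipeline.id: beta
  config.string: input { pipeline { address => beta } } output { pipeline { send_to => [alpha] } }
----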
[[architectural-patterns]]
==== Architectural patterns
You can use the `pipeline` input and output to better organize code, streamline control flow, and isolate the performance of complex configurations. There are countless ways to connect pipelines; the ones presented here are hardly comprehensive.
* <<distributor-pattern>>
* <<output-isolator-pattern>>
* <<forked-path-pattern>>
* <<collector-pattern>>
[[distributor-pattern]]
===== The distributor pattern
The distributor pattern is used in situations where multiple types of data come through a single input, each with its own complex set of processing rules. With the distributor pattern, one pipeline routes data to other pipelines based on type, and each type is routed to a pipeline that contains only the logic for handling that type. In this way each type's logic is isolated.
As an example, in many organizations a single Beats input may be used to receive traffic from a variety of sources, each with its own processing logic. A common way of dealing with this kind of data is to have a number of `if` conditions separating the traffic and processing each type differently. This approach can quickly get messy when configs are long and complex.
An example distributor configuration is listed below:
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == "apache" {
          pipeline { send_to => weblogs }
        } else if [type] == "system" {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
----
Notice how easy it is to follow the flow of data when each pipeline works on only a single, specific task.
[[output-isolator-pattern]]
===== The output isolator pattern
The output isolator pattern is used to prevent Logstash from blocking when there are multiple outputs and one output experiences a temporary failure. For example, a server might be configured to send log data to both Elasticsearch and an HTTP endpoint, and the HTTP endpoint might be frequently unavailable due to regular maintenance or some other reason.
Logstash, by default, blocks when any single output is down. This is an important behavior for guaranteeing at-least-once delivery of data. Unfortunately, in the scenario above, it means that whenever the HTTP endpoint is down, data also stops being sent to Elasticsearch. Using the `pipeline` input and output, along with persistent queues, the output isolator pattern lets Logstash continue sending to Elasticsearch even when one output is down.
We could employ this pattern for the scenario described above with the following config:
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
----
Note that in this architecture each stage has its own queue, with its own tuning and settings. This uses up to three times as much disk space, and incurs three times as much serialization / deserialization cost, as a single pipeline.
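For instance, each buffered stage can be tuned independently. The snippet below is an illustrative sketch that assumes the standard `queue.max_bytes` setting, with a hypothetical value chosen for the flaky HTTP stage:
[source,yaml]
----
# config/pipelines.yml (excerpt)
- pipeline.id: buffered-http
  queue.type: persisted
  queue.max_bytes: 2gb   # give the frequently unavailable HTTP stage a larger buffer
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
----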
[[forked-path-pattern]]
===== The forked path pattern
The forked path pattern is used in situations where a single event must be processed more than once according to different sets of rules. Without the `pipeline` input and output, this is commonly solved through creative use of the `clone` filter and `if/else` rules.
As an example, imagine that we receive data and index the full event in our own systems, but publish a redacted version of the data to a partner's S3 bucket. We might use the output isolator pattern described above to decouple our writes to the two systems. The distinguishing feature of the forked path pattern is the existence of additional rules in the downstream pipelines.
An example of this pattern is in the following config:
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, partner] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => partner } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
----
[[collector-pattern]]
===== The collector pattern
The collector pattern is used in situations where you want to define a common set of outputs and pre-output filters that many disparate pipelines might use. It is the opposite of the distributor pattern: many pipelines fan in to a single pipeline, where outputs and other processing are shared. This pattern simplifies configuration at the cost of reduced isolation, since all data is sent through a single pipeline.
An example of this pattern can be seen below:
[source,yaml]
----
# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: common
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
----


@ -25,7 +25,7 @@ class LogStash::Agent
include LogStash::Util::Loggable
STARTED_AT = Time.now.freeze
attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id
attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipelines, :pipeline_bus
attr_accessor :logger
# initialize method for LogStash::Agent
@ -39,9 +39,10 @@ class LogStash::Agent
@auto_reload = setting("config.reload.automatic")
@ephemeral_id = SecureRandom.uuid
# Do not use @pipelines directly. Use #with_pipelines which does locking
@pipelines = {}
@pipelines_lock = java.util.concurrent.locks.ReentrantLock.new
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
@pipelines = java.util.concurrent.ConcurrentHashMap.new();
@name = setting("node.name")
@http_host = setting("http.host")
@ -133,17 +134,6 @@ class LogStash::Agent
!@running.value
end
# Safely perform an operation on the pipelines hash
# Using the correct synchronization
def with_pipelines
begin
@pipelines_lock.lock
yield @pipelines
ensure
@pipelines_lock.unlock
end
end
def converge_state_and_update
results = @source_loader.fetch
@ -160,14 +150,17 @@ class LogStash::Agent
# content of it.
converge_result = nil
# we don't use the variable here, but we want the locking
with_pipelines do |pipelines|
pipeline_actions = resolve_actions(results.response)
converge_result = converge_state(pipeline_actions)
update_metrics(converge_result)
end
report_currently_running_pipelines(converge_result)
logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
)
dispatch_events(converge_result)
converge_result
@ -229,21 +222,19 @@ class LogStash::Agent
end
def get_pipeline(pipeline_id)
with_pipelines do |pipelines|
pipelines[pipeline_id]
end
pipelines.get(pipeline_id)
end
def pipelines_count
with_pipelines do |pipelines|
pipelines.size
end
def running_pipelines
pipelines.select {|id,pipeline| running_pipeline?(id) }
end
def with_running_pipelines
with_pipelines do |pipelines|
yield pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }
end
def non_running_pipelines
pipelines.select {|id,pipeline| !running_pipeline?(id) }
end
def running_pipelines?
@ -251,25 +242,19 @@ class LogStash::Agent
end
def running_pipelines_count
with_running_pipelines do |pipelines|
pipelines.size
end
running_pipelines.size
end
def running_user_defined_pipelines?
with_running_user_defined_pipelines do |pipelines|
pipelines.size > 0
!running_user_defined_pipelines.empty?
end
def running_user_defined_pipelines
pipelines.select {|id, pipeline| running_pipeline?(id) && !pipeline.system? }
end
def with_running_user_defined_pipelines
with_pipelines do |pipelines|
found = pipelines.select do |_, pipeline|
pipeline.running? && !pipeline.system?
end
yield found
end
yield running_user_defined_pipelines
end
private
@ -296,7 +281,9 @@ class LogStash::Agent
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
pipeline_actions.each do |action|
pipeline_actions.map do |action|
Thread.new do
java.lang.Thread.currentThread().setName("Converge #{action}");
# We execute every task we need to converge the current state of pipelines
# for every task we will record the action result, that will help us
# the results of all the task will determine if the converge was successful or not
@ -309,7 +296,6 @@ class LogStash::Agent
#
# This give us a bit more extensibility with the current startup/validation model
# that we currently have.
with_pipelines do |pipelines|
begin
logger.debug("Executing action", :action => action)
action_result = action.execute(self, pipelines)
@ -327,7 +313,7 @@ class LogStash::Agent
converge_result.add(action, e)
end
end
end
end.each(&:join)
if logger.trace?
logger.trace("Converge results", :success => converge_result.success?,
@ -339,18 +325,7 @@ class LogStash::Agent
end
def resolve_actions(pipeline_configs)
with_pipelines do |pipelines|
@state_resolver.resolve(pipelines, pipeline_configs)
end
end
def report_currently_running_pipelines(converge_result)
if converge_result.success? && converge_result.total > 0
with_running_pipelines do |pipelines|
number_of_running_pipeline = pipelines.size
logger.info("Pipelines running", :count => number_of_running_pipeline, :pipelines => pipelines.values.collect(&:pipeline_id) )
end
end
@state_resolver.resolve(@pipelines, pipeline_configs)
end
def dispatch_events(converge_results)
@ -413,22 +388,20 @@ class LogStash::Agent
# In this context I could just call shutdown, but I've decided to
# use the stop action implementation for that so we have the same code.
# This also give us some context into why a shutdown is failing
with_pipelines do |pipelines|
pipeline_actions = resolve_actions([]) # We stop all the pipeline, so we converge to a empty state
converge_state(pipeline_actions)
end
end
def running_pipeline?(pipeline_id)
thread = get_pipeline(pipeline_id).thread
pipeline = get_pipeline(pipeline_id)
return false unless pipeline
thread = pipeline.thread
thread.is_a?(Thread) && thread.alive?
end
def clean_state?
with_pipelines do |pipelines|
pipelines.empty?
end
end
def setting(key)
@settings.get(key)


@ -60,7 +60,7 @@ module LogStash
def initialize(expected_actions_count)
@expected_actions_count = expected_actions_count
@actions = {}
@actions = java.util.concurrent.ConcurrentHashMap.new
end
def add(action, action_result)


@ -40,13 +40,24 @@ module LogStash module PipelineAction
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
status = pipeline.start # block until the pipeline is correctly started or crashed
if status
pipelines[pipeline_id] = pipeline # The pipeline is successfully started we can add it to the hash
status = nil
pipelines.compute(pipeline_id) do |id,value|
if value
message = "Attempted to create a pipeline that already exists! This shouldn't be possible"
logger.error(message, :pipeline_id => id, :pipelines => pipelines)
raise message
end
status = pipeline.start # block until the pipeline is correctly started or crashed
pipeline # The pipeline is successfully started we can add it to the map
end
LogStash::ConvergeResult::ActionResult.create(self, status)
end
def to_s
"PipelineAction::Create<#{pipeline_id}>"
end
end
end end


@ -19,6 +19,10 @@ module LogStash module PipelineAction
@pipeline_config.pipeline_id
end
def to_s
"PipelineAction::Reload<#{pipeline_id}>"
end
def execute(agent, pipelines)
old_pipeline = pipelines[pipeline_id]
@ -42,6 +46,8 @@ module LogStash module PipelineAction
end
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
pipelines.compute(pipeline_id) do |_,pipeline|
status = Stop.new(pipeline_id).execute(agent, pipelines)
if status
@ -49,6 +55,8 @@ module LogStash module PipelineAction
else
return status
end
pipeline
end
end
end
end end


@ -12,13 +12,18 @@ module LogStash module PipelineAction
end
def execute(agent, pipelines)
pipeline = pipelines[pipeline_id]
pipelines.compute(pipeline_id) do |_,pipeline|
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
pipeline.thread.join
pipelines.delete(pipeline_id)
nil # delete the pipeline
end
# If we reach this part of the code we have succeeded because
# the shutdown call will block.
return LogStash::ConvergeResult::SuccessfulAction.new
end
def to_s
"PipelineAction::Stop<#{pipeline_id}>"
end
end
end end


@ -0,0 +1,2 @@
require "logstash/plugins/registry"
require 'logstash/plugins/builtin'


@ -0,0 +1,7 @@
module ::LogStash::Plugins::Builtin
require 'logstash/plugins/builtin/pipeline/input'
require 'logstash/plugins/builtin/pipeline/output'
LogStash::PLUGIN_REGISTRY.add(:input, "pipeline", LogStash::Plugins::Builtin::Pipeline::Input)
LogStash::PLUGIN_REGISTRY.add(:output, "pipeline", LogStash::Plugins::Builtin::Pipeline::Output)
end


@ -0,0 +1,63 @@
module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input < ::LogStash::Inputs::Base
include org.logstash.plugins.pipeline.PipelineInput
config_name "pipeline"
config :address, :validate => :string, :required => true
attr_reader :pipeline_bus
def register
# May as well set this up here; writers won't do anything until
# @running is set to true
@running = java.util.concurrent.atomic.AtomicBoolean.new(false)
@pipeline_bus = execution_context.agent.pipeline_bus
listen_successful = pipeline_bus.listen(self, address)
if !listen_successful
raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
end
end
def run(queue)
@queue = queue
@running.set(true)
while @running.get()
sleep 0.1
end
end
def running?
@running && @running.get()
end
# Returns false if the receive failed due to a stopping input
# To understand why this value is useful see Internal.send_to
# Note, this takes a java Stream, not a ruby array
def internalReceive(events)
return false if !@running.get()
# TODO This should probably push a batch at some point in the future when doing so
# buys us some efficiency
events.forEach do |event|
decorate(event)
@queue << event
end
return true
rescue => e
# Log and swallow the error so a single bad event does not kill the input
logger.error("Pipeline input at '#{@address}' failed to enqueue events", :exception => e, :backtrace => e.backtrace)
return true
end
def stop
# We stop receiving events before we unlisten to prevent races
@running.set(false) if @running # If register wasn't yet called, no @running!
pipeline_bus.unlisten(self, address)
end
def isRunning
@running.get
end
end; end; end; end; end


@ -0,0 +1,30 @@
module ::LogStash; module Plugins; module Builtin; module Pipeline; class Output < ::LogStash::Outputs::Base
include org.logstash.plugins.pipeline.PipelineOutput
config_name "pipeline"
concurrency :shared
config :send_to, :validate => :string, :required => true, :list => true
config :ensure_delivery, :validate => :boolean, :default => true
attr_reader :pipeline_bus
def register
@pipeline_bus = execution_context.agent.pipeline_bus
pipeline_bus.registerSender(self, @send_to)
end
def multi_receive(events)
pipeline_bus.sendEvents(self, events, ensure_delivery)
end
def pipeline_shutting_down?
execution_context.pipeline.inputs.all? {|input| input.stop?}
end
def close
pipeline_bus.unregisterSender(self, @send_to)
end
end; end; end; end; end


@ -97,7 +97,13 @@ module LogStash module Plugins
attr_reader :hooks
def initialize
@registry = {}
@mutex = Mutex.new
# We need a threadsafe class here because we may perform
# get/set operations concurrently despite the fact we don't use
# the special atomic methods. That may not be apparent from this file,
# but it is the case that we can call lookups from multiple threads,
# when multiple pipelines are in play, and that a lookup may modify the registry.
@registry = java.util.concurrent.ConcurrentHashMap.new
@hooks = HooksRegistry.new
end
@ -123,6 +129,8 @@ module LogStash module Plugins
end
def load_available_plugins
require "logstash/plugins/builtin"
GemRegistry.logstash_plugins.each do |plugin_context|
# When a plugin has a HOOK_FILE defined, its the responsibility of the plugin
# to register itself to the registry of available plugins.
@ -140,17 +148,21 @@ module LogStash module Plugins
end
def lookup(type, plugin_name, &block)
plugin = get(type, plugin_name)
@mutex.synchronize do
plugin_spec = get(type, plugin_name)
# Assume that we have a legacy plugin
if plugin.nil?
plugin = legacy_lookup(type, plugin_name)
if plugin_spec.nil?
plugin_spec = legacy_lookup(type, plugin_name)
end
raise LoadError, "No plugin found with name '#{plugin_name}'" unless plugin_spec
if block_given? # if provided pass a block to do validation
raise LoadError, "Block validation fails for plugin named #{plugin_name} of type #{type}," unless block.call(plugin.klass, plugin_name)
raise LoadError, "Block validation fails for plugin named #{plugin_name} of type #{type}," unless block.call(plugin_spec.klass, plugin_name)
end
return plugin.klass
return plugin_spec.klass
end
end
# The legacy_lookup method uses the 1.5->5.0 file structure to find and match


@ -29,7 +29,7 @@ require "logstash/shutdown_watcher"
require "logstash/patches/clamp"
require "logstash/settings"
require "logstash/version"
require "logstash/plugins/registry"
require 'logstash/plugins'
require "logstash/modules/util"
require "logstash/bootstrap_check/default_config"
require "logstash/bootstrap_check/bad_java"


@ -134,7 +134,7 @@ describe LogStash::Agent do
it "does not upgrade the new config" do
t = Thread.new { subject.execute }
Timeout.timeout(timeout) do
sleep(0.01) until subject.with_pipelines {|pipelines| subject.running_pipelines? && pipelines.values.first.ready? }
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.ready?
end
expect(subject.converge_state_and_update).not_to be_a_successful_converge
expect(subject).to have_running_pipeline?(mock_config_pipeline)
@ -154,7 +154,7 @@ describe LogStash::Agent do
it "does upgrade the new config" do
t = Thread.new { subject.execute }
Timeout.timeout(timeout) do
sleep(0.01) until subject.with_pipelines {|pipelines| subject.pipelines_count > 0 && pipelines.values.first.ready? }
sleep(0.01) until subject.pipelines_count > 0 && subject.pipelines.values.first.ready?
end
expect(subject.converge_state_and_update).to be_a_successful_converge
@ -178,7 +178,7 @@ describe LogStash::Agent do
it "does not try to reload the pipeline" do
t = Thread.new { subject.execute }
Timeout.timeout(timeout) do
sleep(0.01) until subject.with_pipelines {|pipelines| subject.running_pipelines? && pipelines.values.first.running? }
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
end
expect(subject.converge_state_and_update).not_to be_a_successful_converge
expect(subject).to have_running_pipeline?(mock_config_pipeline)
@ -198,7 +198,7 @@ describe LogStash::Agent do
it "tries to reload the pipeline" do
t = Thread.new { subject.execute }
Timeout.timeout(timeout) do
sleep(0.01) until subject.with_pipelines {|pipelines| subject.running_pipelines? && pipelines.values.first.running? }
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
end
expect(subject.converge_state_and_update).to be_a_successful_converge


@ -9,7 +9,7 @@ require "logstash/inputs/generator"
describe LogStash::PipelineAction::Create do
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
let(:pipelines) { Hash.new }
let(:pipelines) { java.util.concurrent.ConcurrentHashMap.new }
let(:agent) { double("agent") }
before do


@ -11,7 +11,7 @@ describe LogStash::PipelineAction::Reload do
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
let(:pipeline_config) { "input { generator {} } output { null {} }" }
let(:pipeline) { mock_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) }
let(:pipelines) { { pipeline_id => pipeline } }
let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[pipeline_id] = pipeline; chm }
let(:agent) { double("agent") }
subject { described_class.new(new_pipeline_config, metric) }


@ -9,7 +9,7 @@ describe LogStash::PipelineAction::Stop do
let(:pipeline_config) { "input { generator {} } output { null {} }" }
let(:pipeline_id) { :main }
let(:pipeline) { mock_pipeline_from_string(pipeline_config) }
let(:pipelines) { { :main => pipeline } }
let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[:main] = pipeline; chm }
let(:agent) { double("agent") }
subject { described_class.new(pipeline_id) }


@ -0,0 +1,175 @@
require 'spec_helper'
describe ::LogStash::Plugins::Builtin::Pipeline do
let(:address) { "fooAdr" }
let(:input_options) { { "address" => address }}
let(:output_options) { { "send_to" => [address] }}
let(:execution_context) { double("execution_context" )}
let(:agent) { double("agent") }
let(:pipeline_bus) { org.logstash.plugins.pipeline.PipelineBus.new }
let(:queue) { Queue.new }
let(:input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(input_options) }
let(:output) { ::LogStash::Plugins::Builtin::Pipeline::Output.new(output_options) }
let(:inputs) { [input] }
let(:event) { ::LogStash::Event.new("foo" => "bar") }
before(:each) do
allow(execution_context).to receive(:agent).and_return(agent)
allow(agent).to receive(:pipeline_bus).and_return(pipeline_bus)
inputs.each do |i|
allow(i).to receive(:execution_context).and_return(execution_context)
end
allow(output).to receive(:execution_context).and_return(execution_context)
end
def wait_input_running(input_plugin)
until input_plugin.running?
sleep 0.1
end
end
describe "Input/output pair" do
def start_input
input.register
@input_thread = Thread.new do
input.run(queue)
end
wait_input_running(input)
end
def stop_input
input.do_stop
input.do_close
@input_thread.join
end
context "with both initially running" do
before(:each) do
start_input
output.register
end
describe "sending a message" do
before(:each) do
output.multi_receive([event])
end
subject { queue.pop(true) }
it "should not send an object with the same identity, but rather, a clone" do
expect(subject).not_to equal(event)
end
it "should send a clone with the correct data" do
expect(subject.to_hash_with_metadata).to match(event.to_hash_with_metadata)
end
# A clone wouldn't be affected here
it "should no longer have the same content if the original event was modified" do
event.set("baz", "bot")
expect(subject.to_hash_with_metadata).not_to match(event.to_hash_with_metadata)
end
end
after(:each) do
stop_input
output.do_close
end
end
context "with the input initially stopped" do
before(:each) do
output.register
@receive_thread = Thread.new { output.multi_receive([event]) }
end
it "should deliver the message once the input comes up" do
sleep 3
start_input
@receive_thread.join
expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
end
after(:each) do
stop_input
output.do_close
end
end
end
describe "one output to multiple inputs" do
describe "with all plugins up" do
let(:other_address) { "other" }
let(:other_input_options) { { "address" => other_address } }
let(:other_input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(other_input_options) }
let(:output_options) { { "send_to" => [address, other_address] } }
let(:inputs) { [input, other_input] }
let(:queues) { [Queue.new, Queue.new] }
let(:inputs_queues) { Hash[inputs.zip(queues)] }
before(:each) do
input.register
other_input.register
output.register
@input_threads = inputs_queues.map do |input_plugin,input_queue|
Thread.new do
input_plugin.run(input_queue)
end
end
inputs_queues.each do |input_plugin, input_queue|
wait_input_running(input_plugin)
end
end
describe "sending a message" do
before(:each) do
output.multi_receive([event])
end
it "should send the message to both outputs" do
inputs_queues.each do |i,q|
expect(q.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
end
end
end
context "with ensure delivery set to false" do
let(:output_options) { super.merge("ensure_delivery" => false) }
before(:each) do
other_input.do_stop
other_input.do_close
output.multi_receive([event])
end
it "should not send the event to the input that is down" do
expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
expect(inputs_queues[other_input].size).to eql(0)
end
# Test that the function isn't blocked on the last message
# a bug could conceivably cause this to hang
it "should not block subsequent sends" do
output.multi_receive([event])
expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata)
expect(inputs_queues[other_input].size).to eql(0)
end
end
after(:each) do
inputs.each(&:do_stop)
inputs.each(&:do_close)
output.do_close
@input_threads.each(&:join)
end
end
end
end


@ -26,7 +26,7 @@ shared_context "api setup" do
@agent.execute
pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }")
pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
@pipelines = Hash.new
@pipelines = java.util.concurrent.ConcurrentHashMap.new
expect(pipeline_creator.execute(@agent, @pipelines)).to be_truthy
end


@ -160,6 +160,6 @@ public final class Logstash implements Runnable, AutoCloseable {
}
private static void uncleanShutdown(final Exception ex) {
throw new IllegalStateException("Logstash stopped processing because of an error:", ex);
throw new IllegalStateException("Logstash stopped processing because of an error: " + ex.getMessage(), ex);
}
}


@ -48,6 +48,10 @@ public final class JrubyEventExtLibrary {
super(runtime, klass);
}
public static RubyEvent newRubyEvent(Ruby runtime) {
return newRubyEvent(runtime, new Event());
}
public static RubyEvent newRubyEvent(Ruby runtime, Event event) {
final RubyEvent ruby =
new RubyEvent(runtime, RubyUtil.RUBY_EVENT_CLASS);
@ -133,9 +137,14 @@ public final class JrubyEventExtLibrary {
}
@JRubyMethod(name = "clone")
public IRubyObject ruby_clone(ThreadContext context)
public IRubyObject rubyClone(ThreadContext context)
{
return RubyEvent.newRubyEvent(context.runtime, this.event.clone());
return rubyClone(context.runtime);
}
public RubyEvent rubyClone(Ruby runtime)
{
return RubyEvent.newRubyEvent(runtime, this.event.clone());
}
@JRubyMethod(name = "overwrite", required = 1)


@ -0,0 +1,75 @@
package org.logstash.plugins.pipeline;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Class for representing the state of an internal address.
*/
public class AddressState {
private final String address;
private final Set<PipelineOutput> outputs = ConcurrentHashMap.newKeySet();
private volatile PipelineInput input = null;
AddressState(String address) {
this.address = address;
}
/**
* Add the given output and ensure associated input's receivers are updated
* @param output
* @return
*/
public boolean addOutput(PipelineOutput output) {
return outputs.add(output);
}
public boolean removeOutput(PipelineOutput output) {
return outputs.remove(output);
}
public PipelineInput getInput() {
return input;
}
/**
* Assigns an input to listen on this address. Will return false if another input is already listening.
* @param newInput
* @return true if successful, false if another input is listening
*/
public synchronized boolean assignInputIfMissing(PipelineInput newInput) {
if (input != newInput && input != null) return false;
this.input = newInput;
return true;
}
/**
* Unsubscribes the given input from this address
* @param unsubscribingInput
* @return true if this input was listening, false otherwise
*/
public synchronized boolean unassignInput(PipelineInput unsubscribingInput) {
if (input != unsubscribingInput) return false;
input = null;
return true;
}
public boolean isRunning() {
return input != null && input.isRunning();
}
public boolean isEmpty() {
return (input == null) && outputs.isEmpty();
}
// Just for tests
boolean hasOutput(PipelineOutput output) {
return outputs.contains(output);
}
public Set<PipelineOutput> getOutputs() {
return outputs;
}
}


@ -0,0 +1,134 @@
package org.logstash.plugins.pipeline;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
/**
* This class is essentially the communication bus / central state for the `pipeline` inputs/outputs to talk to each
* other.
*
* This class is threadsafe. Most method locking is coarse grained with `synchronized` since contention for all these methods
* shouldn't matter
*/
public class PipelineBus {
final HashMap<String, AddressState> addressStates = new HashMap<>();
ConcurrentHashMap<PipelineOutput, ConcurrentHashMap<String, AddressState>> outputsToAddressStates = new ConcurrentHashMap<>();
private static final Logger logger = LogManager.getLogger(PipelineBus.class);
/**
* Sends events from the provided output.
* @param sender The output sending the events.
* @param events A collection of JRuby events
* @param ensureDelivery set to true if you want this to retry indefinitely in the event an event send fails
*/
public void sendEvents(final PipelineOutput sender,
final Collection<JrubyEventExtLibrary.RubyEvent> events,
final boolean ensureDelivery) {
final ConcurrentHashMap<String, AddressState> addressesToInputs = outputsToAddressStates.get(sender);
addressesToInputs.forEach( (address, addressState) -> {
final Stream<JrubyEventExtLibrary.RubyEvent> clones = events.stream().map(e -> e.rubyClone(RubyUtil.RUBY));
PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
boolean sendWasSuccess = input != null && input.internalReceive(clones);
// Retry send if the initial one failed
while (ensureDelivery && !sendWasSuccess) {
// We need to refresh the input in case the mapping has updated between loops
String message = String.format("Attempted to send event to '%s' but that address was unavailable. " +
"Maybe the destination pipeline is down or stopping? Will Retry.", address);
logger.warn(message);
input = addressState.getInput();
sendWasSuccess = input != null && input.internalReceive(clones);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Sleep unexpectedly interrupted in bus retry loop", e);
}
}
});
}
/**
* Should be called by an output on register
* @param output
* @param addresses
*/
public synchronized void registerSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach((String address) -> {
final AddressState state = addressStates.computeIfAbsent(address, AddressState::new);
state.addOutput(output);
});
updateOutputReceivers(output);
}
/**
* Should be called by an output on close
* @param output output that will be unregistered
* @param addresses collection of addresses this sender was registered with
*/
public synchronized void unregisterSender(final PipelineOutput output, final Iterable<String> addresses) {
addresses.forEach(address -> {
final AddressState state = addressStates.get(address);
if (state != null) {
state.removeOutput(output);
if (state.isEmpty()) addressStates.remove(address);
}
});
outputsToAddressStates.remove(output);
}
/**
* Updates the internal state for this output to reflect the fact that there may be a change
* in the inputs receiving events from it.
* @param output
*/
private synchronized void updateOutputReceivers(final PipelineOutput output) {
ConcurrentHashMap<String, AddressState> outputAddressToInputMapping =
outputsToAddressStates.computeIfAbsent(output, o -> new ConcurrentHashMap<>());
addressStates.forEach( (address, state) -> {
if (state.hasOutput(output)) outputAddressToInputMapping.put(address, state);
});
}
/**
* Listens to a given address with the provided listener
* Only one listener can listen on an address at a time
* @param address
* @param input
* @return true if the listener successfully subscribed
*/
public synchronized boolean listen(final PipelineInput input, final String address) {
final AddressState state = addressStates.computeIfAbsent(address, AddressState::new);
if (state.assignInputIfMissing(input)) {
state.getOutputs().forEach(this::updateOutputReceivers);
return true;
}
return false;
}
/**
* Stop listening on the given address with the given listener
* @param address
* @param input
*/
public synchronized void unlisten(final PipelineInput input, final String address) {
final AddressState state = addressStates.get(address);
if (state != null) {
state.unassignInput(input);
if (state.isEmpty()) addressStates.remove(address);
state.getOutputs().forEach(this::updateOutputReceivers);
}
}
}


@ -0,0 +1,21 @@
package org.logstash.plugins.pipeline;
import org.logstash.ext.JrubyEventExtLibrary;
import java.util.stream.Stream;
public interface PipelineInput {
/**
* Accepts a stream of events
* The events might be rejected if the input is stopping
* @param events a stream of events
* @return true if the event was successfully received
*/
boolean internalReceive(Stream<JrubyEventExtLibrary.RubyEvent> events);
/**
*
* @return true if the input is running
*/
boolean isRunning();
}


@ -0,0 +1,9 @@
package org.logstash.plugins.pipeline;
public interface PipelineOutput {
}


@ -0,0 +1,7 @@
package org.logstash;
import static org.junit.Assert.*;
public class ConvertedMapTest {
}


@ -0,0 +1,166 @@
package org.logstash.plugins.pipeline;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
public class PipelineBusTest {
static String address = "fooAddr";
static String otherAddress = "fooAddr";
static Collection<String> addresses = Arrays.asList(address, otherAddress);
PipelineBus bus;
TestPipelineInput input;
TestPipelineOutput output;
@Before
public void setup() {
bus = new PipelineBus();
input = new TestPipelineInput();
output = new TestPipelineOutput();
}
@Test
public void subscribeUnsubscribe() {
assertThat(bus.listen(input, address)).isTrue();
assertThat(bus.addressStates.get(address).getInput()).isSameAs(input);
bus.unlisten(input, address);
// Key should have been pruned
assertThat(bus.addressStates.containsKey(address)).isFalse();
}
@Test
public void senderRegisterUnregister() {
bus.registerSender(output, addresses);
assertThat(bus.addressStates.get(address).hasOutput(output)).isTrue();
bus.unregisterSender(output, addresses);
// We should have pruned this address
assertThat(bus.addressStates.containsKey(address)).isFalse();
}
@Test
public void activeSenderPreventsPrune() {
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.unlisten(input, address);
assertThat(bus.addressStates.containsKey(address)).isTrue();
bus.unregisterSender(output, addresses);
assertThat(bus.addressStates.containsKey(address)).isFalse();
}
@Test
public void activeListenerPreventsPrune() {
bus.registerSender(output, addresses);
bus.listen(input, address);
bus.unregisterSender(output, addresses);
assertThat(bus.addressStates.containsKey(address)).isTrue();
bus.unlisten(input, address);
assertThat(bus.addressStates.containsKey(address)).isFalse();
}
@Test
public void registerUnregisterListenerUpdatesOutputs() {
bus.registerSender(output, addresses);
bus.listen(input, address);
ConcurrentHashMap<String, AddressState> outputAddressesToInputs = bus.outputsToAddressStates.get(output);
assertThat(outputAddressesToInputs.size()).isEqualTo(1);
bus.unregisterSender(output, addresses);
assertThat(bus.outputsToAddressStates.get(output)).isNull();
bus.registerSender(output, addresses);
assertThat(bus.outputsToAddressStates.get(output).size()).isEqualTo(1);
}
@Test
public void listenUnlistenUpdatesOutputReceivers() {
bus.registerSender(output, addresses);
bus.listen(input, address);
final ConcurrentHashMap<String, AddressState> outputAddressesToInputs = bus.outputsToAddressStates.get(output);
outputAddressesToInputs.get(address).getInput().internalReceive(Stream.of(rubyEvent()));
assertThat(input.eventCount.longValue()).isEqualTo(1L);
bus.unlisten(input, address);
TestPipelineInput newInput = new TestPipelineInput();
bus.listen(newInput, address);
outputAddressesToInputs.get(address).getInput().internalReceive(Stream.of(rubyEvent()));
// The new event went to the new input, not the old one
assertThat(newInput.eventCount.longValue()).isEqualTo(1L);
assertThat(input.eventCount.longValue()).isEqualTo(1L);
}
@Test
public void missingInputEventuallySucceeds() throws InterruptedException {
bus.registerSender(output, addresses);
// bus.sendEvent should block at this point since there is no attached listener
// For this test we want to make sure that the background thread has had time to actually block
// since if we start the input too soon we aren't testing anything
// The below code attempts to make sure this happens, though it's hard to be deterministic
// without making sendEvent take weird arguments the non-test code really doesn't need
CountDownLatch sendLatch = new CountDownLatch(1);
Thread sendThread = new Thread(() -> {
sendLatch.countDown();
bus.sendEvents(output, Collections.singleton(rubyEvent()), true);
});
sendThread.start();
// Try to ensure that the send actually happened. The latch gets us close,
// the sleep takes us the full way (hopefully)
sendLatch.await();
Thread.sleep(1000);
bus.listen(input, address);
// This would block if there's an error in the code
sendThread.join();
assertThat(input.eventCount.longValue()).isEqualTo(1L);
}
private JrubyEventExtLibrary.RubyEvent rubyEvent() {
return JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY);
}
static class TestPipelineInput implements PipelineInput {
public LongAdder eventCount = new LongAdder();
@Override
public boolean internalReceive(Stream<JrubyEventExtLibrary.RubyEvent> events) {
eventCount.add(events.count());
return true;
}
@Override
public boolean isRunning() {
return true;
}
}
static class TestPipelineOutput implements PipelineOutput {
}
}


@ -55,4 +55,39 @@ describe "Test Logstash service when multiple pipelines are used" do
expect(File.exist?(temporary_out_file_2)).to be(true)
expect(IO.readlines(temporary_out_file_2).size).to eq(1)
end
describe "inter-pipeline communication" do
let(:pipelines) do
[
{
"pipeline.id" => "test",
"pipeline.workers" => 1,
"pipeline.batch.size" => 1,
"config.string" => "input { generator { count => 1 } } output { pipeline { send_to => testaddr } }"
},
{
"pipeline.id" => "test2",
"pipeline.workers" => 1,
"pipeline.batch.size" => 1,
"config.string" => "input { pipeline { address => testaddr } } output { file { path => \"#{temporary_out_file_1}\" } }"
}
]
end
it "can communicate between pipelines" do
logstash_service = @fixture.get_service("logstash")
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
logstash_service.wait_for_logstash
# Wait for LS to come up
i = 0
until File.exist?(temporary_out_file_1) && IO.readlines(temporary_out_file_1).size >= 1
i += 1
sleep 1
break if i > 30
end
expect(IO.readlines(temporary_out_file_1).size).to eq(1)
puts "Done"
end
end
end


@ -17,7 +17,7 @@ require "logstash/settings"
require 'rack/test'
require 'rspec'
require "json"
require 'logstash/runner'
class JSONIOThingy < IO
def initialize; end