mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Cleans up output delegators and simplifies a few other aspects of plugins
The new way Output Delegators work is that events flow from: OutputDelegator -> OutputDelegatorStrategy -> Output The output delegator handles all the common denominator tasks (like metrics) and a few other things. The OutputDelegatorStrategy handles concurrency and Output instantiation. This is a significant improvement and simplification over the past where we used mixins and dynamic method redifinition. This removes the concept of plugin 'unique_names' and replaces it with the 'id'. Also consistently autogenerates plugin IDs based on a given config file using SHA1 hashing. Fixes #5752
This commit is contained in:
parent
f00fadd6fc
commit
f68b2c0841
20 changed files with 380 additions and 396 deletions
43
docs/static/include/pluginbody.asciidoc
vendored
43
docs/static/include/pluginbody.asciidoc
vendored
|
@ -275,40 +275,31 @@ require "logstash/namespace"
|
|||
class LogStash::{pluginclass}::{pluginnamecap} < LogStash::{pluginclass}::Base
|
||||
config_name "example"
|
||||
|
||||
# If declared logstash will only allow a single instance of this plugin
|
||||
# to exist, regardless of how many CPU cores logstash detects. This is best
|
||||
# used in cases like the File output, where separate threads writing to a single
|
||||
# File would only cause problems.
|
||||
# This sets the concurrency behavior of this plugin. By default it is :legacy, which was the standard
|
||||
# way concurrency worked before Logstash 2.4
|
||||
#
|
||||
# You should explicitly set it to either :single or :shared as :legacy will be removed in Logstash 6.0
|
||||
#
|
||||
# When configured as :single a single instance of the Output will be shared among the
|
||||
# pipeline worker threads. Access to the `#multi_receive/#multi_receive_encoded/#receive` method will be synchronized
|
||||
# i.e. only one thread will be active at a time making threadsafety much simpler.
|
||||
#
|
||||
# You can set this to :shared if your output is threadsafe. This will maximize
|
||||
# concurrency but you will need to make appropriate uses of mutexes in `#multi_receive/#receive`.
|
||||
#
|
||||
# respond_to? check needed for backwards compatibility with < 2.2 Logstashes
|
||||
declare_workers_not_supported! if self.respond_to?(:declare_workers_not_supported!)
|
||||
|
||||
# If declared threadsafe logstash will only ever create one
|
||||
# instance of this plugin per pipeline.
|
||||
# That instance will be shared across all workers
|
||||
# It is up to the plugin author to correctly write concurrent code!
|
||||
#
|
||||
# respond_to? check needed for backwards compatibility with < 2.2 Logstashes
|
||||
declare_threadsafe! if self.respond_to?(:declare_threadsafe!)
|
||||
|
||||
# Only the `#multi_receive/#multi_receive_encoded` methods need to actually be threadsafe, the other methods
|
||||
# will only be executed in a single thread
|
||||
concurrency :single
|
||||
|
||||
public
|
||||
def register
|
||||
# Does the same thing as declare_workers_not_supported!
|
||||
# But works in < 2.2 logstashes
|
||||
# workers_not_supported
|
||||
def register
|
||||
end # def register
|
||||
|
||||
public
|
||||
# Takes an array of events
|
||||
# Must be threadsafe if `concurrency :shared` is set
|
||||
def multi_receive(events)
|
||||
end # def multi_receive
|
||||
|
||||
public
|
||||
# Needed for logstash < 2.2 compatibility
|
||||
# Takes events one at a time
|
||||
def receive(event)
|
||||
end # def receive
|
||||
|
||||
end # class LogStash::{pluginclass}::{pluginnamecap}
|
||||
----------------------------------
|
||||
endif::multi_receive_method[]
|
||||
|
|
|
@ -70,7 +70,7 @@ module LogStash
|
|||
# Turn the `plugins` stats hash into an array of [ {}, {}, ... ]
|
||||
# This is to produce an array of data points, one point for each
|
||||
# plugin instance.
|
||||
return [] unless stats[:plugins].include?(plugin_type)
|
||||
return [] unless stats[:plugins] && stats[:plugins].include?(plugin_type)
|
||||
stats[:plugins][plugin_type].collect do |id, data|
|
||||
{ :id => id }.merge(data)
|
||||
end
|
||||
|
|
|
@ -209,7 +209,7 @@ module LogStash::Config::Mixin
|
|||
|
||||
name = name.to_s if name.is_a?(Symbol)
|
||||
@config[name] = opts # ok if this is empty
|
||||
|
||||
|
||||
if name.is_a?(String)
|
||||
define_method(name) { instance_variable_get("@#{name}") }
|
||||
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) }
|
||||
|
|
|
@ -13,15 +13,14 @@ module LogStash
|
|||
]
|
||||
def_delegators :@filter, *DELEGATED_METHODS
|
||||
|
||||
def initialize(logger, klass, metric, *args)
|
||||
options = args.reduce({}, :merge)
|
||||
|
||||
def initialize(logger, klass, metric, plugin_args)
|
||||
@logger = logger
|
||||
@klass = klass
|
||||
@filter = klass.new(options)
|
||||
@id = plugin_args["id"]
|
||||
@filter = klass.new(plugin_args)
|
||||
|
||||
# Scope the metrics to the plugin
|
||||
namespaced_metric = metric.namespace(@filter.plugin_unique_name.to_sym)
|
||||
namespaced_metric = metric.namespace("#{@klass.config_name}_#{@id}".to_sym)
|
||||
@filter.metric = namespaced_metric
|
||||
|
||||
@metric_events = namespaced_metric.namespace(:events)
|
||||
|
|
|
@ -1,192 +1,56 @@
|
|||
# encoding: utf-8
|
||||
require "concurrent/atomic/atomic_fixnum"
|
||||
java_import "java.util.concurrent.CopyOnWriteArrayList"
|
||||
require "logstash/output_delegator_strategy_registry"
|
||||
|
||||
require "logstash/output_delegator_strategies/shared"
|
||||
require "logstash/output_delegator_strategies/single"
|
||||
require "logstash/output_delegator_strategies/legacy"
|
||||
|
||||
# This class goes hand in hand with the pipeline to provide a pool of
|
||||
# free workers to be used by pipeline worker threads. The pool is
|
||||
# internally represented with a SizedQueue set the the size of the number
|
||||
# of 'workers' the output plugin is configured with.
|
||||
#
|
||||
# This plugin also records some basic statistics
|
||||
module LogStash class OutputDelegator
|
||||
attr_reader :workers, :config, :threadsafe
|
||||
attr_reader :metric, :metric_events, :strategy, :namespaced_metric, :metric_events , :plugin_args, :strategy_registry
|
||||
|
||||
# The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them
|
||||
# Internally these just get merged together into a single hash
|
||||
def initialize(logger, klass, default_worker_count, metric, *plugin_args)
|
||||
def initialize(logger, output_class, metric, strategy_registry, plugin_args)
|
||||
@logger = logger
|
||||
@threadsafe = klass.threadsafe?
|
||||
@config = plugin_args.reduce({}, :merge)
|
||||
@klass = klass
|
||||
@workers = java.util.concurrent.CopyOnWriteArrayList.new
|
||||
@default_worker_count = default_worker_count
|
||||
@registered = false
|
||||
@output_class = output_class
|
||||
@metric = metric
|
||||
@plugin_args = plugin_args
|
||||
@strategy_registry ||= ::LogStash::OutputDelegatorStrategyRegistry.instance
|
||||
raise ArgumentError, "No ID specified! Got args #{plugin_args}" unless id
|
||||
|
||||
build_strategy!
|
||||
|
||||
# Create an instance of the input so we can fetch the identifier
|
||||
output = @klass.new(@config)
|
||||
|
||||
# Scope the metrics to the plugin
|
||||
namespaced_metric = metric.namespace(output.plugin_unique_name.to_sym)
|
||||
output.metric = namespaced_metric
|
||||
|
||||
@metric_events = namespaced_metric.namespace(:events)
|
||||
namespaced_metric.gauge(:name, config_name)
|
||||
|
||||
@events_received = Concurrent::AtomicFixnum.new(0)
|
||||
end
|
||||
|
||||
def threadsafe?
|
||||
!!@threadsafe
|
||||
end
|
||||
|
||||
def warn_on_worker_override!
|
||||
# The user has configured extra workers, but this plugin doesn't support it :(
|
||||
if worker_limits_overriden?
|
||||
message = @klass.workers_not_supported_message
|
||||
warning_meta = {:plugin => @klass.config_name, :worker_count => @config["workers"]}
|
||||
if message
|
||||
warning_meta[:message] = message
|
||||
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", warning_meta))
|
||||
else
|
||||
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", warning_meta))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def worker_limits_overriden?
|
||||
@config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
|
||||
end
|
||||
|
||||
def target_worker_count
|
||||
# Remove in 5.0 after all plugins upgraded to use class level declarations
|
||||
raise ArgumentError, "Attempted to detect target worker count before instantiating a worker to test for legacy workers_not_supported!" if @workers.size == 0
|
||||
|
||||
if @threadsafe || @klass.workers_not_supported?
|
||||
1
|
||||
else
|
||||
@config["workers"] || @default_worker_count
|
||||
end
|
||||
@namespaced_metric = metric.namespace(id.to_sym)
|
||||
@metric_events = @namespaced_metric.namespace(:events)
|
||||
@namespaced_metric.gauge(:name, id)
|
||||
end
|
||||
|
||||
def config_name
|
||||
@klass.config_name
|
||||
@output_class.config_name
|
||||
end
|
||||
|
||||
def concurrency
|
||||
@output_class.concurrency
|
||||
end
|
||||
|
||||
def build_strategy!
|
||||
@strategy = strategy_registry.
|
||||
class_for(self.concurrency).
|
||||
new(@logger, @output_class, @metric, @plugin_args)
|
||||
end
|
||||
|
||||
def id
|
||||
@plugin_args["id"]
|
||||
end
|
||||
|
||||
def register
|
||||
raise ArgumentError, "Attempted to register #{self} twice!" if @registered
|
||||
@registered = true
|
||||
# We define this as an array regardless of threadsafety
|
||||
# to make reporting simpler, even though a threadsafe plugin will just have
|
||||
# a single instance
|
||||
#
|
||||
# Older plugins invoke the instance method Outputs::Base#workers_not_supported
|
||||
# To detect these we need an instance to be created first :()
|
||||
# TODO: In the next major version after 2.x remove support for this
|
||||
@workers << @klass.new(@config)
|
||||
@workers.first.register # Needed in case register calls `workers_not_supported`
|
||||
|
||||
@logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass.name)
|
||||
|
||||
# Threadsafe versions don't need additional workers
|
||||
setup_additional_workers!(target_worker_count) unless @threadsafe
|
||||
# We skip the first worker because that's pre-registered to deal with legacy workers_not_supported
|
||||
@workers.subList(1,@workers.size).each(&:register)
|
||||
setup_multi_receive!
|
||||
@strategy.register
|
||||
end
|
||||
|
||||
def setup_additional_workers!(target_worker_count)
|
||||
warn_on_worker_override!
|
||||
|
||||
(target_worker_count - 1).times do
|
||||
inst = @klass.new(@config)
|
||||
inst.metric = @metric
|
||||
@workers << inst
|
||||
end
|
||||
|
||||
# This queue is used to manage sharing across threads
|
||||
@worker_queue = SizedQueue.new(target_worker_count)
|
||||
@workers.each {|w| @worker_queue << w }
|
||||
end
|
||||
|
||||
def setup_multi_receive!
|
||||
# One might wonder why we don't use something like
|
||||
# define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)
|
||||
# and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time!
|
||||
# The other 2% you get weird errors about rebinding to the same object
|
||||
# Until we switch to Jruby 9.x keep the define_singleton_method parts
|
||||
# the way they are, with a block
|
||||
# See https://github.com/jruby/jruby/issues/3582
|
||||
if threadsafe?
|
||||
@threadsafe_worker = @workers.first
|
||||
define_singleton_method(:multi_receive) do |events|
|
||||
threadsafe_multi_receive(events)
|
||||
end
|
||||
else
|
||||
define_singleton_method(:multi_receive) do |events|
|
||||
worker_multi_receive(events)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def threadsafe_multi_receive(events)
|
||||
@events_received.increment(events.length)
|
||||
def multi_receive(events)
|
||||
@metric_events.increment(:in, events.length)
|
||||
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
@threadsafe_worker.multi_receive(events)
|
||||
clock.stop
|
||||
@strategy.multi_receive(events)
|
||||
@metric_events.increment(:out, events.length)
|
||||
end
|
||||
|
||||
def worker_multi_receive(events)
|
||||
@events_received.increment(events.length)
|
||||
@metric_events.increment(:in, events.length)
|
||||
|
||||
worker = @worker_queue.pop
|
||||
begin
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
worker.multi_receive(events)
|
||||
clock.stop
|
||||
@metric_events.increment(:out, events.length)
|
||||
ensure
|
||||
@worker_queue.push(worker)
|
||||
end
|
||||
end
|
||||
|
||||
def do_close
|
||||
@logger.debug("closing output delegator", :klass => @klass.name)
|
||||
|
||||
if @threadsafe
|
||||
@workers.each(&:do_close)
|
||||
else
|
||||
worker_count.times do
|
||||
worker = @worker_queue.pop
|
||||
worker.do_close
|
||||
end
|
||||
end
|
||||
@strategy.do_close
|
||||
end
|
||||
|
||||
def events_received
|
||||
@events_received.value
|
||||
end
|
||||
|
||||
# There's no concept of 'busy' workers for a threadsafe plugin!
|
||||
def busy_workers
|
||||
if @threadsafe
|
||||
0
|
||||
else
|
||||
# The pipeline reporter can run before the outputs are registered trying to pull a value here
|
||||
# In that case @worker_queue is empty, we just return 0
|
||||
return 0 unless @worker_queue
|
||||
@workers.size - @worker_queue.size
|
||||
end
|
||||
end
|
||||
|
||||
def worker_count
|
||||
@workers.size
|
||||
end
|
||||
|
||||
private
|
||||
# Needed for testing, so private
|
||||
attr_reader :threadsafe_worker, :worker_queue
|
||||
end end
|
||||
end; end
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
# Remove this in Logstash 6.0
|
||||
module LogStash module OutputDelegatorStrategies class Legacy
|
||||
attr_reader :worker_count, :workers
|
||||
|
||||
def initialize(logger, klass, metric, plugin_args={})
|
||||
@worker_count = (plugin_args["workers"] || 1).to_i
|
||||
@workers = @worker_count.times.map {|t| klass.new(plugin_args)}
|
||||
@worker_queue = SizedQueue.new(@worker_count)
|
||||
@workers.each {|w| @worker_queue << w}
|
||||
end
|
||||
|
||||
def register
|
||||
@workers.each(&:register)
|
||||
end
|
||||
|
||||
def multi_receive(events)
|
||||
worker = @worker_queue.pop
|
||||
worker.multi_receive(events)
|
||||
ensure
|
||||
@worker_queue << worker if worker
|
||||
end
|
||||
|
||||
def do_close
|
||||
# No mutex needed since this is only called when the pipeline is clear
|
||||
@workers.each(&:do_close)
|
||||
end
|
||||
|
||||
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:legacy, self)
|
||||
end; end; end
|
|
@ -0,0 +1,20 @@
|
|||
module LogStash module OutputDelegatorStrategies class Shared
|
||||
def initialize(logger, klass, metric, xopts={}, plugin_args={})
|
||||
@output = klass.new(plugin_args)
|
||||
end
|
||||
|
||||
def register
|
||||
@output.register
|
||||
end
|
||||
|
||||
def multi_receive(events)
|
||||
@output.multi_receive(events)
|
||||
end
|
||||
|
||||
def do_close
|
||||
@output.do_close
|
||||
end
|
||||
|
||||
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:shared, self)
|
||||
end; end; end
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
module LogStash module OutputDelegatorStrategies class Single
|
||||
def initialize(logger, klass, metric, xopts={}, plugin_args={})
|
||||
@output = klass.new(plugin_args)
|
||||
@mutex = Mutex.new
|
||||
end
|
||||
|
||||
def register
|
||||
@output.register
|
||||
end
|
||||
|
||||
def multi_receive(events)
|
||||
@mutex.synchronize do
|
||||
@output.multi_receive(events)
|
||||
end
|
||||
end
|
||||
|
||||
def do_close
|
||||
# No mutex needed since this is only called when the pipeline is clear
|
||||
@output.do_close
|
||||
end
|
||||
|
||||
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:single, self)
|
||||
end; end; end
|
|
@ -0,0 +1,36 @@
|
|||
module LogStash; class OutputDelegatorStrategyRegistry
|
||||
class InvalidStrategyError < StandardError; end
|
||||
|
||||
# This is generally used as a singleton
|
||||
# Except perhaps during testing
|
||||
def self.instance
|
||||
@instance ||= self.new
|
||||
end
|
||||
|
||||
def initialize()
|
||||
@map = {}
|
||||
end
|
||||
|
||||
def classes
|
||||
@map.values
|
||||
end
|
||||
|
||||
def types
|
||||
@map.keys
|
||||
end
|
||||
|
||||
def class_for(type)
|
||||
klass = @map[type]
|
||||
|
||||
if !klass
|
||||
raise InvalidStrategyError, "Could not find output delegator strategy of type '#{type}'. Valid strategies: #{@strategy_registry.types}"
|
||||
end
|
||||
|
||||
klass
|
||||
end
|
||||
|
||||
def register(type, klass)
|
||||
@map[type] = klass
|
||||
end
|
||||
|
||||
end; end
|
|
@ -20,42 +20,37 @@ class LogStash::Outputs::Base < LogStash::Plugin
|
|||
|
||||
# The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.
|
||||
config :codec, :validate => :codec, :default => "plain"
|
||||
# TODO remove this in Logstash 6.0
|
||||
# when we no longer support the :legacy type
|
||||
# This is hacky, but it can only be herne
|
||||
config :workers, :type => :number, :default => 1
|
||||
|
||||
# Set or return concurrency type
|
||||
def self.concurrency(type=nil)
|
||||
if type
|
||||
@concurrency = type
|
||||
else
|
||||
@concurrency || :legacy # default is :legacyo
|
||||
end
|
||||
end
|
||||
|
||||
# The number of workers to use for this output.
|
||||
# Note that this setting may not be useful for all outputs.
|
||||
config :workers, :validate => :number, :default => 1
|
||||
|
||||
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins, :workers_not_supported
|
||||
|
||||
# Deprecated: Favor `concurrency :shared`
|
||||
def self.declare_threadsafe!
|
||||
declare_workers_not_supported!
|
||||
@threadsafe = true
|
||||
concurrency :shared
|
||||
end
|
||||
|
||||
# Deprecated: Favor `#concurrency`
|
||||
def self.threadsafe?
|
||||
@threadsafe == true
|
||||
concurrency == :shared
|
||||
end
|
||||
|
||||
# Deprecated: Favor `concurrency :single`
|
||||
# Remove in Logstash 6.0.0
|
||||
def self.declare_workers_not_supported!(message=nil)
|
||||
@workers_not_supported_message = message
|
||||
@workers_not_supported = true
|
||||
end
|
||||
|
||||
def self.workers_not_supported_message
|
||||
@workers_not_supported_message
|
||||
end
|
||||
|
||||
def self.workers_not_supported?
|
||||
!!@workers_not_supported
|
||||
concurrency :single
|
||||
end
|
||||
|
||||
public
|
||||
# TODO: Remove this in the next major version after Logstash 2.x
|
||||
# Post 2.x it should raise an error and tell people to use the class level
|
||||
# declaration
|
||||
def workers_not_supported(message=nil)
|
||||
self.class.declare_workers_not_supported!(message)
|
||||
end
|
||||
|
||||
def self.plugin_type
|
||||
"output"
|
||||
|
@ -66,6 +61,10 @@ class LogStash::Outputs::Base < LogStash::Plugin
|
|||
super
|
||||
config_init(@params)
|
||||
|
||||
if self.workers != 1
|
||||
raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option"
|
||||
end
|
||||
|
||||
# If we're running with a single thread we must enforce single-threaded concurrency by default
|
||||
# Maybe in a future version we'll assume output plugins are threadsafe
|
||||
@single_worker_mutex = Mutex.new
|
||||
|
@ -87,6 +86,10 @@ class LogStash::Outputs::Base < LogStash::Plugin
|
|||
events.each {|event| receive(event) }
|
||||
end
|
||||
|
||||
def concurrency
|
||||
self.class.concurrency
|
||||
end
|
||||
|
||||
private
|
||||
def output?(event)
|
||||
# TODO: noop for now, remove this once we delete this call from all plugins
|
||||
|
|
|
@ -32,6 +32,7 @@ module LogStash; class Pipeline
|
|||
:started_at,
|
||||
:thread,
|
||||
:config_str,
|
||||
:config_hash,
|
||||
:settings,
|
||||
:metric,
|
||||
:filter_queue_client,
|
||||
|
@ -45,11 +46,18 @@ module LogStash; class Pipeline
|
|||
|
||||
def initialize(config_str, settings = LogStash::SETTINGS, namespaced_metric = nil)
|
||||
@config_str = config_str
|
||||
@config_hash = Digest::SHA1.hexdigest(@config_str)
|
||||
# Every time #plugin is invoked this is incremented to give each plugin
|
||||
# a unique id when auto-generating plugin ids
|
||||
@plugin_counter ||= 0
|
||||
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
@settings = settings
|
||||
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
||||
@reporter = LogStash::PipelineReporter.new(@logger, self)
|
||||
|
||||
# A list of plugins indexed by id
|
||||
@plugins_by_id = {}
|
||||
@inputs = nil
|
||||
@filters = nil
|
||||
@outputs = nil
|
||||
|
@ -387,23 +395,38 @@ module LogStash; class Pipeline
|
|||
end
|
||||
|
||||
def plugin(plugin_type, name, *args)
|
||||
args << {} if args.empty?
|
||||
@plugin_counter += 1
|
||||
|
||||
# Collapse the array of arguments into a single merged hash
|
||||
args = args.reduce({}, &:merge)
|
||||
|
||||
id = if args["id"].nil? || args["id"].empty?
|
||||
args["id"] = "#{@config_hash}-#{@plugin_counter}"
|
||||
else
|
||||
args["id"]
|
||||
end
|
||||
|
||||
raise LogStash::ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
||||
|
||||
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
|
||||
|
||||
klass = LogStash::Plugin.lookup(plugin_type, name)
|
||||
|
||||
if plugin_type == "output"
|
||||
LogStash::OutputDelegator.new(@logger, klass, @settings.get("pipeline.output.workers"), pipeline_scoped_metric.namespace(:outputs), *args)
|
||||
elsif plugin_type == "filter"
|
||||
LogStash::FilterDelegator.new(@logger, klass, pipeline_scoped_metric.namespace(:filters), *args)
|
||||
else
|
||||
new_plugin = klass.new(*args)
|
||||
inputs_metric = pipeline_scoped_metric.namespace(:inputs)
|
||||
namespaced_metric = inputs_metric.namespace(new_plugin.plugin_unique_name.to_sym)
|
||||
new_plugin.metric = namespaced_metric
|
||||
new_plugin
|
||||
end
|
||||
# Scope plugins of type 'input' to 'inputs'
|
||||
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
|
||||
plugin = if plugin_type == "output"
|
||||
OutputDelegator.new(@logger, klass, type_scoped_metric,
|
||||
{:strategy_registry => ::LogStash::OutputDelegatorStrategyRegistry.instance},
|
||||
args)
|
||||
elsif plugin_type == "filter"
|
||||
LogStash::FilterDelegator.new(@logger, klass, type_scoped_metric, args)
|
||||
else # input
|
||||
input_plugin = klass.new(args)
|
||||
input_plugin.metric = type_scoped_metric.namespace(id)
|
||||
input_plugin
|
||||
end
|
||||
|
||||
@plugins_by_id[id] = plugin
|
||||
end
|
||||
|
||||
# for backward compatibility in devutils for the rspec helpers, this method is not used
|
||||
|
|
|
@ -60,7 +60,6 @@ module LogStash; class PipelineReporter
|
|||
{
|
||||
:events_filtered => events_filtered,
|
||||
:events_consumed => events_consumed,
|
||||
:worker_count => pipeline.worker_threads.size,
|
||||
:inflight_count => inflight_count,
|
||||
:worker_states => worker_states_snap,
|
||||
:output_info => output_info,
|
||||
|
@ -100,15 +99,10 @@ module LogStash; class PipelineReporter
|
|||
|
||||
def output_info
|
||||
pipeline.outputs.map do |output_delegator|
|
||||
is_multi_worker = output_delegator.worker_count > 1
|
||||
|
||||
{
|
||||
:type => output_delegator.config_name,
|
||||
:config => output_delegator.config,
|
||||
:is_multi_worker => is_multi_worker,
|
||||
:events_received => output_delegator.events_received,
|
||||
:workers => output_delegator.workers,
|
||||
:busy_workers => output_delegator.busy_workers
|
||||
:plugin_args => output_delegator.plugin_args,
|
||||
:concurrency => output_delegator.concurrency,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -45,8 +45,11 @@ class LogStash::Plugin
|
|||
self.class.name == other.class.name && @params == other.params
|
||||
end
|
||||
|
||||
def initialize(params=nil)
|
||||
def initialize(params=nil)
|
||||
@params = LogStash::Util.deep_clone(params)
|
||||
# The id should always be defined normally, but in tests that might not be the case
|
||||
# In the future we may make this more strict in the Plugin API
|
||||
@params["id"] ||= "#{self.class.config_name}_#{SecureRandom.uuid}"
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
end
|
||||
|
||||
|
@ -57,15 +60,7 @@ class LogStash::Plugin
|
|||
#
|
||||
# @return [String] A plugin ID
|
||||
def id
|
||||
(@params["id"].nil? || @params["id"].empty?) ? SecureRandom.uuid : @params["id"]
|
||||
end
|
||||
|
||||
# Return a unique_name, This is composed by the name of
|
||||
# the plugin and the generated ID (of the configured one)
|
||||
#
|
||||
# @return [String] a unique name
|
||||
def plugin_unique_name
|
||||
"#{config_name}_#{id}"
|
||||
@params["id"]
|
||||
end
|
||||
|
||||
# close is called during shutdown, after the plugin worker
|
||||
|
@ -127,6 +122,7 @@ class LogStash::Plugin
|
|||
LogStash::Registry.instance.lookup(type ,name) do |plugin_klass, plugin_name|
|
||||
is_a_plugin?(plugin_klass, plugin_name)
|
||||
end
|
||||
|
||||
rescue LoadError, NameError => e
|
||||
logger.debug("Problems loading the plugin with", :type => type, :name => name, :path => path)
|
||||
raise(LogStash::PluginLoadingError, I18n.t("logstash.pipeline.plugin-loading-error", :type => type, :name => name, :path => path, :error => e.to_s))
|
||||
|
|
|
@ -25,6 +25,17 @@ end
|
|||
describe "conditionals in output" do
|
||||
extend ConditionalFanciness
|
||||
|
||||
class DummyNullOutput < LogStash::Outputs::Base
|
||||
def register
|
||||
end
|
||||
def multi_receive(events)
|
||||
end
|
||||
end
|
||||
|
||||
before do
|
||||
LogStash::Registry.instance.register("logstash/outputs/dummynull", DummyNullOutput)
|
||||
end
|
||||
|
||||
describe "simple" do
|
||||
config <<-CONFIG
|
||||
input {
|
||||
|
@ -35,7 +46,7 @@ describe "conditionals in output" do
|
|||
}
|
||||
output {
|
||||
if [foo] == "bar" {
|
||||
stdout { }
|
||||
dummynull { }
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
|
|
@ -392,7 +392,10 @@ describe LogStash::Agent do
|
|||
|
||||
it "resets the metric collector" do
|
||||
# We know that the store has more events coming in.
|
||||
i = 0
|
||||
while dummy_output.events.size <= new_config_generator_counter
|
||||
i += 1
|
||||
raise "Waiting too long!" if i > 20
|
||||
sleep(0.1)
|
||||
end
|
||||
|
||||
|
|
|
@ -5,26 +5,25 @@ require 'spec_helper'
|
|||
describe LogStash::OutputDelegator do
|
||||
let(:logger) { double("logger") }
|
||||
let(:events) { 7.times.map { LogStash::Event.new }}
|
||||
let(:default_worker_count) { 1 }
|
||||
|
||||
subject { described_class.new(logger, out_klass, default_worker_count, LogStash::Instrument::NullMetric.new) }
|
||||
subject { described_class.new(logger, out_klass, LogStash::Instrument::NullMetric.new, {}, "id" => "foo") }
|
||||
|
||||
context "with a plain output plugin" do
|
||||
let(:out_klass) { double("output klass") }
|
||||
let(:out_inst) { double("output instance") }
|
||||
|
||||
let(:concurrency) { :single }
|
||||
|
||||
before(:each) do
|
||||
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
|
||||
allow(out_klass).to receive(:threadsafe?).and_return(false)
|
||||
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
|
||||
allow(out_klass).to receive(:name).and_return("example")
|
||||
allow(out_klass).to receive(:concurrency).with(any_args).and_return concurrency
|
||||
allow(out_klass).to receive(:config_name)
|
||||
allow(out_inst).to receive(:register)
|
||||
allow(out_inst).to receive(:multi_receive)
|
||||
allow(out_inst).to receive(:metric=).with(any_args)
|
||||
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
|
||||
allow(out_inst).to receive(:plugin_unique_name).and_return("hello-123")
|
||||
|
||||
allow(subject.metric_events).to receive(:increment).with(any_args)
|
||||
allow(logger).to receive(:debug).with(any_args)
|
||||
end
|
||||
|
||||
|
@ -43,109 +42,80 @@ describe LogStash::OutputDelegator do
|
|||
end
|
||||
|
||||
it "should increment the number of events received" do
|
||||
expect(subject.events_received).to eql(events.length)
|
||||
expect(subject.metric_events).to have_received(:increment).with(:in, events.length)
|
||||
expect(subject.metric_events).to have_received(:increment).with(:out, events.length)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
describe "closing" do
|
||||
before do
|
||||
subject.register
|
||||
end
|
||||
|
||||
it "should register all workers on register" do
|
||||
it "should register the output plugin instance on register" do
|
||||
expect(out_inst).to have_received(:register)
|
||||
end
|
||||
|
||||
it "should close all workers when closing" do
|
||||
it "should close the output plugin instance when closing" do
|
||||
expect(out_inst).to receive(:do_close)
|
||||
subject.do_close
|
||||
end
|
||||
end
|
||||
|
||||
describe "concurrency and worker support" do
|
||||
before do
|
||||
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
|
||||
allow(out_inst).to receive(:metric=).with(any_args)
|
||||
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
|
||||
describe "concurrency strategies" do
|
||||
it "should have :single as the default" do
|
||||
expect(subject.concurrency).to eq :single
|
||||
end
|
||||
|
||||
describe "non-threadsafe outputs that allow workers" do
|
||||
let(:default_worker_count) { 3 }
|
||||
[
|
||||
[:shared, ::LogStash::OutputDelegatorStrategies::Shared],
|
||||
[:single, ::LogStash::OutputDelegatorStrategies::Single],
|
||||
[:legacy, ::LogStash::OutputDelegatorStrategies::Legacy],
|
||||
].each do |strategy_concurrency,klass|
|
||||
context "with strategy #{strategy_concurrency}" do
|
||||
let(:concurrency) { strategy_concurrency }
|
||||
|
||||
before do
|
||||
allow(out_klass).to receive(:threadsafe?).and_return(false)
|
||||
subject.register
|
||||
end
|
||||
it "should find the correct concurrency type for the output" do
|
||||
expect(subject.concurrency).to eq(strategy_concurrency)
|
||||
end
|
||||
|
||||
it "should find the correct Strategy class for the worker" do
|
||||
expect(subject.strategy).to be_a(klass)
|
||||
end
|
||||
|
||||
it "should instantiate multiple workers" do
|
||||
expect(subject.workers.length).to eql(default_worker_count)
|
||||
end
|
||||
[[:register], [:do_close], [:multi_receive, [[]] ] ].each do |method, args|
|
||||
context "strategy objects" do
|
||||
before do
|
||||
allow(subject.strategy).to receive(method)
|
||||
end
|
||||
|
||||
it "should delegate #{method} to the strategy" do
|
||||
subject.send(method, *args)
|
||||
if args
|
||||
expect(subject.strategy).to have_received(method).with(*args)
|
||||
else
|
||||
expect(subject.strategy).to have_received(method).with(no_args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "should send received events to the worker" do
|
||||
expect(out_inst).to receive(:multi_receive).with(events)
|
||||
subject.multi_receive(events)
|
||||
context "strategy output instances" do
|
||||
before do
|
||||
allow(out_inst).to receive(method)
|
||||
end
|
||||
|
||||
it "should delegate #{method} to the strategy" do
|
||||
subject.send(method, *args)
|
||||
if args
|
||||
expect(out_inst).to have_received(method).with(*args)
|
||||
else
|
||||
expect(out_inst).to have_received(method).with(no_args)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "threadsafe outputs" do
|
||||
before do
|
||||
allow(out_klass).to receive(:threadsafe?).and_return(true)
|
||||
subject.register
|
||||
end
|
||||
|
||||
it "should return true when threadsafe? is invoked" do
|
||||
expect(subject.threadsafe?).to eql(true)
|
||||
end
|
||||
|
||||
it "should define a threadsafe_worker" do
|
||||
expect(subject.send(:threadsafe_worker)).to eql(out_inst)
|
||||
end
|
||||
|
||||
it "should utilize threadsafe_multi_receive" do
|
||||
expect(subject.send(:threadsafe_worker)).to receive(:multi_receive).with(events)
|
||||
subject.multi_receive(events)
|
||||
end
|
||||
|
||||
it "should not utilize the worker queue" do
|
||||
expect(subject.send(:worker_queue)).to be_nil
|
||||
end
|
||||
|
||||
it "should send received events to the worker" do
|
||||
expect(out_inst).to receive(:multi_receive).with(events)
|
||||
subject.multi_receive(events)
|
||||
end
|
||||
|
||||
it "should close all workers when closing" do
|
||||
expect(out_inst).to receive(:do_close)
|
||||
subject.do_close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# This may seem suspiciously similar to the class in outputs/base_spec
|
||||
# but, in fact, we need a whole new class because using this even once
|
||||
# will immutably modify the base class
|
||||
class LogStash::Outputs::NOOPDelLegacyNoWorkers < ::LogStash::Outputs::Base
|
||||
LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason"
|
||||
|
||||
def register
|
||||
workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON)
|
||||
end
|
||||
end
|
||||
|
||||
describe "legacy output workers_not_supported" do
|
||||
let(:default_worker_count) { 2 }
|
||||
let(:out_klass) { LogStash::Outputs::NOOPDelLegacyNoWorkers }
|
||||
|
||||
before(:each) do
|
||||
allow(logger).to receive(:debug).with(any_args)
|
||||
end
|
||||
|
||||
it "should only setup one worker" do
|
||||
subject.register
|
||||
expect(subject.worker_count).to eql(1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,9 +2,9 @@
|
|||
require "spec_helper"
|
||||
|
||||
# use a dummy NOOP output to test Outputs::Base
|
||||
class LogStash::Outputs::NOOP < LogStash::Outputs::Base
|
||||
class LogStash::Outputs::NOOPSingle < LogStash::Outputs::Base
|
||||
config_name "noop"
|
||||
milestone 2
|
||||
concurrency :single
|
||||
|
||||
config :dummy_option, :validate => :string
|
||||
|
||||
|
@ -15,26 +15,57 @@ class LogStash::Outputs::NOOP < LogStash::Outputs::Base
|
|||
end
|
||||
end
|
||||
|
||||
class LogStash::Outputs::NOOPLegacyNoWorkers < ::LogStash::Outputs::Base
|
||||
LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason"
|
||||
|
||||
def register
|
||||
workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON)
|
||||
end
|
||||
class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base
|
||||
concurrency :shared
|
||||
|
||||
def register; end
|
||||
end
|
||||
|
||||
class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base
|
||||
def register; end
|
||||
end
|
||||
|
||||
|
||||
describe "LogStash::Outputs::Base#new" do
|
||||
it "should instantiate cleanly" do
|
||||
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
|
||||
worker_params = params.dup; worker_params["workers"] = 1
|
||||
let(:params) { {} }
|
||||
subject(:instance) { klass.new(params.dup) }
|
||||
|
||||
expect do
|
||||
LogStash::Outputs::NOOP.new(params.dup)
|
||||
end.not_to raise_error
|
||||
context "single" do
|
||||
let(:klass) { LogStash::Outputs::NOOPSingle }
|
||||
|
||||
it "should instantiate cleanly" do
|
||||
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
|
||||
worker_params = params.dup; worker_params["workers"] = 1
|
||||
|
||||
expect{ subject }.not_to raise_error
|
||||
end
|
||||
|
||||
it "should set concurrency correctly" do
|
||||
expect(subject.concurrency).to eq(:single)
|
||||
end
|
||||
end
|
||||
|
||||
it "should move workers_not_supported declarations up to the class level" do
|
||||
LogStash::Outputs::NOOPLegacyNoWorkers.new.register
|
||||
expect(LogStash::Outputs::NOOPLegacyNoWorkers.workers_not_supported?).to eql(true)
|
||||
context "shared" do
|
||||
let(:klass) { LogStash::Outputs::NOOPShared }
|
||||
|
||||
it "should set concurrency correctly" do
|
||||
expect(subject.concurrency).to eq(:shared)
|
||||
end
|
||||
end
|
||||
|
||||
context "legacy" do
|
||||
let(:klass) { LogStash::Outputs::NOOPLegacy }
|
||||
|
||||
it "should set concurrency correctly" do
|
||||
expect(subject.concurrency).to eq(:legacy)
|
||||
end
|
||||
|
||||
it "should default the # of workers to 1" do
|
||||
expect(subject.workers).to eq(1)
|
||||
end
|
||||
|
||||
it "should default concurrency to :legacy" do
|
||||
expect(subject.concurrency).to eq(:legacy)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -43,6 +43,7 @@ describe LogStash::PipelineReporter do
|
|||
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original
|
||||
|
||||
@pre_snapshot = reporter.snapshot
|
||||
|
||||
pipeline.run
|
||||
@post_snapshot = reporter.snapshot
|
||||
end
|
||||
|
@ -80,10 +81,4 @@ describe LogStash::PipelineReporter do
|
|||
expect(@post_snapshot.inflight_count).to eql(0)
|
||||
end
|
||||
end
|
||||
|
||||
describe "output states" do
|
||||
it "should include the count of received events" do
|
||||
expect(@post_snapshot.output_info.first[:events_received]).to eql(generator_count)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -235,31 +235,21 @@ describe LogStash::Pipeline do
|
|||
}
|
||||
|
||||
context "output close" do
|
||||
it "should call close of output without output-workers" do
|
||||
pipeline = TestPipeline.new(test_config_without_output_workers)
|
||||
pipeline.run
|
||||
let(:pipeline) { TestPipeline.new(test_config_without_output_workers) }
|
||||
let(:output) { pipeline.outputs.first }
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.workers.size ).to eq(::LogStash::SETTINGS.get("pipeline.output.workers"))
|
||||
expect(pipeline.outputs.first.workers.first.num_closes ).to eq(1)
|
||||
pipeline.shutdown
|
||||
before do
|
||||
allow(output).to receive(:do_close)
|
||||
end
|
||||
|
||||
it "should call output close correctly with output workers" do
|
||||
pipeline = TestPipeline.new(test_config_with_output_workers)
|
||||
after do
|
||||
pipeline.shutdown
|
||||
end
|
||||
|
||||
it "should call close of output without output-workers" do
|
||||
pipeline.run
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
# We even close the parent output worker, even though it doesn't receive messages
|
||||
|
||||
output_delegator = pipeline.outputs.first
|
||||
output = output_delegator.workers.first
|
||||
|
||||
expect(output.num_closes).to eq(1)
|
||||
output_delegator.workers.each do |plugin|
|
||||
expect(plugin.num_closes ).to eq(1)
|
||||
end
|
||||
pipeline.shutdown
|
||||
expect(output).to have_received(:do_close).once
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -604,7 +594,13 @@ describe LogStash::Pipeline do
|
|||
|
||||
Thread.new { subject.run }
|
||||
# make sure we have received all the generated events
|
||||
sleep 0.25 while dummyoutput.events.size < number_of_events
|
||||
|
||||
times = 0
|
||||
while dummyoutput.events.size < number_of_events
|
||||
times += 1
|
||||
sleep 0.25
|
||||
raise "Waited too long" if times > 4
|
||||
end
|
||||
end
|
||||
|
||||
after :each do
|
||||
|
@ -640,7 +636,7 @@ describe LogStash::Pipeline do
|
|||
end
|
||||
|
||||
it "populates the output metrics" do
|
||||
plugin_name = "dummyoutput_#{dummy_output_id}".to_sym
|
||||
plugin_name = dummy_output_id.to_sym
|
||||
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:events][:out].value).to eq(number_of_events)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -236,7 +236,7 @@ describe LogStash::Plugin do
|
|||
end
|
||||
end
|
||||
|
||||
describe "#plugin_unique_name" do
|
||||
describe "#id" do
|
||||
let(:plugin) do
|
||||
Class.new(LogStash::Filters::Base,) do
|
||||
config_name "simple_plugin"
|
||||
|
@ -258,7 +258,7 @@ describe LogStash::Plugin do
|
|||
subject { plugin.new(config) }
|
||||
|
||||
it "return a human readable ID" do
|
||||
expect(subject.plugin_unique_name).to eq("simple_plugin_#{my_id}")
|
||||
expect(subject.id).to eq(my_id)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -266,7 +266,7 @@ describe LogStash::Plugin do
|
|||
subject { plugin.new(config) }
|
||||
|
||||
it "return a human readable ID" do
|
||||
expect(subject.plugin_unique_name).to match(/^simple_plugin_/)
|
||||
expect(subject.id).to match(/^simple_plugin_/)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue