Mirror of https://github.com/elastic/logstash.git, synced 2025-04-24 06:37:19 -04:00
Parent: e390d105e4
Commit: 028576ba6f
20 changed files with 823 additions and 308 deletions
Gemfile (13 changes)

@@ -4,17 +4,13 @@
 source "https://rubygems.org"
 gem "logstash-core", "3.0.0.dev", :path => "./logstash-core"
 gem "logstash-core-event", "3.0.0.dev", :path => "./logstash-core-event"
 # gem "logstash-core-event-java", "3.0.0.dev", :path => "./logstash-core-event-java"
 gem "file-dependencies", "0.1.6"
 gem "ci_reporter_rspec", "1.0.0", :group => :development
 gem "simplecov", :group => :development
 gem "coveralls", :group => :development
 # Tins 1.7 requires the ruby 2.0 platform to install,
 # this gem is a dependency of term-ansi-color which is a dependency of coveralls.
 # 1.6 is the last supported version on jruby.
 gem "tins", "1.6", :group => :development
 gem "rspec", "~> 3.1.0", :group => :development
- gem "logstash-devutils", "~> 0.0.15", :group => :development
+ gem "logstash-devutils", ">= 0"
 gem "benchmark-ips", :group => :development
 gem "octokit", "3.8.0", :group => :build
 gem "stud", "~> 0.0.21", :group => :build

@@ -22,3 +18,10 @@ gem "fpm", "~> 1.3.3", :group => :build
 gem "rubyzip", "~> 1.1.7", :group => :build
 gem "gems", "~> 0.8.3", :group => :build
 gem "flores", "~> 0.0.6", :group => :development
+ gem "logstash-filter-clone"
+ gem "logstash-filter-mutate"
+ gem "logstash-filter-multiline"
+ gem "logstash-input-generator"
+ gem "logstash-input-stdin"
+ gem "logstash-input-tcp"
+ gem "logstash-output-stdout"
Gemfile.lock

@@ -13,6 +13,7 @@ PATH
     logstash-core-event (~> 3.0.0.dev)
     minitar (~> 0.5.4)
     pry (~> 0.10.1)
     rubyzip (~> 1.1.7)
     stud (~> 0.0.19)
     thread_safe (~> 0.3.5)
     treetop (< 1.5.0)

@@ -74,10 +75,21 @@ GEM
     domain_name (~> 0.5)
     i18n (0.6.9)
     insist (1.0.0)
     jls-grok (0.11.2)
       cabin (>= 0.6.0)
     jrjackson (0.3.7)
     jruby-openssl (0.9.12-java)
     json (1.8.3-java)
     kramdown (1.9.0)
+    logstash-codec-json (2.0.2)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-codec-json_lines (2.0.2)
+      logstash-codec-line
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-codec-line (2.0.2)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-codec-plain (2.0.2)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
     logstash-devutils (0.0.18-java)
       gem_publisher
       insist (= 1.0.0)

@@ -87,6 +99,42 @@ GEM
     rspec (~> 3.1.0)
     rspec-wait
     stud (>= 0.0.20)
+    logstash-filter-clone (2.0.3)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-filter-grok (2.0.2)
+      jls-grok (~> 0.11.1)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+      logstash-patterns-core
+    logstash-filter-multiline (2.0.2)
+      jls-grok (~> 0.11.0)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+      logstash-filter-mutate
+      logstash-patterns-core
+    logstash-filter-mutate (2.0.2)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+      logstash-filter-grok
+      logstash-patterns-core
+    logstash-input-generator (2.0.2)
+      logstash-codec-plain
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-input-stdin (2.0.2)
+      concurrent-ruby
+      logstash-codec-json
+      logstash-codec-json_lines
+      logstash-codec-line
+      logstash-codec-plain
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-input-tcp (2.0.4)
+      logstash-codec-json
+      logstash-codec-json_lines
+      logstash-codec-line
+      logstash-codec-plain
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-output-stdout (2.0.2)
+      logstash-codec-line
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
+    logstash-patterns-core (2.0.2)
+      logstash-core (>= 2.0.0.beta2, < 3.0.0)
     method_source (0.8.2)
     mime-types (2.6.2)
     minitar (0.5.4)

@@ -155,7 +203,14 @@ DEPENDENCIES
   gems (~> 0.8.3)
   logstash-core (= 3.0.0.dev)!
   logstash-core-event (= 3.0.0.dev)!
-  logstash-devutils (~> 0.0.15)
+  logstash-devutils
+  logstash-filter-clone
+  logstash-filter-multiline
+  logstash-filter-mutate
+  logstash-input-generator
+  logstash-input-stdin
+  logstash-input-tcp
+  logstash-output-stdout
   octokit (= 3.8.0)
   rspec (~> 3.1.0)
   rubyzip (~> 1.1.7)
logstash-core/lib/logstash/agent.rb

@@ -33,7 +33,7 @@ class LogStash::Agent
  end # def execute

  def add_pipeline(pipeline_id, config_str, settings = {})
-   @pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
+   @pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
  end

  private

@@ -76,4 +76,4 @@ class LogStash::Agent
      end
    end
  end
end # class LogStash::Agent
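The only caller-visible change is that the pipeline now learns its own id, which it later uses to name its threads. A minimal sketch of a call site, assuming an agent instance and a throwaway config (both illustrative):

    agent.add_pipeline("base", "input { generator {} } output { stdout {} }",
                       :pipeline_workers => 4)
    # internally this becomes:
    # LogStash::Pipeline.new(config_str, { :pipeline_workers => 4, :pipeline_id => "base" })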
logstash-core/lib/logstash/config/config_ast.rb

@@ -108,6 +108,7 @@ module LogStash; module Config; module AST
  # defines @filter_func and @output_func

  definitions << "def #{type}_func(event)"
+ definitions << " targeted_outputs = []" if type == "output"
  definitions << " events = [event]" if type == "filter"
  definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

@@ -116,6 +117,7 @@ module LogStash; module Config; module AST
  end

  definitions << " events" if type == "filter"
+ definitions << " targeted_outputs" if type == "output"
  definitions << "end"
end

@@ -237,7 +239,7 @@ module LogStash; module Config; module AST
    events = #{variable_name}.multi_filter(events)
  CODE
when "output"
- return "#{variable_name}.handle(event)\n"
+ return "targeted_outputs << #{variable_name}\n"
when "codec"
  settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
  attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"

@@ -402,7 +404,7 @@ module LogStash; module Config; module AST
  <<-CODE
    events = cond_func_#{i}(events)
  CODE
- else
+ else # Output
  <<-CODE
    #{super}
  end

@@ -542,4 +544,4 @@ class Treetop::Runtime::SyntaxNode
    ""
  )
end
end
end
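The net effect on the generated code: output_func no longer dispatches events itself, it returns the list of outputs an event routes to, and the worker invokes each output once per batch. Roughly the emitted shape, reconstructed from the definitions above (plugin variable name illustrative):

    def output_func(event)
      targeted_outputs = []
      # ...conditionals compiled from the config...
      targeted_outputs << @output_stdout_1   # was: @output_stdout_1.handle(event)
      targeted_outputs
    end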
logstash-core/lib/logstash/output_delegator.rb (new file, 68 lines)

@@ -0,0 +1,68 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"

# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
# internally represented with a SizedQueue sized to the number
# of 'workers' the output plugin is configured with.
#
# This class also records some basic statistics
module LogStash; class OutputDelegator
  attr_reader :workers, :config, :worker_count

  # The *args this takes are in the same format that an Outputs::Base takes: a list of hashes with parameters in them.
  # Internally these just get merged together into a single hash.
  def initialize(logger, klass, *args)
    @logger = logger
    @config = args.reduce({}, :merge)
    @klass = klass
    @worker_count = @config["workers"] || 1

    @worker_queue = SizedQueue.new(@worker_count)

    @workers = @worker_count.times.map do
      w = @klass.new(*args)
      w.register
      @worker_queue << w
      w
    end

    @events_received = Concurrent::AtomicFixnum.new(0)
  end

  def config_name
    @klass.config_name
  end

  def register
    @workers.each {|w| w.register }
  end

  def multi_receive(events)
    @events_received.increment(events.length)

    worker = @worker_queue.pop
    begin
      worker.multi_receive(events)
    ensure
      @worker_queue.push(worker)
    end
  end

  def do_close
    @logger.debug("closing output delegator", :klass => self)

    @worker_count.times do
      worker = @worker_queue.pop
      worker.do_close
    end
  end

  def events_received
    @events_received.value
  end

  def busy_workers
    @worker_queue.size
  end
end end
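The pop/ensure-push in multi_receive is a generic resource-pool checkout pattern. A self-contained sketch of the same idea outside Logstash (all names illustrative):

    class WorkerPool
      def initialize(size)
        @pool = SizedQueue.new(size)
        size.times { |i| @pool << "worker-#{i}" } # stand-ins for real worker objects
      end

      # Check a worker out, yield it, and always return it to the pool.
      def with_worker
        worker = @pool.pop
        begin
          yield worker
        ensure
          @pool.push(worker)
        end
      end
    end

    pool = WorkerPool.new(2)
    threads = 4.times.map do
      # excess threads block on pop until a worker is returned
      Thread.new { pool.with_worker { |w| sleep 0.01 } }
    end
    threads.each(&:join)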
logstash-core/lib/logstash/outputs/base.rb

@@ -4,6 +4,8 @@ require "logstash/logging"
  require "logstash/plugin"
  require "logstash/namespace"
  require "logstash/config/mixin"
+ require "logstash/util/wrapped_synchronous_queue"
+ require "concurrent/atomic/atomic_fixnum"

  class LogStash::Outputs::Base < LogStash::Plugin
    include LogStash::Config::Mixin

@@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
  # Note that this setting may not be useful for all outputs.
  config :workers, :validate => :number, :default => 1

- attr_reader :worker_plugins, :worker_queue, :worker_threads
+ attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins

  public
  def workers_not_supported(message=nil)

@@ -40,6 +42,10 @@ class LogStash::Outputs::Base < LogStash::Plugin
  def initialize(params={})
    super
    config_init(params)

+   # If we're running with a single thread we must enforce single-threaded concurrency by default
+   # Maybe in a future version we'll assume output plugins are threadsafe
+   @single_worker_mutex = Mutex.new
  end

  public

@@ -53,37 +59,9 @@ class LogStash::Outputs::Base < LogStash::Plugin
  end # def receive

  public
- def worker_setup
-   if @workers == 1
-     @worker_plugins = [self]
-     @worker_threads = []
-   else
-     define_singleton_method(:handle, method(:handle_worker))
-     @worker_queue = SizedQueue.new(20)
-     @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
-     @worker_threads = @worker_plugins.map.with_index do |plugin, i|
-       Thread.new(original_params, @worker_queue) do |params, queue|
-         LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
-         LogStash::Util.set_thread_plugin(self)
-         plugin.register
-         while true
-           event = queue.pop
-           plugin.handle(event)
-         end
-       end
-     end
-   end
- end
-
- public
- def handle(event)
-   LogStash::Util.set_thread_plugin(self)
-   receive(event)
- end # def handle
-
- def handle_worker(event)
-   LogStash::Util.set_thread_plugin(self)
-   @worker_queue.push(event)
+ # To be overridden in implementations
+ def multi_receive(events)
+   events.each {|event| receive(event) }
  end

  private

@@ -91,4 +69,4 @@ class LogStash::Outputs::Base < LogStash::Plugin
    # TODO: noop for now, remove this once we delete this call from all plugins
    true
  end # def output?
end # class LogStash::Outputs::Base
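Under this model an output only has to implement receive, but a bulk-capable output can override multi_receive to act on the whole batch at once. A hypothetical sketch (plugin name and the bulk_send helper are invented for illustration):

    class LogStash::Outputs::BulkExample < LogStash::Outputs::Base
      config_name "bulk_example" # hypothetical plugin name

      def register
      end

      # Called by the OutputDelegator with the whole batch; overriding it
      # avoids per-event overhead for backends with a bulk API.
      def multi_receive(events)
        bulk_send(events.map(&:to_hash))
      end

      private

      def bulk_send(docs)
        # stand-in for a real bulk API call
      end
    end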
logstash-core/lib/logstash/pipeline.rb

@@ -12,20 +12,35 @@ require "logstash/outputs/base"
  require "logstash/config/cpu_core_strategy"
  require "logstash/util/defaults_printer"
  require "logstash/shutdown_controller"
+ require "logstash/util/wrapped_synchronous_queue"
+ require "logstash/pipeline_reporter"
+ require "concurrent/timer_task"
+ require "logstash/output_delegator"

  module LogStash; class Pipeline
-   attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output
+   attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id

+   DEFAULT_SETTINGS = {
+     :default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent,
+     :pipeline_batch_size => 125,
+     :pipeline_batch_delay => 5, # in milliseconds
+     :flush_interval => 5, # in seconds
+     :flush_timeout_interval => 60 # in seconds
+   }

    def initialize(config_str, settings = {})
+     @pipeline_id = settings[:pipeline_id] || self.object_id
      @logger = Cabin::Channel.get(LogStash)
+     @reporter = LogStash::PipelineReporter.new(@logger, self)

      @inputs = nil
      @filters = nil
      @outputs = nil

+     @worker_threads = []

      grammar = LogStashConfigParser.new
      @config = grammar.parse(config_str)

      if @config.nil?
        raise LogStash::ConfigurationError, grammar.failure_reason
      end

@@ -42,18 +57,22 @@ module LogStash; class Pipeline
      raise
    end

-   @input_to_filter = SizedQueue.new(20)
-   # if no filters, pipe inputs directly to outputs
-   @filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter
-
-   @settings = {
-     "default-filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent
-   }
+   @input_queue = LogStash::Util::WrappedSynchronousQueue.new
+   @events_filtered = Concurrent::AtomicFixnum.new(0)
+   @events_consumed = Concurrent::AtomicFixnum.new(0)
+
+   # We generally only want one thread at a time able to access pop/take/poll operations
+   # from this queue. We also depend on this to be able to block consumers while we snapshot
+   # in-flight buffers
+   @input_queue_pop_mutex = Mutex.new
+   @input_threads = []
+   @settings = DEFAULT_SETTINGS.clone
    # @ready requires thread safety since it is typically polled from outside the pipeline thread
    @ready = Concurrent::AtomicBoolean.new(false)
-   @input_threads = []
    @flushing = Concurrent::AtomicReference.new(false)
+   settings.each {|setting, value| configure(setting, value) }
+
+   start_flusher
  end # def initialize

  def ready?
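Since settings are now a plain hash merged over DEFAULT_SETTINGS, a tuned pipeline can be built directly. A hypothetical construction (config string illustrative):

    pipeline = LogStash::Pipeline.new(
      "input { generator {} } output { stdout {} }",
      :pipeline_workers     => 4,
      :pipeline_batch_size  => 250,  # overrides the default of 125
      :pipeline_batch_delay => 10    # ms to wait while filling a batch
    )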
@@ -64,24 +83,29 @@ module LogStash; class Pipeline
    @settings[setting] = value
  end

- def safe_filter_worker_count
-   default = @settings["default-filter-workers"]
-   thread_count = @settings["filter-workers"] # override from args "-w 8" or config
+ def safe_pipeline_worker_count
+   default = DEFAULT_SETTINGS[:default_pipeline_workers]
+   thread_count = @settings[:pipeline_workers] # override from args "-w 8" or config
    safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)

    if unsafe_filters.any?
      plugins = unsafe_filters.collect { |f| f.class.config_name }
      case thread_count
      when nil
        # user did not specify a worker thread count
        # warn if the default is multiple
-       @logger.warn("Defaulting filter worker threads to 1 because there are some filters that might not work with multiple worker threads",
-                    :count_was => default, :filters => plugins) if default > 1
+       if default > 1
+         @logger.warn("Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads",
+                      :count_was => default, :filters => plugins)
+       end

        1 # can't allow the default value to propagate if there are unsafe filters
      when 0, 1
        1
      else
        @logger.warn("Warning: Manual override - there are filters that might not work with multiple worker threads",
                     :worker_threads => thread_count, :filters => plugins)
        thread_count # allow user to force this even if there are unsafe filters
      end
    else

@@ -94,31 +118,21 @@ module LogStash; class Pipeline
  end

  def run
+   LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
    @logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))

-   begin
-     start_inputs
-     start_filters if filters?
-     start_outputs
-   ensure
-     # it is important to guarantee @ready to be true after the startup sequence has been completed
-     # to potentially unblock the shutdown method which may be waiting on @ready to proceed
-     @ready.make_true
-   end
+   start_workers

    @logger.info("Pipeline started")
    @logger.terminal("Logstash startup completed")

    # Block until all inputs have stopped
    # Generally this happens if SIGINT is sent and `shutdown` is called from an external thread
    wait_inputs
    @logger.info("Input plugins stopped! Will shutdown filter/output workers.")

-   if filters?
-     shutdown_filters
-     wait_filters
-     flush_filters_to_output!(:final => true)
-   end
-
-   shutdown_outputs
-   wait_outputs
+   shutdown_flusher
+   shutdown_workers

    @logger.info("Pipeline shutdown complete.")
    @logger.terminal("Logstash shutdown completed")

@@ -127,29 +141,142 @@ module LogStash; class Pipeline
    return 0
  end # def run

+ def start_workers
+   @inflight_batches = {}
+
+   @worker_threads.clear # In case we're restarting the pipeline
+   begin
+     start_inputs
+     @outputs.each {|o| o.register }
+     @filters.each {|f| f.register }
+
+     pipeline_workers = safe_pipeline_worker_count
+     batch_size = @settings[:pipeline_batch_size]
+     batch_delay = @settings[:pipeline_batch_delay]
+     @logger.info("Starting pipeline",
+                  :id => self.pipeline_id,
+                  :pipeline_workers => pipeline_workers,
+                  :batch_size => batch_size,
+                  :batch_delay => batch_delay)
+
+     pipeline_workers.times do |t|
+       @worker_threads << Thread.new do
+         LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
+         worker_loop(batch_size, batch_delay)
+       end
+     end
+   ensure
+     # it is important to guarantee @ready to be true after the startup sequence has been completed
+     # to potentially unblock the shutdown method which may be waiting on @ready to proceed
+     @ready.make_true
+   end
+ end
+
+ # Main body of what a worker thread does
+ # Repeatedly takes batches off the queue, filters, then outputs them
+ def worker_loop(batch_size, batch_delay)
+   running = true
+
+   while running
+     # To understand the purpose behind this synchronize please read the body of take_batch
+     input_batch, signal = @input_queue_pop_mutex.synchronize { take_batch(batch_size, batch_delay) }
+     running = false if signal == LogStash::SHUTDOWN
+
+     @events_consumed.increment(input_batch.size)
+
+     filtered_batch = filter_batch(input_batch)
+
+     if signal # Flush on SHUTDOWN or FLUSH
+       flush_options = (signal == LogStash::SHUTDOWN) ? {:final => true} : {}
+       flush_filters_to_batch(filtered_batch, flush_options)
+     end
+
+     @events_filtered.increment(filtered_batch.size)
+
+     output_batch(filtered_batch)
+
+     inflight_batches_synchronize { set_current_thread_inflight_batch(nil) }
+   end
+ end
+
+ def take_batch(batch_size, batch_delay)
+   batch = []
+   # Since this is externally synchronized in `worker_loop` we can guarantee that any in-flight
+   # batch visible here is a full batch, not a partial one
+   set_current_thread_inflight_batch(batch)
+
+   signal = false
+   batch_size.times do |t|
+     event = (t == 0) ? @input_queue.take : @input_queue.poll(batch_delay)
+
+     if event.nil?
+       next
+     elsif event == LogStash::SHUTDOWN || event == LogStash::FLUSH
+       # We MUST break here. If a batch consumes two SHUTDOWN events
+       # then another worker may have its SHUTDOWN 'stolen', thus blocking
+       # the pipeline. We should stop doing work after flush as well.
+       signal = event
+       break
+     else
+       batch << event
+     end
+   end
+
+   [batch, signal]
+ end
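The shape of take_batch (block for the first event, then poll with a short timeout for the rest) is a batching idiom worth seeing in isolation. A simplified sketch against a plain Ruby Queue, assuming Ruby 3.2+ where Queue#pop accepts a timeout: keyword and returns nil on timeout:

    def take_batch(queue, batch_size, delay_ms)
      batch = [queue.pop]                           # block until something arrives
      (batch_size - 1).times do
        event = queue.pop(timeout: delay_ms / 1000.0)
        next if event.nil?                          # poll timed out; keep trying
        batch << event
      end
      batch
    end

    q = Queue.new
    producer = Thread.new { 10.times { |i| q << i } }
    p take_batch(q, 5, 50)   # => first 5 items once the producer has run
    producer.join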
+ def filter_batch(batch)
+   batch.reduce([]) do |acc, e|
+     if e.is_a?(LogStash::Event)
+       filtered = filter_func(e)
+       filtered.each {|fe| acc << fe unless fe.cancelled? }
+     end
+     acc
+   end
+ rescue Exception => e
+   # Plugin authors should manage their own exceptions in the plugin code
+   # but if an exception is raised up to the worker thread it is considered
+   # fatal and logstash will not recover from this situation.
+   #
+   # Users need to check their configuration or see if there is a bug in the
+   # plugin.
+   @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
+                 "exception" => e, "backtrace" => e.backtrace)
+   raise
+ end
+
+ # Take an array of events and send them to the correct output
+ def output_batch(batch)
+   # Build a mapping of { output_plugin => [events...] }
+   outputs_events = batch.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, event|
+     # We ask the AST to tell us which outputs to send each event to
+     # Then, we stick it in the correct bin
+     output_func(event).each do |output|
+       acc[output] << event
+     end
+     acc
+   end
+   # Now that we have our output to event mapping we can just invoke each output
+   # once with its list of events
+   outputs_events.each do |output, events|
+     output.multi_receive(events)
+   end
+ end
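The reduce over Hash.new { |h, k| h[k] = [] } is a group-by: each event lands in one bucket per output it routes to, so every output gets exactly one multi_receive call per batch. The same idiom in miniature:

    # Group words by first letter, then act once per group.
    words = %w[apple avocado banana cherry]
    groups = words.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, w|
      acc[w[0]] << w
      acc
    end
    groups.each { |letter, ws| puts "#{letter}: #{ws.join(", ")}" }
    # a: apple, avocado
    # b: banana
    # c: cherry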
+ def set_current_thread_inflight_batch(batch)
+   @inflight_batches[Thread.current] = batch
+ end
+
+ def inflight_batches_synchronize
+   @input_queue_pop_mutex.synchronize do
+     yield(@inflight_batches)
+   end
+ end

  def wait_inputs
    @input_threads.each(&:join)
  end

- def shutdown_filters
-   @flusher_thread.kill
-   @input_to_filter.push(LogStash::SHUTDOWN)
- end
-
- def wait_filters
-   @filter_threads.each(&:join) if @filter_threads
- end
-
- def shutdown_outputs
-   # nothing, filters will do this
-   @filter_to_output.push(LogStash::SHUTDOWN)
- end
-
- def wait_outputs
-   # Wait for the outputs to stop
-   @output_threads.each(&:join)
- end

  def start_inputs
    moreinputs = []
    @inputs.each do |input|

@@ -167,45 +294,15 @@ module LogStash; class Pipeline
    end
  end

- def start_filters
-   @filters.each(&:register)
-   # dynamically get thread count based on filter threadsafety
-   # moved this test to here to allow for future config reloading
-   to_start = safe_filter_worker_count
-   @filter_threads = to_start.times.collect do |i|
-     Thread.new do
-       LogStash::Util.set_thread_name("|filterworker.#{i}")
-       filterworker
-     end
-   end
-   actually_started = @filter_threads.select(&:alive?).size
-   msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
-   if actually_started < to_start
-     @logger.warn(msg)
-   else
-     @logger.info(msg)
-   end
-   @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
- end
-
- def start_outputs
-   @outputs.each(&:register)
-   @output_threads = [
-     Thread.new { outputworker }
-   ]
- end

  def start_input(plugin)
    @input_threads << Thread.new { inputworker(plugin) }
  end

  def inputworker(plugin)
-   LogStash::Util.set_thread_name("<#{plugin.class.config_name}")
-   LogStash::Util.set_thread_plugin(plugin)
+   LogStash::Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
    begin
-     plugin.run(@input_to_filter)
+     plugin.run(@input_queue)
    rescue => e
      # if plugin is stopping, ignore uncaught exceptions and exit worker
      if plugin.stop?
        @logger.debug("Input plugin raised exception during shutdown, ignoring it.",
                      :plugin => plugin.class.config_name, :exception => e,

@@ -233,56 +330,6 @@ module LogStash; class Pipeline
    end
  end # def inputworker

- def filterworker
-   begin
-     while true
-       event = @input_to_filter.pop
-
-       case event
-       when LogStash::Event
-         # filter_func returns all filtered events, including cancelled ones
-         filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
-       when LogStash::FlushEvent
-         # handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
-         # don't have to deal with thread safety implementing the flush method
-         flush_filters_to_output!
-       when LogStash::ShutdownEvent
-         # pass it down to any other filterworker and stop this worker
-         @input_to_filter.push(event)
-         break
-       end
-     end
-   rescue Exception => e
-     # Plugin authors should manage their own exceptions in the plugin code
-     # but if an exception is raised up to the worker thread it is considered
-     # fatal and logstash will not recover from this situation.
-     #
-     # Users need to check their configuration or see if there is a bug in the
-     # plugin.
-     @logger.error("Exception in filterworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
-                   "exception" => e, "backtrace" => e.backtrace)
-     raise
-   ensure
-     @filters.each(&:do_close)
-   end
- end # def filterworker
-
- def outputworker
-   LogStash::Util.set_thread_name(">output")
-   @outputs.each(&:worker_setup)
-
-   while true
-     event = @filter_to_output.pop
-     break if event == LogStash::SHUTDOWN
-     output_func(event)
-     LogStash::Util.set_thread_plugin(nil)
-   end
- ensure
-   @outputs.each do |output|
-     output.worker_plugins.each(&:do_close)
-   end
- end # def outputworker

  # initiate the pipeline shutdown sequence
  # this method is intended to be called from outside the pipeline thread
  # @param before_stop [Proc] code block called before performing stop operation on input plugins

@@ -296,13 +343,40 @@ module LogStash; class Pipeline
    before_stop.call if block_given?

    @logger.info "Closing inputs"
    @inputs.each(&:do_stop)
    @logger.info "Closed inputs"
  end # def shutdown

+ # After `shutdown` is called from an external thread this is called from the main thread to
+ # tell the worker threads to stop and then block until they've fully stopped
+ # This also stops all filter and output plugins
+ def shutdown_workers
+   # Each worker thread will receive this exactly once!
+   @worker_threads.each do |t|
+     @logger.debug("Pushing shutdown", :thread => t)
+     @input_queue.push(LogStash::SHUTDOWN)
+   end
+
+   @worker_threads.each do |t|
+     @logger.debug("Shutdown waiting for worker thread #{t}")
+     t.join
+   end
+
+   @filters.each(&:do_close)
+   @outputs.each(&:do_close)
+ end

  def plugin(plugin_type, name, *args)
    args << {} if args.empty?

    klass = LogStash::Plugin.lookup(plugin_type, name)
-   return klass.new(*args)
+   if plugin_type == "output"
+     LogStash::OutputDelegator.new(@logger, klass, *args)
+   else
+     klass.new(*args)
+   end
  end

  # for backward compatibility in devutils for the rspec helpers, this method is not used

@@ -312,6 +386,7 @@ module LogStash; class Pipeline
    filter_func(event).each { |e| block.call(e) }
  end

  # perform filters flush and yield flushed events to the passed block
  # @param options [Hash]
  # @option options [Boolean] :final => true to signal a final shutdown flush

@@ -323,61 +398,70 @@ module LogStash; class Pipeline
    end
  end

+ class FlusherObserver
+   def initialize(logger, flushing)
+     @logger = logger
+     @flushing = flushing
+   end
+
+   def update(time, result, exception)
+     # This is a safeguard in the event that the timer task somehow times out
+     # We still need to call it in the original (in case someone decides to call it directly)
+     # but this is the safeguard for timer related issues causing @flushing not to be reset to false
+     @flushing.set(false)
+
+     return unless exception
+     @logger.warn("Error during flush!",
+                  :message => exception.message,
+                  :class => exception.class.name,
+                  :backtrace => exception.backtrace)
+   end
+ end
+
+ def start_flusher
+   @flusher_task = Concurrent::TimerTask.new { flush }
+   @flusher_task.execution_interval = @settings[:flush_interval]
+   @flusher_task.timeout_interval = @settings[:flush_timeout_interval]
+   @flusher_task.add_observer(FlusherObserver.new(@logger, @flushing))
+   @flusher_task.execute
+ end
+
+ def shutdown_flusher
+   @flusher_task.shutdown
+ end
+
+ def flush
+   if @flushing.compare_and_set(false, true)
+     @logger.debug? && @logger.debug("Pushing flush onto pipeline")
+     @input_queue.push(LogStash::FLUSH)
+   end
+ end
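The flusher swaps the old Stud.interval thread for concurrent-ruby's TimerTask, with compare_and_set guaranteeing at most one FLUSH is pending at a time. A stripped-down, runnable sketch of that wiring (interval and output are illustrative; Logstash resets the flag from the worker and a safeguard observer):

    require "concurrent"

    flushing = Concurrent::AtomicReference.new(false)

    flusher = Concurrent::TimerTask.new(execution_interval: 1) do
      # compare_and_set makes the flush request idempotent: if one is already
      # pending, this tick is a no-op
      if flushing.compare_and_set(false, true)
        puts "flush requested"
        flushing.set(false)   # in Logstash the consumer/observer resets this
      end
    end
    flusher.execute
    sleep 3.5                 # let a few ticks fire
    flusher.shutdown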
  # perform filters flush into the output queue
  # @param options [Hash]
  # @option options [Boolean] :final => true to signal a final shutdown flush
- def flush_filters_to_output!(options = {})
+ def flush_filters_to_batch(batch, options = {})
    flush_filters(options) do |event|
      unless event.cancelled?
        @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
-       @filter_to_output.push(event)
+       batch << event
      end
    end

+   @flushing.set(false)
  end # flush_filters_to_output!

- def inflight_count
-   data = {}
-   total = 0
-
-   input_to_filter = @input_to_filter.size
-   total += input_to_filter
-   filter_to_output = @filter_to_output.size
-   total += filter_to_output
-
-   data["input_to_filter"] = input_to_filter if input_to_filter > 0
-   data["filter_to_output"] = filter_to_output if filter_to_output > 0
-
-   output_worker_queues = []
-   @outputs.each do |output|
-     next unless output.worker_queue && output.worker_queue.size > 0
-     plugin_info = output.debug_info
-     size = output.worker_queue.size
-     total += size
-     plugin_info << size
-     output_worker_queues << plugin_info
-   end
-   data["output_worker_queues"] = output_worker_queues unless output_worker_queues.empty?
-   data["total"] = total
-   data
- end
+ def plugin_threads_info
+   input_threads = @input_threads.select {|t| t.alive? }
+   worker_threads = @worker_threads.select {|t| t.alive? }
+   (input_threads + worker_threads).map {|t| LogStash::Util.thread_info(t) }
+ end

- def stalling_threads
-   plugin_threads
-     .reject {|t| t["blocked_on"] } # known begnin blocking statuses
-     .each {|t| t.delete("backtrace") }
-     .each {|t| t.delete("blocked_on") }
-     .each {|t| t.delete("status") }
+ def stalling_threads_info
+   plugin_threads_info
+     .reject {|t| t["blocked_on"] } # known benign blocking statuses
+     .each {|t| t.delete("backtrace") }
+     .each {|t| t.delete("blocked_on") }
+     .each {|t| t.delete("status") }
  end

- def plugin_threads
-   input_threads = @input_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
-   filter_threads = @filter_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
-   output_threads = @output_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
-   output_worker_threads = @outputs.flat_map {|output| output.worker_threads }.map {|t| thread_info(t) }
-   input_threads + filter_threads + output_threads + output_worker_threads
- end
-
- def thread_info(thread)
-   LogStash::Util.thread_info(thread)
- end
- end; end
+ end end
logstash-core/lib/logstash/pipeline_reporter.rb (new file, 114 lines)

@@ -0,0 +1,114 @@
# encoding: utf-8
require 'ostruct'

module LogStash; class PipelineReporter
  attr_reader :logger, :pipeline

  # This is an immutable copy of the pipeline state.
  # It is a proxy to a hash to allow us to add methods dynamically to the hash.
  class Snapshot
    def initialize(data)
      @data = data
    end

    def to_hash
      @data
    end

    def to_simple_hash
      {"inflight_count" => inflight_count, "stalling_thread_info" => format_threads_by_plugin}
    end

    def to_str
      to_simple_hash.to_s
    end
    alias_method :to_s, :to_str

    def method_missing(meth)
      @data[meth]
    end

    def format_threads_by_plugin
      stalled_plugins = {}
      stalling_threads_info.each do |thr|
        key = (thr.delete("plugin") || "other")
        stalled_plugins[key] ||= []
        stalled_plugins[key] << thr
      end
      stalled_plugins
    end
  end

  def initialize(logger, pipeline)
    @logger = logger
    @pipeline = pipeline
  end

  # The main way of accessing data from the reporter;
  # this provides a (more or less) consistent snapshot of what's going on in the
  # pipeline with some extra decoration
  def snapshot
    Snapshot.new(self.to_hash)
  end

  def to_hash
    pipeline.inflight_batches_synchronize do |batch_map|
      worker_states_snap = worker_states(batch_map) # We only want to run this once
      inflight_count = worker_states_snap.map {|s| s[:inflight_count] }.reduce(0, :+)

      {
        :events_filtered => events_filtered,
        :events_consumed => events_consumed,
        :worker_count => pipeline.worker_threads.size,
        :inflight_count => inflight_count,
        :worker_states => worker_states_snap,
        :output_info => output_info,
        :thread_info => pipeline.plugin_threads_info,
        :stalling_threads_info => pipeline.stalling_threads_info
      }
    end
  end

  private

  def events_filtered
    pipeline.events_filtered.value
  end

  def events_consumed
    pipeline.events_consumed.value
  end

  def plugin_threads
    pipeline.plugin_threads
  end

  # Not threadsafe! Must be called within an `inflight_batches_synchronize` block
  def worker_states(batch_map)
    pipeline.worker_threads.map.with_index do |thread, idx|
      status = thread.status || "dead"
      inflight_count = batch_map[thread] ? batch_map[thread].size : 0
      {
        :status => status,
        :alive => thread.alive?,
        :index => idx,
        :inflight_count => inflight_count
      }
    end
  end

  def output_info
    pipeline.outputs.map do |output_delegator|
      is_multi_worker = output_delegator.worker_count > 1

      {
        :type => output_delegator.config_name,
        :config => output_delegator.config,
        :is_multi_worker => is_multi_worker,
        :events_received => output_delegator.events_received,
        :workers => output_delegator.workers,
        :busy_workers => output_delegator.busy_workers
      }
    end
  end
end end
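A rough sketch of how the reporter is meant to be consumed, with the accessors served off the snapshot hash via method_missing (pipeline construction omitted):

    snap = pipeline.reporter.snapshot       # consistent-ish copy of pipeline state
    snap.inflight_count                     # method_missing reads @data[:inflight_count]
    snap.to_simple_hash["stalling_thread_info"]
    puts snap                               # to_s renders the simple hash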
logstash-core/lib/logstash/runner.rb

@@ -26,10 +26,20 @@ class LogStash::Runner < Clamp::Command
    :default_input => DEFAULT_INPUT, :default_output => DEFAULT_OUTPUT),
    :default => "", :attribute_name => :config_string

- option ["-w", "--filterworkers"], "COUNT",
-   I18n.t("logstash.runner.flag.filterworkers"),
-   :attribute_name => :filter_workers,
-   :default => LogStash::Config::CpuCoreStrategy.fifty_percent, &:to_i
+ option ["-w", "--pipeline-workers"], "COUNT",
+   I18n.t("logstash.runner.flag.pipeline-workers"),
+   :attribute_name => :pipeline_workers,
+   :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i
+
+ option ["-b", "--pipeline-batch-size"], "SIZE",
+   I18n.t("logstash.runner.flag.pipeline-batch-size"),
+   :attribute_name => :pipeline_batch_size,
+   :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i
+
+ option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
+   I18n.t("logstash.runner.flag.pipeline-batch-delay"),
+   :attribute_name => :pipeline_batch_delay,
+   :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i

  option ["-l", "--log"], "FILE",
    I18n.t("logstash.runner.flag.log"),

@@ -110,7 +120,12 @@ class LogStash::Runner < Clamp::Command
    config_string = format_config(@config_path, @config_string)

-   @agent.add_pipeline("base", config_string, :filter_workers => filter_workers)
+   pipeline_settings = {
+     :pipeline_workers => pipeline_workers,
+     :pipeline_batch_size => pipeline_batch_size,
+     :pipeline_batch_delay => pipeline_batch_delay
+   }
+   @agent.add_pipeline("base", config_string, pipeline_settings)

    if config_test?
      puts "Configuration OK"
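Together these replace -w/--filterworkers with three sizing knobs. A hypothetical invocation (config path and values illustrative):

    bin/logstash -f my.conf --pipeline-workers 4 --pipeline-batch-size 250 --pipeline-batch-delay 10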
logstash-core/lib/logstash/shutdown_controller.rb

@@ -47,10 +47,10 @@ module LogStash
    cycle_number = 0
    stalled_count = 0
    Stud.interval(@cycle_period) do
-     @reports << Report.from_pipeline(@pipeline)
+     @reports << pipeline_report_snapshot
      @reports.delete_at(0) if @reports.size > @report_every # expire old report
      if cycle_number == (@report_every - 1) # it's report time!
-       logger.warn(@reports.last.to_hash)
+       logger.warn(@reports.last)

        if shutdown_stalled?
          logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0

@@ -69,6 +69,10 @@ module LogStash
      end
    end

+ def pipeline_report_snapshot
+   @pipeline.reporter.snapshot
+ end

  # A pipeline shutdown is stalled if
  # * at least REPORT_EVERY reports have been created
  # * the inflight event count is monotonically increasing

@@ -78,7 +82,7 @@ module LogStash
    return false unless @reports.size == @report_every
    # is stalled if inflight count is either constant or increasing
    stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
-     prev_report.inflight_count["total"] <= next_report.inflight_count["total"]
+     prev_report.inflight_count <= next_report.inflight_count
    end
    if stalled_event_count
      @reports.each_cons(2).all? do |prev_report, next_report|

@@ -93,35 +97,4 @@ module LogStash
      exit(-1)
    end
  end

- class Report
-
-   attr_reader :inflight_count, :stalling_threads
-
-   def self.from_pipeline(pipeline)
-     new(pipeline.inflight_count, pipeline.stalling_threads)
-   end
-
-   def initialize(inflight_count, stalling_threads)
-     @inflight_count = inflight_count
-     @stalling_threads = format_threads_by_plugin(stalling_threads)
-   end
-
-   def to_hash
-     {
-       "INFLIGHT_EVENT_COUNT" => @inflight_count,
-       "STALLING_THREADS" => @stalling_threads
-     }
-   end
-
-   def format_threads_by_plugin(stalling_threads)
-     stalled_plugins = {}
-     stalling_threads.each do |thr|
-       key = (thr.delete("plugin") || "other")
-       stalled_plugins[key] ||= []
-       stalled_plugins[key] << thr
-     end
-     stalled_plugins
-   end
- end
end
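With inflight_count now a plain integer per report, the stall test reduces to a monotonicity check over the report window. The each_cons idiom in isolation:

    reports = [10, 10, 12, 15]   # inflight counts, oldest first
    stalled = reports.each_cons(2).all? { |prev, nxt| prev <= nxt }
    # => true: the count never decreased, so the pipeline looks stuck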
logstash-core/lib/logstash/util/worker_threads_default_printer.rb

@@ -6,8 +6,8 @@ require "logstash/util"
  module LogStash module Util class WorkerThreadsDefaultPrinter

    def initialize(settings)
-     @setting = settings.fetch('filter-workers', 0)
-     @default = settings.fetch('default-filter-workers', 0)
+     @setting = settings.fetch('pipeline-workers', 0)
+     @default = settings.fetch('default-pipeline-workers', 0)
    end

    def visit(collector)

@@ -17,12 +17,12 @@ module LogStash module Util class WorkerThreadsDefaultPrinter

    def visit_setting(collector)
      return if @setting == 0
-     collector.push("User set filter workers: #{@setting}")
+     collector.push("User set pipeline workers: #{@setting}")
    end

    def visit_default(collector)
      return if @default == 0
-     collector.push "Default filter workers: #{@default}"
+     collector.push "Default pipeline workers: #{@default}"
    end

  end end end
logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb (new file, 27 lines)

@@ -0,0 +1,27 @@
# encoding: utf-8

module LogStash; module Util
  class WrappedSynchronousQueue
    java_import java.util.concurrent.SynchronousQueue
    java_import java.util.concurrent.TimeUnit

    def initialize()
      @queue = java.util.concurrent.SynchronousQueue.new()
    end

    def push(obj)
      @queue.put(obj)
    end
    alias_method(:<<, :push)

    # Blocking
    def take
      @queue.take()
    end

    # Block for X millis
    def poll(millis)
      @queue.poll(millis, TimeUnit::MILLISECONDS)
    end
  end
end end
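A quick sketch of the rendezvous semantics the wrapper exposes (JRuby only, since it delegates to java.util.concurrent.SynchronousQueue; the 50 ms poll below times out and returns nil):

    q = LogStash::Util::WrappedSynchronousQueue.new

    consumer = Thread.new do
      q.take        # blocks until a producer hands an object over
      q.poll(50)    # nothing else arrives, so this returns nil after 50 ms
    end

    q.push(:event)  # rendezvous: blocks until the consumer takes it
    consumer.join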
locales/en.yml

@@ -164,8 +164,13 @@ en:
        the empty string for the '-e' flag.
      configtest: |+
        Check configuration for valid syntax and then exit.
-     filterworkers: |+
-       Sets the number of filter workers to run.
+     pipeline-workers: |+
+       Sets the number of pipeline workers to run.
+     pipeline-batch-size: |+
+       Size of batches the pipeline is to work in.
+     pipeline-batch-delay: |+
+       When creating pipeline batches, how long to wait while polling
+       for the next event.
      log: |+
        Write logstash internal logs to the given
        file. Without this flag, logstash will emit
logstash-core/spec/logstash/output_delegator_spec.rb (new file, 49 lines)

@@ -0,0 +1,49 @@
# encoding: utf-8
require 'spec_helper'

describe LogStash::OutputDelegator do
  let(:logger) { double("logger") }
  let(:out_klass) { double("output klass") }
  let(:out_inst) { double("output instance") }

  subject { described_class.new(logger, out_klass) }

  before do
    allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
    allow(out_inst).to receive(:register)
    allow(logger).to receive(:debug).with(any_args)
  end

  it "should initialize cleanly" do
    expect { subject }.not_to raise_error
  end

  context "after having received a batch of events" do
    let(:events) { 7.times.map { LogStash::Event.new } }

    before do
      allow(out_inst).to receive(:multi_receive)
      subject.multi_receive(events)
    end

    it "should pass the events through" do
      expect(out_inst).to have_received(:multi_receive).with(events)
    end

    it "should increment the number of events received" do
      expect(subject.events_received).to eql(events.length)
    end
  end

  it "should register all workers on register" do
    expect(out_inst).to receive(:register)
    subject.register
  end

  it "should close all workers when closing" do
    expect(out_inst).to receive(:do_close)
    subject.do_close
  end
end
logstash-core/spec/logstash/outputs/base_spec.rb

@@ -15,12 +15,13 @@ class LogStash::Outputs::NOOP < LogStash::Outputs::Base
  end
end

- describe "LogStash::Outputs::Base#worker_setup" do
-   it "should create workers using original parameters except workers = 1" do
+ describe "LogStash::Outputs::Base#new" do
+   it "should instantiate cleanly" do
      params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
-     worker_params = params.dup; worker_params["workers"] = 1
-     output = LogStash::Outputs::NOOP.new(params.dup)
-     expect(LogStash::Outputs::NOOP).to receive(:new).twice.with(worker_params).and_call_original
-     output.worker_setup
+
+     expect do
+       LogStash::Outputs::NOOP.new(params.dup)
+     end.not_to raise_error
    end
  end
logstash-core/spec/logstash/pipeline_reporter_spec.rb (new file, 85 lines)

@@ -0,0 +1,85 @@
# encoding: utf-8
require "spec_helper"
require "logstash/pipeline"
require "logstash/pipeline_reporter"

class DummyOutput < LogStash::Outputs::Base
  config_name "dummyoutput"
  milestone 2

  attr_reader :num_closes, :events

  def initialize(params={})
    super
    @num_closes = 0
    @events = []
  end

  def register
  end

  def receive(event)
    @events << event
  end

  def close
    @num_closes += 1
  end
end

# TODO: Figure out how to add more tests that actually cover inflight events
# This will require some janky multithreading stuff
describe LogStash::PipelineReporter do
  let(:generator_count) { 5 }
  let(:config) do
    "input { generator { count => #{generator_count} } } output { dummyoutput {} } "
  end
  let(:pipeline) { LogStash::Pipeline.new(config) }
  let(:reporter) { pipeline.reporter }

  before do
    allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
    allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_call_original
    allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original

    @pre_snapshot = reporter.snapshot
    pipeline.run
    @post_snapshot = reporter.snapshot
  end

  describe "events filtered" do
    it "should start at zero" do
      expect(@pre_snapshot.events_filtered).to eql(0)
    end

    it "should end at the number of generated events" do
      expect(@post_snapshot.events_filtered).to eql(generator_count)
    end
  end

  describe "events consumed" do
    it "should start at zero" do
      expect(@pre_snapshot.events_consumed).to eql(0)
    end

    it "should end at the number of generated events" do
      expect(@post_snapshot.events_consumed).to eql(generator_count)
    end
  end

  describe "inflight count" do
    it "should be zero before running" do
      expect(@pre_snapshot.inflight_count).to eql(0)
    end

    it "should be zero after running" do
      expect(@post_snapshot.inflight_count).to eql(0)
    end
  end

  describe "output states" do
    it "should include the count of received events" do
      expect(@post_snapshot.output_info.first[:events_received]).to eql(generator_count)
    end
  end
end
logstash-core/spec/logstash/pipeline_spec.rb

@@ -1,5 +1,7 @@
  # encoding: utf-8
  require "spec_helper"
+ require "logstash/inputs/generator"
+ require "logstash/filters/multiline"

  class DummyInput < LogStash::Inputs::Base
    config_name "dummyinput"

@@ -35,17 +37,19 @@ class DummyOutput < LogStash::Outputs::Base
  config_name "dummyoutput"
  milestone 2

- attr_reader :num_closes
+ attr_reader :num_closes, :events

  def initialize(params={})
    super
    @num_closes = 0
+   @events = []
  end

  def register
  end

+ def receive(event)
+   @events << event
+ end

  def close

@@ -80,22 +84,21 @@ class DummySafeFilter < LogStash::Filters::Base
end

class TestPipeline < LogStash::Pipeline
- attr_reader :outputs, :filter_threads, :settings, :logger
+ attr_reader :outputs, :settings, :logger
end

describe LogStash::Pipeline do
- let(:worker_thread_count) { 8 }
+ let(:worker_thread_count) { LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] }
  let(:safe_thread_count) { 1 }
  let(:override_thread_count) { 42 }

- describe "defaulting the filter workers based on thread safety" do
+ describe "defaulting the pipeline workers based on thread safety" do
    before(:each) do
      allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
      allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
      allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
      allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
      allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter)
-     allow(LogStash::Config::CpuCoreStrategy).to receive(:fifty_percent).and_return(worker_thread_count)
    end

    context "when there are some not threadsafe filters" do

@@ -117,13 +120,13 @@ describe LogStash::Pipeline do
  context "when there is no command line -w N set" do
    it "starts one filter thread" do
-     msg = "Defaulting filter worker threads to 1 because there are some" +
+     msg = "Defaulting pipeline worker threads to 1 because there are some" +
            " filters that might not work with multiple worker threads"
      pipeline = TestPipeline.new(test_config_with_filters)
      expect(pipeline.logger).to receive(:warn).with(msg,
        {:count_was => worker_thread_count, :filters => ["dummyfilter"]})
      pipeline.run
-     expect(pipeline.filter_threads.size).to eq(safe_thread_count)
+     expect(pipeline.worker_threads.size).to eq(safe_thread_count)
    end
  end

@@ -134,9 +137,9 @@ describe LogStash::Pipeline do
      pipeline = TestPipeline.new(test_config_with_filters)
      expect(pipeline.logger).to receive(:warn).with(msg,
        {:worker_threads => override_thread_count, :filters => ["dummyfilter"]})
-     pipeline.configure("filter-workers", override_thread_count)
+     pipeline.configure(:pipeline_workers, override_thread_count)
      pipeline.run
-     expect(pipeline.filter_threads.size).to eq(override_thread_count)
+     expect(pipeline.worker_threads.size).to eq(override_thread_count)
    end
  end
end

@@ -161,7 +164,7 @@ describe LogStash::Pipeline do
  it "starts multiple filter threads" do
    pipeline = TestPipeline.new(test_config_with_filters)
    pipeline.run
-   expect(pipeline.filter_threads.size).to eq(worker_thread_count)
+   expect(pipeline.worker_threads.size).to eq(worker_thread_count)
  end
end

@@ -206,8 +209,8 @@ describe LogStash::Pipeline do
  pipeline.run

  expect(pipeline.outputs.size).to eq(1)
- expect(pipeline.outputs.first.worker_plugins.size).to eq(1)
- expect(pipeline.outputs.first.worker_plugins.first.num_closes).to eq(1)
+ expect(pipeline.outputs.first.workers.size).to eq(1)
+ expect(pipeline.outputs.first.workers.first.num_closes).to eq(1)
end

it "should call output close correctly with output workers" do

@@ -215,8 +218,13 @@ describe LogStash::Pipeline do
  pipeline.run

  expect(pipeline.outputs.size).to eq(1)
- expect(pipeline.outputs.first.num_closes).to eq(0)
- pipeline.outputs.first.worker_plugins.each do |plugin|
+ # We even close the parent output worker, even though it doesn't receive messages
+ output_delegator = pipeline.outputs.first
+ output = output_delegator.workers.first
+
+ expect(output.num_closes).to eq(1)
+ output_delegator.workers.each do |plugin|
    expect(plugin.num_closes).to eq(1)
  end
end

@@ -299,4 +307,46 @@ describe LogStash::Pipeline do
      end
    end
  end

+ context "Periodic Flush" do
+   let(:number_of_events) { 100 }
+   let(:config) do
+     <<-EOS
+     input {
+       generator {
+         count => #{number_of_events}
+       }
+     }
+     filter {
+       multiline {
+         pattern => "^NeverMatch"
+         negate => true
+         what => "previous"
+       }
+     }
+     output {
+       dummyoutput {}
+     }
+     EOS
+   end
+   let(:output) { DummyOutput.new }
+
+   before do
+     allow(DummyOutput).to receive(:new).with(any_args).and_return(output)
+     allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
+     allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
+     allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
+     allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
+   end
+
+   it "flushes the buffered contents of the filter" do
+     Thread.abort_on_exception = true
+     pipeline = LogStash::Pipeline.new(config, { :flush_interval => 1 })
+     Thread.new { pipeline.run }
+     sleep 0.1 while !pipeline.ready?
+     # give us a bit of time to flush the events
+     wait(5).for { output.events.first["message"].split("\n").count }.to eq(number_of_events)
+     pipeline.shutdown
+   end
+ end
end
logstash-core/spec/logstash/shutdown_controller_spec.rb

@@ -8,10 +8,16 @@ describe LogStash::ShutdownController do
  let(:check_threshold) { 100 }
  subject { LogStash::ShutdownController.new(pipeline, check_every) }
  let(:pipeline) { double("pipeline") }
+ let(:reporter) { double("reporter") }
+ let(:reporter_snapshot) { double("reporter snapshot") }
  report_count = 0

  before :each do
-   allow(LogStash::Report).to receive(:from_pipeline).and_wrap_original do |m, *args|
+   allow(pipeline).to receive(:reporter).and_return(reporter)
+   allow(reporter).to receive(:snapshot).and_return(reporter_snapshot)
+   allow(reporter_snapshot).to receive(:to_simple_hash).and_return({})
+
+   allow(subject).to receive(:pipeline_report_snapshot).and_wrap_original do |m, *args|
      report_count += 1
      m.call(*args)
    end

@@ -22,10 +28,10 @@ describe LogStash::ShutdownController do
  end

  context "when pipeline is stalled" do
-   let(:increasing_count) { (1..5000).to_a.map {|i| { "total" => i } } }
+   let(:increasing_count) { (1..5000).to_a }
    before :each do
-     allow(pipeline).to receive(:inflight_count).and_return(*increasing_count)
-     allow(pipeline).to receive(:stalling_threads) { { } }
+     allow(reporter_snapshot).to receive(:inflight_count).and_return(*increasing_count)
+     allow(reporter_snapshot).to receive(:stalling_threads) { { } }
    end

    describe ".unsafe_shutdown = true" do

@@ -49,7 +55,7 @@ describe LogStash::ShutdownController do

  it "should do exactly \"abort_threshold\"*\"report_every\" stall checks" do
    allow(subject).to receive(:force_exit)
-   expect(LogStash::Report).to receive(:from_pipeline).exactly(abort_threshold*report_every).times.and_call_original
+   expect(subject).to receive(:pipeline_report_snapshot).exactly(abort_threshold*report_every).times.and_call_original
    subject.start
  end
end

@@ -70,10 +76,10 @@ describe LogStash::ShutdownController do
  end

  context "when pipeline is not stalled" do
-   let(:decreasing_count) { (1..5000).to_a.reverse.map {|i| { "total" => i } } }
+   let(:decreasing_count) { (1..5000).to_a.reverse }
    before :each do
-     allow(pipeline).to receive(:inflight_count).and_return(*decreasing_count)
-     allow(pipeline).to receive(:stalling_threads) { { } }
+     allow(reporter_snapshot).to receive(:inflight_count).and_return(*decreasing_count)
+     allow(reporter_snapshot).to receive(:stalling_threads) { { } }
    end

    describe ".unsafe_shutdown = true" do
logstash-core/spec/logstash/util/defaults_printer_spec.rb

@@ -10,7 +10,7 @@ describe LogStash::Util::DefaultsPrinter do
  end

  let(:workers) { 1 }
- let(:expected) { "Settings: User set filter workers: #{workers}" }
+ let(:expected) { "Settings: User set pipeline workers: #{workers}" }
  let(:settings) { {} }

  describe 'class methods API' do

@@ -24,8 +24,8 @@ describe LogStash::Util::DefaultsPrinter do
  end

  context 'when the settings hash has content' do
-   let(:workers) { 42 }
-   let(:settings) { {'filter-workers' => workers} }
+   let(:worker_queue) { 42 }
+   let(:settings) { {'pipeline-workers' => workers} }
    it_behaves_like "a defaults printer"
  end
end

@@ -42,7 +42,7 @@ describe LogStash::Util::DefaultsPrinter do
  context 'when the settings hash has content' do
    let(:workers) { 13 }
-   let(:settings) { {'filter-workers' => workers} }
+   let(:settings) { {'pipeline-workers' => workers} }

    it_behaves_like "a defaults printer"
  end
logstash-core/spec/logstash/util/worker_threads_default_printer_spec.rb

@@ -19,26 +19,26 @@ describe LogStash::Util::WorkerThreadsDefaultPrinter do
  end

  context 'when the settings hash has both user and default content' do
-   let(:settings) { {'filter-workers' => 42, 'default-filter-workers' => 5} }
+   let(:settings) { {'pipeline-workers' => 42, 'default-pipeline-workers' => 5} }

    it 'adds two strings' do
-     expect(collector).to eq(["User set filter workers: 42", "Default filter workers: 5"])
+     expect(collector).to eq(["User set pipeline workers: 42", "Default pipeline workers: 5"])
    end
  end

  context 'when the settings hash has only user content' do
-   let(:settings) { {'filter-workers' => 42} }
+   let(:settings) { {'pipeline-workers' => 42} }

-   it 'adds a string with user set filter workers' do
-     expect(collector.first).to eq("User set filter workers: 42")
+   it 'adds a string with user set pipeline workers' do
+     expect(collector.first).to eq("User set pipeline workers: 42")
    end
  end

  context 'when the settings hash has only default content' do
-   let(:settings) { {'default-filter-workers' => 5} }
+   let(:settings) { {'default-pipeline-workers' => 5} }

-   it 'adds a string with default filter workers' do
-     expect(collector.first).to eq("Default filter workers: 5")
+   it 'adds a string with default pipeline workers' do
+     expect(collector.first).to eq("Default pipeline workers: 5")
    end
  end
end