base SignalEvent class and cleanup LogStash namespace

This commit is contained in:
Colin Surprenant 2016-08-23 17:12:19 -04:00
parent 2c25592fd5
commit 2002bace3c
5 changed files with 71 additions and 31 deletions

View file

@ -10,14 +10,26 @@ require "cabin"
# in the future it might be necessary to refactor using like a BaseEvent
# class to have a common interface for all pileline events to support
# eventual queueing persistence for example, TBD.
class LogStash::ShutdownEvent; end
class LogStash::FlushEvent; end
module LogStash
FLUSH = LogStash::FlushEvent.new
class SignalEvent
def flush?; raise "abstract method"; end;
def shutdown?; raise "abstract method"; end;
end
class ShutdownEvent < SignalEvent
def flush?; false; end;
def shutdown?; true; end;
end
class FlushEvent < SignalEvent
def flush?; true; end;
def shutdown?; false; end;
end
FLUSH = FlushEvent.new
# LogStash::SHUTDOWN is used by plugins
SHUTDOWN = LogStash::ShutdownEvent.new
SHUTDOWN = ShutdownEvent.new
end
# for backward compatibility, require "logstash/event" is used a lots of places so let's bootstrap the

View file

@ -13,14 +13,27 @@ require "logstash/string_interpolation"
# in the future it might be necessary to refactor using like a BaseEvent
# class to have a common interface for all pileline events to support
# eventual queueing persistence for example, TBD.
class LogStash::ShutdownEvent; end
class LogStash::FlushEvent; end
module LogStash
FLUSH = LogStash::FlushEvent.new
class SignalEvent
def flush?; raise "abstract method"; end;
def shutdown?; raise "abstract method"; end;
end
class ShutdownEvent < SignalEvent
def flush?; false; end;
def shutdown?; true; end;
end
class FlushEvent < SignalEvent
def flush?; true; end;
def shutdown?; false; end;
end
FLUSH = FlushEvent.new
# LogStash::SHUTDOWN is used by plugins
SHUTDOWN = LogStash::ShutdownEvent.new
SHUTDOWN = ShutdownEvent.new
end
# the logstash event object.

View file

@ -535,12 +535,27 @@ describe LogStash::Event do
end
context "signal events" do
it "should define the shutdown event" do
it "should define the shutdown and flush event constants" do
# the SHUTDOWN and FLUSH constants are part of the plugin API contract
# if they are changed, all plugins must be updated
expect(LogStash::SHUTDOWN).to be_a(LogStash::ShutdownEvent)
expect(LogStash::FLUSH).to be_a(LogStash::FlushEvent)
end
it "should define the shutdown event with SignalEvent as parent class" do
expect(LogStash::SHUTDOWN).to be_a(LogStash::SignalEvent)
expect(LogStash::FLUSH).to be_a(LogStash::SignalEvent)
end
it "should define the flush? method" do
expect(LogStash::SHUTDOWN.flush?).to be_falsey
expect(LogStash::FLUSH.flush?).to be_truthy
end
it "should define the shutdown? method" do
expect(LogStash::SHUTDOWN.shutdown?).to be_truthy
expect(LogStash::FLUSH.shutdown?).to be_falsey
end
end
end

View file

@ -44,7 +44,7 @@ module LogStash; class Pipeline
"LogStash::Inputs::Stdin"
]
def initialize(config_str, settings = LogStash::SETTINGS, namespaced_metric = nil)
def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
@config_str = config_str
@config_hash = Digest::SHA1.hexdigest(@config_str)
# Every time #plugin is invoked this is incremented to give each plugin
@ -54,7 +54,7 @@ module LogStash; class Pipeline
@logger = Cabin::Channel.get(LogStash)
@settings = settings
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
@reporter = LogStash::PipelineReporter.new(@logger, self)
@reporter = PipelineReporter.new(@logger, self)
# A list of plugins indexed by id
@plugins_by_id = {}
@ -66,12 +66,12 @@ module LogStash; class Pipeline
# This needs to be configured before we evaluate the code to make
# sure the metric instance is correctly send to the plugins to make the namespace scoping work
@metric = namespaced_metric.nil? ? LogStash::Instrument::NullMetric.new : namespaced_metric
@metric = namespaced_metric.nil? ? Instrument::NullMetric.new : namespaced_metric
grammar = LogStashConfigParser.new
@config = grammar.parse(config_str)
if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
raise ConfigurationError, grammar.failure_reason
end
# This will compile the config to ruby and evaluate the resulting code.
# The code will initialize all the plugins and define the
@ -92,7 +92,7 @@ module LogStash; class Pipeline
raise
end
queue = LogStash::Util::WrappedSynchronousQueue.new
queue = Util::WrappedSynchronousQueue.new
@input_queue_client = queue.write_client
@filter_queue_client = queue.read_client
# Note that @inflight_batches as a central mechanism for tracking inflight
@ -148,7 +148,7 @@ module LogStash; class Pipeline
@started_at = Time.now
@thread = Thread.current
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
start_workers
@ -221,7 +221,7 @@ module LogStash; class Pipeline
pipeline_workers.times do |t|
@worker_threads << Thread.new do
LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
worker_loop(batch_size, batch_delay)
end
end
@ -256,7 +256,7 @@ module LogStash; class Pipeline
def filter_batch(batch)
batch.each do |event|
if event.is_a?(LogStash::Event)
if event.is_a?(Event)
filtered = filter_func(event)
filtered.each do |e|
#these are both original and generated events
@ -331,7 +331,7 @@ module LogStash; class Pipeline
end
def inputworker(plugin)
LogStash::Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
begin
plugin.run(@input_queue_client)
rescue => e
@ -387,7 +387,7 @@ module LogStash; class Pipeline
# Each worker thread will receive this exactly once!
@worker_threads.each do |t|
@logger.debug("Pushing shutdown", :thread => t.inspect)
@input_queue_client.push(LogStash::SHUTDOWN)
@input_queue_client.push(SHUTDOWN)
end
@worker_threads.each do |t|
@ -411,20 +411,20 @@ module LogStash; class Pipeline
args["id"]
end
raise LogStash::ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
klass = LogStash::Plugin.lookup(plugin_type, name)
klass = Plugin.lookup(plugin_type, name)
# Scope plugins of type 'input' to 'inputs'
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
plugin = if plugin_type == "output"
OutputDelegator.new(@logger, klass, type_scoped_metric,
::LogStash::OutputDelegatorStrategyRegistry.instance,
OutputDelegatorStrategyRegistry.instance,
args)
elsif plugin_type == "filter"
LogStash::FilterDelegator.new(@logger, klass, type_scoped_metric, args)
FilterDelegator.new(@logger, klass, type_scoped_metric, args)
else # input
input_plugin = klass.new(args)
input_plugin.metric = type_scoped_metric.namespace(id)
@ -472,7 +472,7 @@ module LogStash; class Pipeline
def flush
if @flushing.compare_and_set(false, true)
@logger.debug? && @logger.debug("Pushing flush onto pipeline")
@input_queue_client.push(LogStash::FLUSH)
@input_queue_client.push(FLUSH)
end
end
@ -506,7 +506,7 @@ module LogStash; class Pipeline
def plugin_threads_info
input_threads = @input_threads.select {|t| t.alive? }
worker_threads = @worker_threads.select {|t| t.alive? }
(input_threads + worker_threads).map {|t| LogStash::Util.thread_info(t) }
(input_threads + worker_threads).map {|t| Util.thread_info(t) }
end
def stalling_threads_info

View file

@ -218,16 +218,16 @@ module LogStash; module Util
if event.nil?
# queue poll timed out
next
elsif event == LogStash::SHUTDOWN
elsif event.is_a?(LogStash::SignalEvent)
# We MUST break here. If a batch consumes two SHUTDOWN events
# then another worker may have its SHUTDOWN 'stolen', thus blocking
# the pipeline.
@shutdown_signal_received = true
break
elsif event == LogStash::FLUSH
@shutdown_signal_received = event.shutdown?
# See comment above
# We should stop doing work after flush as well.
@flush_signal_received = true
@flush_signal_received = event.flush?
break
else
@originals[event] = true