mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
fix agent silent exit upon pipelines reloading (#10346)
This commit is contained in:
parent
38e5e53def
commit
f08b8c5076
26 changed files with 774 additions and 283 deletions
|
@ -8,6 +8,7 @@ require "logstash/webserver"
|
||||||
require "logstash/config/source_loader"
|
require "logstash/config/source_loader"
|
||||||
require "logstash/pipeline_action"
|
require "logstash/pipeline_action"
|
||||||
require "logstash/state_resolver"
|
require "logstash/state_resolver"
|
||||||
|
require "logstash/pipelines_registry"
|
||||||
require "stud/trap"
|
require "stud/trap"
|
||||||
require "uri"
|
require "uri"
|
||||||
require "socket"
|
require "socket"
|
||||||
|
@ -19,7 +20,7 @@ class LogStash::Agent
|
||||||
include LogStash::Util::Loggable
|
include LogStash::Util::Loggable
|
||||||
STARTED_AT = Time.now.freeze
|
STARTED_AT = Time.now.freeze
|
||||||
|
|
||||||
attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipelines, :pipeline_bus
|
attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipeline_bus
|
||||||
attr_accessor :logger
|
attr_accessor :logger
|
||||||
|
|
||||||
# initialize method for LogStash::Agent
|
# initialize method for LogStash::Agent
|
||||||
|
@ -40,7 +41,7 @@ class LogStash::Agent
|
||||||
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
|
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
|
||||||
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
|
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
|
||||||
|
|
||||||
@pipelines = java.util.concurrent.ConcurrentHashMap.new();
|
@pipelines_registry = LogStash::PipelinesRegistry.new
|
||||||
|
|
||||||
@name = setting("node.name")
|
@name = setting("node.name")
|
||||||
@http_host = setting("http.host")
|
@http_host = setting("http.host")
|
||||||
|
@ -114,14 +115,17 @@ class LogStash::Agent
|
||||||
converge_state_and_update unless stopped?
|
converge_state_and_update unless stopped?
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
return 1 if clean_state?
|
# exit with error status if the initial converge_state_and_update did not create any pipeline
|
||||||
|
return 1 if @pipelines_registry.empty?
|
||||||
|
|
||||||
while !Stud.stop?
|
while !Stud.stop?
|
||||||
if clean_state? || running_user_defined_pipelines?
|
# exit if all pipelines are terminated and none are reloading
|
||||||
sleep(0.5)
|
break if no_pipeline?
|
||||||
else
|
|
||||||
break
|
# exit if there are no user defined pipelines (not system pipeline) and none are reloading
|
||||||
end
|
break if !running_user_defined_pipelines?
|
||||||
|
|
||||||
|
sleep(0.5)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -135,11 +139,11 @@ class LogStash::Agent
|
||||||
end
|
end
|
||||||
|
|
||||||
def running?
|
def running?
|
||||||
@running.value
|
@running.true?
|
||||||
end
|
end
|
||||||
|
|
||||||
def stopped?
|
def stopped?
|
||||||
!@running.value
|
@running.false?
|
||||||
end
|
end
|
||||||
|
|
||||||
def converge_state_and_update
|
def converge_state_and_update
|
||||||
|
@ -233,43 +237,48 @@ class LogStash::Agent
|
||||||
@id_path ||= ::File.join(settings.get("path.data"), "uuid")
|
@id_path ||= ::File.join(settings.get("path.data"), "uuid")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
#
|
||||||
|
# Backward compatibility proxies to the PipelineRegistry
|
||||||
|
#
|
||||||
|
|
||||||
def get_pipeline(pipeline_id)
|
def get_pipeline(pipeline_id)
|
||||||
pipelines.get(pipeline_id)
|
@pipelines_registry.get_pipeline(pipeline_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
def pipelines_count
|
def pipelines_count
|
||||||
pipelines.size
|
@pipelines_registry.size
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_pipelines
|
def running_pipelines
|
||||||
pipelines.select {|id,pipeline| running_pipeline?(id) }
|
@pipelines_registry.running_pipelines
|
||||||
end
|
end
|
||||||
|
|
||||||
def non_running_pipelines
|
def non_running_pipelines
|
||||||
pipelines.select {|id,pipeline| !running_pipeline?(id) }
|
@pipelines_registry.non_running_pipelines
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_pipelines?
|
def running_pipelines?
|
||||||
running_pipelines_count > 0
|
@pipelines_registry.running_pipelines.any?
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_pipelines_count
|
def running_pipelines_count
|
||||||
running_pipelines.size
|
@pipelines_registry.running_pipelines.size
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_user_defined_pipelines?
|
def running_user_defined_pipelines?
|
||||||
!running_user_defined_pipelines.empty?
|
@pipelines_registry.running_user_defined_pipelines.any?
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_user_defined_pipelines
|
def running_user_defined_pipelines
|
||||||
pipelines.select {|id, pipeline| running_pipeline?(id) && !pipeline.system? }
|
@pipelines_registry.running_user_defined_pipelines
|
||||||
end
|
end
|
||||||
|
|
||||||
def with_running_user_defined_pipelines
|
def no_pipeline?
|
||||||
yield running_user_defined_pipelines
|
@pipelines_registry.running_pipelines.empty?
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def transition_to_stopped
|
def transition_to_stopped
|
||||||
@running.make_false
|
@running.make_false
|
||||||
end
|
end
|
||||||
|
@ -294,7 +303,7 @@ class LogStash::Agent
|
||||||
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
|
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
|
||||||
|
|
||||||
pipeline_actions.map do |action|
|
pipeline_actions.map do |action|
|
||||||
Thread.new do
|
Thread.new(action, converge_result) do |action, converge_result|
|
||||||
java.lang.Thread.currentThread().setName("Converge #{action}");
|
java.lang.Thread.currentThread().setName("Converge #{action}");
|
||||||
# We execute every task we need to converge the current state of pipelines
|
# We execute every task we need to converge the current state of pipelines
|
||||||
# for every task we will record the action result, that will help us
|
# for every task we will record the action result, that will help us
|
||||||
|
@ -310,34 +319,35 @@ class LogStash::Agent
|
||||||
# that we currently have.
|
# that we currently have.
|
||||||
begin
|
begin
|
||||||
logger.debug("Executing action", :action => action)
|
logger.debug("Executing action", :action => action)
|
||||||
action_result = action.execute(self, pipelines)
|
action_result = action.execute(self, @pipelines_registry)
|
||||||
converge_result.add(action, action_result)
|
converge_result.add(action, action_result)
|
||||||
|
|
||||||
unless action_result.successful?
|
unless action_result.successful?
|
||||||
logger.error("Failed to execute action", :id => action.pipeline_id,
|
logger.error("Failed to execute action",
|
||||||
:action_type => action_result.class, :message => action_result.message,
|
:id => action.pipeline_id,
|
||||||
:backtrace => action_result.backtrace)
|
:action_type => action_result.class,
|
||||||
|
:message => action_result.message,
|
||||||
|
:backtrace => action_result.backtrace
|
||||||
|
)
|
||||||
end
|
end
|
||||||
rescue SystemExit => e
|
rescue SystemExit, Exception => e
|
||||||
converge_result.add(action, e)
|
|
||||||
rescue Exception => e
|
|
||||||
logger.error("Failed to execute action", :action => action, :exception => e.class.name, :message => e.message, :backtrace => e.backtrace)
|
logger.error("Failed to execute action", :action => action, :exception => e.class.name, :message => e.message, :backtrace => e.backtrace)
|
||||||
converge_result.add(action, e)
|
converge_result.add(action, e)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end.each(&:join)
|
end.each(&:join)
|
||||||
|
|
||||||
if logger.trace?
|
logger.trace? && logger.trace("Converge results",
|
||||||
logger.trace("Converge results", :success => converge_result.success?,
|
:success => converge_result.success?,
|
||||||
:failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
|
:failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
|
||||||
:successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" })
|
:successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" }
|
||||||
end
|
)
|
||||||
|
|
||||||
converge_result
|
converge_result
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve_actions(pipeline_configs)
|
def resolve_actions(pipeline_configs)
|
||||||
@state_resolver.resolve(@pipelines, pipeline_configs)
|
@state_resolver.resolve(@pipelines_registry, pipeline_configs)
|
||||||
end
|
end
|
||||||
|
|
||||||
def dispatch_events(converge_results)
|
def dispatch_events(converge_results)
|
||||||
|
@ -395,7 +405,7 @@ class LogStash::Agent
|
||||||
end
|
end
|
||||||
|
|
||||||
def shutdown_pipelines
|
def shutdown_pipelines
|
||||||
logger.debug("Shutting down all pipelines", :pipelines_count => pipelines_count)
|
logger.debug("Shutting down all pipelines", :pipelines_count => running_pipelines_count)
|
||||||
|
|
||||||
# In this context I could just call shutdown, but I've decided to
|
# In this context I could just call shutdown, but I've decided to
|
||||||
# use the stop action implementation for that so we have the same code.
|
# use the stop action implementation for that so we have the same code.
|
||||||
|
@ -404,16 +414,6 @@ class LogStash::Agent
|
||||||
converge_state(pipeline_actions)
|
converge_state(pipeline_actions)
|
||||||
end
|
end
|
||||||
|
|
||||||
def running_pipeline?(pipeline_id)
|
|
||||||
pipeline = get_pipeline(pipeline_id)
|
|
||||||
return false unless pipeline
|
|
||||||
thread = pipeline.thread
|
|
||||||
thread.is_a?(Thread) && thread.alive?
|
|
||||||
end
|
|
||||||
|
|
||||||
def clean_state?
|
|
||||||
pipelines.empty?
|
|
||||||
end
|
|
||||||
|
|
||||||
def setting(key)
|
def setting(key)
|
||||||
@settings.get(key)
|
@settings.get(key)
|
||||||
|
|
|
@ -10,13 +10,11 @@ module LogStash module Instrument module PeriodicPoller
|
||||||
end
|
end
|
||||||
|
|
||||||
def collect
|
def collect
|
||||||
pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
|
pipelines = @agent.running_user_defined_pipelines
|
||||||
unless pipelines.nil?
|
pipelines.each do |_, pipeline|
|
||||||
pipelines.each {|_, pipeline|
|
unless pipeline.nil?
|
||||||
unless pipeline.nil?
|
pipeline.collect_dlq_stats
|
||||||
pipeline.collect_dlq_stats
|
end
|
||||||
end
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -11,14 +11,12 @@ module LogStash module Instrument module PeriodicPoller
|
||||||
end
|
end
|
||||||
|
|
||||||
def collect
|
def collect
|
||||||
pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
|
pipelines = @agent.running_user_defined_pipelines
|
||||||
unless pipelines.nil?
|
pipelines.each do |_, pipeline|
|
||||||
pipelines.each {|_, pipeline|
|
unless pipeline.nil?
|
||||||
unless pipeline.nil?
|
pipeline.collect_stats
|
||||||
pipeline.collect_stats
|
end
|
||||||
end
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end; end; end
|
end end end
|
||||||
|
|
|
@ -11,12 +11,12 @@ module LogStash module Instrument
|
||||||
class PeriodicPollers
|
class PeriodicPollers
|
||||||
attr_reader :metric
|
attr_reader :metric
|
||||||
|
|
||||||
def initialize(metric, queue_type, pipelines)
|
def initialize(metric, queue_type, agent)
|
||||||
@metric = metric
|
@metric = metric
|
||||||
@periodic_pollers = [PeriodicPoller::Os.new(metric),
|
@periodic_pollers = [PeriodicPoller::Os.new(metric),
|
||||||
PeriodicPoller::JVM.new(metric),
|
PeriodicPoller::JVM.new(metric),
|
||||||
PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines),
|
PeriodicPoller::PersistentQueue.new(metric, queue_type, agent),
|
||||||
PeriodicPoller::DeadLetterQueue.new(metric, pipelines)]
|
PeriodicPoller::DeadLetterQueue.new(metric, agent)]
|
||||||
end
|
end
|
||||||
|
|
||||||
def start
|
def start
|
||||||
|
|
|
@ -40,9 +40,23 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
||||||
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
||||||
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
||||||
@outputs_registered = Concurrent::AtomicBoolean.new(false)
|
@outputs_registered = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
# @finished_execution signals that the pipeline thread has finished its execution
|
||||||
|
# regardless of any exceptions; it will always be true when the thread completes
|
||||||
@finished_execution = Concurrent::AtomicBoolean.new(false)
|
@finished_execution = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
# @finished_run signals that the run methods called in the pipeline thread was completed
|
||||||
|
# without errors and it will NOT be set if the run method exits from an exception; this
|
||||||
|
# is by design and necessary for the wait_until_started semantic
|
||||||
|
@finished_run = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
@thread = nil
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
|
def finished_execution?
|
||||||
|
@finished_execution.true?
|
||||||
|
end
|
||||||
|
|
||||||
def ready?
|
def ready?
|
||||||
@ready.value
|
@ready.value
|
||||||
end
|
end
|
||||||
|
@ -84,15 +98,18 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
||||||
@logger.debug("Starting pipeline", default_logging_keys)
|
@logger.debug("Starting pipeline", default_logging_keys)
|
||||||
|
|
||||||
@finished_execution.make_false
|
@finished_execution.make_false
|
||||||
|
@finished_run.make_false
|
||||||
|
|
||||||
@thread = Thread.new do
|
@thread = Thread.new do
|
||||||
begin
|
begin
|
||||||
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
|
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
|
||||||
run
|
run
|
||||||
@finished_execution.make_true
|
@finished_run.make_true
|
||||||
rescue => e
|
rescue => e
|
||||||
close
|
close
|
||||||
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
||||||
|
ensure
|
||||||
|
@finished_execution.make_true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -107,15 +124,14 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
||||||
|
|
||||||
def wait_until_started
|
def wait_until_started
|
||||||
while true do
|
while true do
|
||||||
# This should be changed with an appropriate FSM
|
if @finished_run.true?
|
||||||
# It's an edge case, if we have a pipeline with
|
# it completed run without exception
|
||||||
# a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
|
|
||||||
# because the execution of the thread was successful and complete
|
|
||||||
if @finished_execution.true?
|
|
||||||
return true
|
return true
|
||||||
elsif thread.nil? || !thread.alive?
|
elsif thread.nil? || !thread.alive?
|
||||||
|
# some exception occurred and the thread is dead
|
||||||
return false
|
return false
|
||||||
elsif running?
|
elsif running?
|
||||||
|
# fully initialized and running
|
||||||
return true
|
return true
|
||||||
else
|
else
|
||||||
sleep 0.01
|
sleep 0.01
|
||||||
|
|
|
@ -107,8 +107,23 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
@flushing = Concurrent::AtomicReference.new(false)
|
@flushing = Concurrent::AtomicReference.new(false)
|
||||||
@outputs_registered = Concurrent::AtomicBoolean.new(false)
|
@outputs_registered = Concurrent::AtomicBoolean.new(false)
|
||||||
@worker_shutdown = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
@worker_shutdown = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
# @finished_execution signals that the pipeline thread has finished its execution
|
||||||
|
# regardless of any exceptions; it will always be true when the thread completes
|
||||||
|
@finished_execution = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
# @finished_run signals that the run methods called in the pipeline thread was completed
|
||||||
|
# without errors and it will NOT be set if the run method exits from an exception; this
|
||||||
|
# is by design and necessary for the wait_until_started semantic
|
||||||
|
@finished_run = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
@thread = nil
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
|
def finished_execution?
|
||||||
|
@finished_execution.true?
|
||||||
|
end
|
||||||
|
|
||||||
def ready?
|
def ready?
|
||||||
@ready.value
|
@ready.value
|
||||||
end
|
end
|
||||||
|
@ -152,16 +167,19 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
|
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
|
||||||
"pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
|
"pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
|
||||||
|
|
||||||
@finished_execution = Concurrent::AtomicBoolean.new(false)
|
@finished_execution.make_false
|
||||||
|
@finished_run.make_false
|
||||||
|
|
||||||
@thread = Thread.new do
|
@thread = Thread.new do
|
||||||
begin
|
begin
|
||||||
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
|
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
|
||||||
run
|
run
|
||||||
@finished_execution.make_true
|
@finished_run.make_true
|
||||||
rescue => e
|
rescue => e
|
||||||
close
|
close
|
||||||
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
||||||
|
ensure
|
||||||
|
@finished_execution.make_true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -176,15 +194,14 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
|
|
||||||
def wait_until_started
|
def wait_until_started
|
||||||
while true do
|
while true do
|
||||||
# This should be changed with an appropriate FSM
|
if @finished_run.true?
|
||||||
# It's an edge case, if we have a pipeline with
|
# it completed run without exception
|
||||||
# a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
|
|
||||||
# because the execution of the thread was successful and complete
|
|
||||||
if @finished_execution.true?
|
|
||||||
return true
|
return true
|
||||||
elsif !thread.alive?
|
elsif thread.nil? || !thread.alive?
|
||||||
|
# some exception occured and the thread is dead
|
||||||
return false
|
return false
|
||||||
elsif running?
|
elsif running?
|
||||||
|
# fully initialized and running
|
||||||
return true
|
return true
|
||||||
else
|
else
|
||||||
sleep 0.01
|
sleep 0.01
|
||||||
|
|
|
@ -12,7 +12,7 @@ module LogStash module PipelineAction
|
||||||
end
|
end
|
||||||
alias_method :to_s, :inspect
|
alias_method :to_s, :inspect
|
||||||
|
|
||||||
def execute(agent, pipelines)
|
def execute(agent, pipelines_registry)
|
||||||
raise "`#execute` Not implemented!"
|
raise "`#execute` Not implemented!"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ require "logstash/pipeline_action/base"
|
||||||
require "logstash/pipeline"
|
require "logstash/pipeline"
|
||||||
require "logstash/java_pipeline"
|
require "logstash/java_pipeline"
|
||||||
|
|
||||||
|
|
||||||
module LogStash module PipelineAction
|
module LogStash module PipelineAction
|
||||||
class Create < Base
|
class Create < Base
|
||||||
include LogStash::Util::Loggable
|
include LogStash::Util::Loggable
|
||||||
|
@ -30,7 +31,7 @@ module LogStash module PipelineAction
|
||||||
|
|
||||||
# The execute assume that the thread safety access of the pipeline
|
# The execute assume that the thread safety access of the pipeline
|
||||||
# is managed by the caller.
|
# is managed by the caller.
|
||||||
def execute(agent, pipelines)
|
def execute(agent, pipelines_registry)
|
||||||
new_pipeline =
|
new_pipeline =
|
||||||
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||||
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
|
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
|
||||||
|
@ -43,21 +44,13 @@ module LogStash module PipelineAction
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
result = nil
|
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
|
||||||
pipelines.compute(pipeline_id) do |_, current_pipeline|
|
new_pipeline.start # block until the pipeline is correctly started or crashed
|
||||||
if current_pipeline
|
|
||||||
result = LogStash::ConvergeResult::FailedAction.new("Attempted to create a pipeline that already exists")
|
|
||||||
current_pipeline
|
|
||||||
else
|
|
||||||
result = new_pipeline.start # block until the pipeline is correctly started or crashed
|
|
||||||
result ? new_pipeline : nil
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
LogStash::ConvergeResult::ActionResult.create(self, result)
|
LogStash::ConvergeResult::ActionResult.create(self, success)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def to_s
|
def to_s
|
||||||
"PipelineAction::Create<#{pipeline_id}>"
|
"PipelineAction::Create<#{pipeline_id}>"
|
||||||
end
|
end
|
||||||
|
|
|
@ -20,8 +20,12 @@ module LogStash module PipelineAction
|
||||||
"PipelineAction::Reload<#{pipeline_id}>"
|
"PipelineAction::Reload<#{pipeline_id}>"
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute(agent, pipelines)
|
def execute(agent, pipelines_registry)
|
||||||
old_pipeline = pipelines[pipeline_id]
|
old_pipeline = pipelines_registry.get_pipeline(pipeline_id)
|
||||||
|
|
||||||
|
if old_pipeline.nil?
|
||||||
|
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the pipeline does not exist")
|
||||||
|
end
|
||||||
|
|
||||||
if !old_pipeline.reloadable?
|
if !old_pipeline.reloadable?
|
||||||
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
|
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
|
||||||
|
@ -49,13 +53,35 @@ module LogStash module PipelineAction
|
||||||
|
|
||||||
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
|
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
|
||||||
|
|
||||||
stop_result = Stop.new(pipeline_id).execute(agent, pipelines)
|
success = pipelines_registry.reload_pipeline(pipeline_id) do
|
||||||
|
# important NOT to explicitly return from block here
|
||||||
|
# the block must emit a success boolean value
|
||||||
|
|
||||||
if stop_result.successful?
|
# First shutdown old pipeline
|
||||||
Create.new(@pipeline_config, @metric).execute(agent, pipelines)
|
old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) }
|
||||||
else
|
old_pipeline.thread.join
|
||||||
stop_result
|
|
||||||
|
# Then create a new pipeline
|
||||||
|
new_pipeline =
|
||||||
|
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||||
|
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
|
||||||
|
else
|
||||||
|
agent.exclusive do
|
||||||
|
# The Ruby pipeline initialization is not thread safe because of the module level
|
||||||
|
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
|
||||||
|
# executed simultaneously in different threads and we need to synchronize this initialization.
|
||||||
|
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
success = new_pipeline.start # block until the pipeline is correctly started or crashed
|
||||||
|
|
||||||
|
# return success and new_pipeline to registry reload_pipeline
|
||||||
|
[success, new_pipeline]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
LogStash::ConvergeResult::ActionResult.create(self, success)
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end end
|
end end
|
||||||
|
|
|
@ -9,11 +9,10 @@ module LogStash module PipelineAction
|
||||||
@pipeline_id = pipeline_id
|
@pipeline_id = pipeline_id
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute(agent, pipelines)
|
def execute(agent, pipelines_registry)
|
||||||
pipelines.compute(pipeline_id) do |_, pipeline|
|
pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline|
|
||||||
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
|
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
|
||||||
pipeline.thread.join
|
pipeline.thread.join
|
||||||
nil # remove pipeline from pipelines
|
|
||||||
end
|
end
|
||||||
|
|
||||||
LogStash::ConvergeResult::SuccessfulAction.new
|
LogStash::ConvergeResult::SuccessfulAction.new
|
||||||
|
|
166
logstash-core/lib/logstash/pipelines_registry.rb
Normal file
166
logstash-core/lib/logstash/pipelines_registry.rb
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
|
||||||
|
module LogStash
|
||||||
|
class PipelineState
|
||||||
|
attr_reader :pipeline_id, :pipeline
|
||||||
|
|
||||||
|
def initialize(pipeline_id, pipeline)
|
||||||
|
@pipeline_id = pipeline_id
|
||||||
|
@pipeline = pipeline
|
||||||
|
@reloading = Concurrent::AtomicBoolean.new(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
def terminated?
|
||||||
|
# a reloading pipeline is never considered terminated
|
||||||
|
@reloading.false? && @pipeline.finished_execution?
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_reloading(is_reloading)
|
||||||
|
@reloading.value = is_reloading
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_pipeline(pipeline)
|
||||||
|
raise(ArgumentError, "invalid nil pipeline") if pipeline.nil?
|
||||||
|
@pipeline = pipeline
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class PipelinesRegistry
|
||||||
|
attr_reader :states
|
||||||
|
include LogStash::Util::Loggable
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
# we leverage the semantic of the Java ConcurrentHashMap for the
|
||||||
|
# compute() method which is atomic; calling compute() concurrently
|
||||||
|
# will block until the other compute finishes so no mutex is necessary
|
||||||
|
# for synchronizing compute calls
|
||||||
|
@states = java.util.concurrent.ConcurrentHashMap.new
|
||||||
|
end
|
||||||
|
|
||||||
|
# Execute the passed creation logic block and create a new state upon success
|
||||||
|
# @param pipeline_id [String, Symbol] the pipeline id
|
||||||
|
# @param pipeline [Pipeline] the new pipeline to create
|
||||||
|
# @param create_block [Block] the creation execution logic
|
||||||
|
#
|
||||||
|
# @yieldreturn [Boolean] the new pipeline creation success
|
||||||
|
#
|
||||||
|
# @return [Boolean] new pipeline creation success
|
||||||
|
def create_pipeline(pipeline_id, pipeline, &create_block)
|
||||||
|
success = false
|
||||||
|
|
||||||
|
@states.compute(pipeline_id) do |_, state|
|
||||||
|
if state
|
||||||
|
if state.terminated?
|
||||||
|
success = yield
|
||||||
|
state.set_pipeline(pipeline)
|
||||||
|
else
|
||||||
|
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
|
||||||
|
end
|
||||||
|
state
|
||||||
|
else
|
||||||
|
success = yield
|
||||||
|
success ? PipelineState.new(pipeline_id, pipeline) : nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
success
|
||||||
|
end
|
||||||
|
|
||||||
|
# Execute the passed termination logic block
|
||||||
|
# @param pipeline_id [String, Symbol] the pipeline id
|
||||||
|
# @param stop_block [Block] the termination execution logic
|
||||||
|
#
|
||||||
|
# @yieldparam [Pipeline] the pipeline to terminate
|
||||||
|
def terminate_pipeline(pipeline_id, &stop_block)
|
||||||
|
@states.compute(pipeline_id) do |_, state|
|
||||||
|
if state.nil?
|
||||||
|
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
|
||||||
|
nil
|
||||||
|
else
|
||||||
|
yield(state.pipeline)
|
||||||
|
state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
|
||||||
|
# @param pipeline_id [String, Symbol] the pipeline id
|
||||||
|
# @param reload_block [Block] the reloading execution logic
|
||||||
|
#
|
||||||
|
# @yieldreturn [Array<Boolean, Pipeline>] the new pipeline creation success and new pipeline object
|
||||||
|
#
|
||||||
|
# @return [Boolean] new pipeline creation success
|
||||||
|
def reload_pipeline(pipeline_id, &reload_block)
|
||||||
|
success = false
|
||||||
|
|
||||||
|
@states.compute(pipeline_id) do |_, state|
|
||||||
|
if state.nil?
|
||||||
|
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
|
||||||
|
nil
|
||||||
|
else
|
||||||
|
state.set_reloading(true)
|
||||||
|
begin
|
||||||
|
success, new_pipeline = yield
|
||||||
|
state.set_pipeline(new_pipeline)
|
||||||
|
ensure
|
||||||
|
state.set_reloading(false)
|
||||||
|
end
|
||||||
|
state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
success
|
||||||
|
end
|
||||||
|
|
||||||
|
# @param pipeline_id [String, Symbol] the pipeline id
|
||||||
|
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
|
||||||
|
def get_pipeline(pipeline_id)
|
||||||
|
state = @states.get(pipeline_id)
|
||||||
|
state.nil? ? nil : state.pipeline
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Fixnum] number of items in the states collection
|
||||||
|
def size
|
||||||
|
@states.size
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Boolean] true if the states collection is empty.
|
||||||
|
def empty?
|
||||||
|
@states.isEmpty
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Hash{String=>Pipeline}]
|
||||||
|
def running_pipelines
|
||||||
|
select_pipelines { |state| !state.terminated? }
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Hash{String=>Pipeline}]
|
||||||
|
def non_running_pipelines
|
||||||
|
select_pipelines { |state| state.terminated? }
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Hash{String=>Pipeline}]
|
||||||
|
def running_user_defined_pipelines
|
||||||
|
select_pipelines { |state | !state.terminated? && !state.pipeline.system? }
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
# Returns a mapping of pipelines by their ids.
|
||||||
|
# Pipelines can optionally be filtered by their `PipelineState` by passing
|
||||||
|
# a block that returns truthy when a pipeline should be included in the
|
||||||
|
# result.
|
||||||
|
#
|
||||||
|
# @yieldparam [PipelineState]
|
||||||
|
# @yieldreturn [Boolean]
|
||||||
|
#
|
||||||
|
# @return [Hash{String=>Pipeline}]
|
||||||
|
def select_pipelines(&optional_state_filter)
|
||||||
|
@states.each_with_object({}) do |(id, state), memo|
|
||||||
|
if state && (!block_given? || yield(state))
|
||||||
|
memo[id] = state.pipeline
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -10,11 +10,11 @@ module LogStash
|
||||||
@metric = metric
|
@metric = metric
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve(pipelines, pipeline_configs)
|
def resolve(pipelines_registry, pipeline_configs)
|
||||||
actions = []
|
actions = []
|
||||||
|
|
||||||
pipeline_configs.each do |pipeline_config|
|
pipeline_configs.each do |pipeline_config|
|
||||||
pipeline = pipelines[pipeline_config.pipeline_id]
|
pipeline = pipelines_registry.get_pipeline(pipeline_config.pipeline_id)
|
||||||
|
|
||||||
if pipeline.nil?
|
if pipeline.nil?
|
||||||
actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric)
|
actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric)
|
||||||
|
@ -25,12 +25,12 @@ module LogStash
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
running_pipelines = pipeline_configs.collect(&:pipeline_id)
|
configured_pipelines = pipeline_configs.collect(&:pipeline_id)
|
||||||
|
|
||||||
# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
|
# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
|
||||||
# stop it.
|
# stop it.
|
||||||
pipelines.keys
|
pipelines_registry.running_pipelines.keys
|
||||||
.select { |pipeline_id| !running_pipelines.include?(pipeline_id) }
|
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
|
||||||
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }
|
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }
|
||||||
|
|
||||||
actions.sort # See logstash/pipeline_action.rb
|
actions.sort # See logstash/pipeline_action.rb
|
||||||
|
|
|
@ -49,7 +49,7 @@ describe LogStash::Agent do
|
||||||
|
|
||||||
context "system pipeline" do
|
context "system pipeline" do
|
||||||
|
|
||||||
let(:system_pipeline_config) { mock_pipeline_config(:system_pipeline, "input { generator { } } output { null {} }", { "pipeline.system" => true }) }
|
let(:system_pipeline_config) { mock_pipeline_config(:system_pipeline, "input { dummyblockinginput { } } output { null {} }", { "pipeline.system" => true }) }
|
||||||
|
|
||||||
context "when we have a finite pipeline and a system pipeline running" do
|
context "when we have a finite pipeline and a system pipeline running" do
|
||||||
|
|
||||||
|
@ -65,40 +65,40 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when we have an infinite pipeline and a system pipeline running" do
|
context "when we have an infinite pipeline and a system pipeline running" do
|
||||||
let(:infinite_pipeline_config) { mock_pipeline_config(:main, "input { generator { } } output { null {} }") }
|
let(:infinite_pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { } } output { null {} }") }
|
||||||
|
|
||||||
let(:source_loader) do
|
let(:source_loader) do
|
||||||
TestSourceLoader.new(infinite_pipeline_config, system_pipeline_config)
|
TestSourceLoader.new(infinite_pipeline_config, system_pipeline_config)
|
||||||
end
|
end
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
@agent_task = start_agent(subject)
|
@agent_task = start_agent(subject)
|
||||||
end
|
end
|
||||||
|
|
||||||
after(:each) do
|
after(:each) do
|
||||||
@agent_task.stop!
|
@agent_task.stop!
|
||||||
|
@agent_task.wait
|
||||||
|
subject.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#running_user_defined_pipelines" do
|
describe "#running_user_defined_pipelines" do
|
||||||
it "returns the user defined pipelines" do
|
it "returns the user defined pipelines" do
|
||||||
wait_for do
|
# wait is necessary to accommodate for pipelines startup time
|
||||||
subject.with_running_user_defined_pipelines {|pipelines| pipelines.keys }
|
wait(60).for {subject.running_user_defined_pipelines.keys}.to eq([:main])
|
||||||
end.to eq([:main])
|
end
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#running_user_defined_pipelines?" do
|
describe "#running_user_defined_pipelines?" do
|
||||||
it "returns true" do
|
it "returns true" do
|
||||||
wait_for do
|
# wait is necessary to accommodate for pipelines startup time
|
||||||
subject.running_user_defined_pipelines?
|
wait(60).for {subject.running_user_defined_pipelines?}.to be_truthy
|
||||||
end.to be_truthy
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when `config.reload.automatic`" do
|
context "when `config.reload.automatic`" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }") }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }") }
|
||||||
|
|
||||||
let(:source_loader) do
|
let(:source_loader) do
|
||||||
TestSourceLoader.new(pipeline_config)
|
TestSourceLoader.new(pipeline_config)
|
||||||
|
@ -114,14 +114,14 @@ describe LogStash::Agent do
|
||||||
|
|
||||||
after(:each) do
|
after(:each) do
|
||||||
@agent_task.stop!
|
@agent_task.stop!
|
||||||
|
@agent_task.wait
|
||||||
|
subject.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
it "converge only once" do
|
it "converge only once" do
|
||||||
wait(60).for { source_loader.fetch_count }.to eq(1)
|
wait(60).for { source_loader.fetch_count }.to eq(1)
|
||||||
|
# no need to wait here because have_running_pipeline? does the wait
|
||||||
expect(subject).to have_running_pipeline?(pipeline_config)
|
expect(subject).to have_running_pipeline?(pipeline_config)
|
||||||
|
|
||||||
subject.shutdown
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -135,8 +135,6 @@ describe LogStash::Agent do
|
||||||
|
|
||||||
expect(source_loader.fetch_count).to eq(1)
|
expect(source_loader.fetch_count).to eq(1)
|
||||||
expect(subject.pipelines_count).to eq(0)
|
expect(subject.pipelines_count).to eq(0)
|
||||||
|
|
||||||
subject.shutdown
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -149,26 +147,25 @@ describe LogStash::Agent do
|
||||||
"config.reload.interval" => interval
|
"config.reload.interval" => interval
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
@agent_task = start_agent(subject)
|
@agent_task = start_agent(subject)
|
||||||
end
|
end
|
||||||
|
|
||||||
after(:each) do
|
after(:each) do
|
||||||
@agent_task.stop!
|
@agent_task.stop!
|
||||||
|
@agent_task.wait
|
||||||
|
subject.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
context "and successfully load the config" do
|
context "and successfully load the config" do
|
||||||
it "converges periodically the pipelines from the configs source" do
|
it "converges periodically the pipelines from the configs source" do
|
||||||
sleep(2) # let the interval reload a few times
|
# no need to wait here because have_running_pipeline? does the wait
|
||||||
expect(subject).to have_running_pipeline?(pipeline_config)
|
expect(subject).to have_running_pipeline?(pipeline_config)
|
||||||
|
|
||||||
# we rely on a periodic thread to call fetch count, we have seen unreliable run on
|
# we rely on a periodic thread to call fetch count, we have seen unreliable run on
|
||||||
# travis, so lets add a few retries
|
# travis, so lets add a few retries
|
||||||
try do
|
try { expect(source_loader.fetch_count).to be > 1 }
|
||||||
expect(source_loader.fetch_count).to be > 1
|
|
||||||
end
|
|
||||||
|
|
||||||
subject.shutdown
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -178,12 +175,9 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "it will keep trying to converge" do
|
it "it will keep trying to converge" do
|
||||||
|
|
||||||
sleep(agent_settings.get("config.reload.interval") / 1_000_000_000.0 * 20) # let the interval reload a few times
|
sleep(agent_settings.get("config.reload.interval") / 1_000_000_000.0 * 20) # let the interval reload a few times
|
||||||
expect(subject.pipelines_count).to eq(0)
|
expect(subject.pipelines_count).to eq(0)
|
||||||
expect(source_loader.fetch_count).to be > 1
|
expect(source_loader.fetch_count).to be > 1
|
||||||
|
|
||||||
subject.shutdown
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -191,8 +185,8 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when shutting down the agent" do
|
context "when shutting down the agent" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }") }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }") }
|
||||||
let(:new_pipeline_config) { mock_pipeline_config(:new, "input { generator { id => 'new' } } output { null {} }") }
|
let(:new_pipeline_config) { mock_pipeline_config(:new, "input { dummyblockinginput { id => 'new' } } output { null {} }") }
|
||||||
|
|
||||||
let(:source_loader) do
|
let(:source_loader) do
|
||||||
TestSourceLoader.new([pipeline_config, new_pipeline_config])
|
TestSourceLoader.new([pipeline_config, new_pipeline_config])
|
||||||
|
@ -205,8 +199,8 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "Configuration converge scenario" do
|
context "Configuration converge scenario" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }", { "pipeline.reloadable" => true }) }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }", { "pipeline.reloadable" => true }) }
|
||||||
let(:new_pipeline_config) { mock_pipeline_config(:new, "input { generator {} } output { null {} }", { "pipeline.reloadable" => true }) }
|
let(:new_pipeline_config) { mock_pipeline_config(:new, "input { dummyblockinginput {} } output { null {} }", { "pipeline.reloadable" => true }) }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
# Set the Agent to an initial state of pipelines
|
# Set the Agent to an initial state of pipelines
|
||||||
|
@ -263,7 +257,7 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when the source return a modified pipeline" do
|
context "when the source return a modified pipeline" do
|
||||||
let(:modified_pipeline_config) { mock_pipeline_config(:main, "input { generator { id => 'new-and-modified' } } output { null {} }", { "pipeline.reloadable" => true }) }
|
let(:modified_pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => 'new-and-modified' } } output { null {} }", { "pipeline.reloadable" => true }) }
|
||||||
|
|
||||||
let(:source_loader) do
|
let(:source_loader) do
|
||||||
TestSequenceSourceLoader.new(
|
TestSequenceSourceLoader.new(
|
||||||
|
|
|
@ -119,7 +119,7 @@ describe LogStash::Agent do
|
||||||
context "if state is clean" do
|
context "if state is clean" do
|
||||||
before :each do
|
before :each do
|
||||||
allow(subject).to receive(:running_user_defined_pipelines?).and_return(true)
|
allow(subject).to receive(:running_user_defined_pipelines?).and_return(true)
|
||||||
allow(subject).to receive(:clean_state?).and_return(false)
|
allow(subject).to receive(:no_pipeline?).and_return(false)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "should not converge state more than once" do
|
it "should not converge state more than once" do
|
||||||
|
@ -142,7 +142,7 @@ describe LogStash::Agent do
|
||||||
it "does not upgrade the new config" do
|
it "does not upgrade the new config" do
|
||||||
t = Thread.new { subject.execute }
|
t = Thread.new { subject.execute }
|
||||||
wait(timeout)
|
wait(timeout)
|
||||||
.for { subject.running_pipelines? && subject.pipelines.values.first.ready? }
|
.for { subject.running_pipelines? && subject.running_pipelines.values.first.ready? }
|
||||||
.to eq(true)
|
.to eq(true)
|
||||||
expect(subject.converge_state_and_update).not_to be_a_successful_converge
|
expect(subject.converge_state_and_update).not_to be_a_successful_converge
|
||||||
expect(subject).to have_running_pipeline?(mock_config_pipeline)
|
expect(subject).to have_running_pipeline?(mock_config_pipeline)
|
||||||
|
@ -162,7 +162,7 @@ describe LogStash::Agent do
|
||||||
it "does upgrade the new config" do
|
it "does upgrade the new config" do
|
||||||
t = Thread.new { subject.execute }
|
t = Thread.new { subject.execute }
|
||||||
Timeout.timeout(timeout) do
|
Timeout.timeout(timeout) do
|
||||||
sleep(0.1) until subject.pipelines_count > 0 && subject.pipelines.values.first.ready?
|
sleep(0.1) until subject.running_pipelines_count > 0 && subject.running_pipelines.values.first.ready?
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(subject.converge_state_and_update).to be_a_successful_converge
|
expect(subject.converge_state_and_update).to be_a_successful_converge
|
||||||
|
@ -186,7 +186,7 @@ describe LogStash::Agent do
|
||||||
it "does not try to reload the pipeline" do
|
it "does not try to reload the pipeline" do
|
||||||
t = Thread.new { subject.execute }
|
t = Thread.new { subject.execute }
|
||||||
Timeout.timeout(timeout) do
|
Timeout.timeout(timeout) do
|
||||||
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
sleep(0.1) until subject.running_pipelines? && subject.running_pipelines.values.first.running?
|
||||||
end
|
end
|
||||||
expect(subject.converge_state_and_update).not_to be_a_successful_converge
|
expect(subject.converge_state_and_update).not_to be_a_successful_converge
|
||||||
expect(subject).to have_running_pipeline?(mock_config_pipeline)
|
expect(subject).to have_running_pipeline?(mock_config_pipeline)
|
||||||
|
@ -206,7 +206,7 @@ describe LogStash::Agent do
|
||||||
it "tries to reload the pipeline" do
|
it "tries to reload the pipeline" do
|
||||||
t = Thread.new { subject.execute }
|
t = Thread.new { subject.execute }
|
||||||
Timeout.timeout(timeout) do
|
Timeout.timeout(timeout) do
|
||||||
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
sleep(0.1) until subject.running_pipelines? && subject.running_pipelines.values.first.running?
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(subject.converge_state_and_update).to be_a_successful_converge
|
expect(subject.converge_state_and_update).to be_a_successful_converge
|
||||||
|
|
|
@ -2,13 +2,14 @@
|
||||||
require "spec_helper"
|
require "spec_helper"
|
||||||
require_relative "../../support/helpers"
|
require_relative "../../support/helpers"
|
||||||
require_relative "../../support/matchers"
|
require_relative "../../support/matchers"
|
||||||
|
require "logstash/pipelines_registry"
|
||||||
require "logstash/pipeline_action/create"
|
require "logstash/pipeline_action/create"
|
||||||
require "logstash/inputs/generator"
|
require "logstash/inputs/generator"
|
||||||
|
|
||||||
describe LogStash::PipelineAction::Create do
|
describe LogStash::PipelineAction::Create do
|
||||||
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
|
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { null {} }") }
|
||||||
let(:pipelines) { java.util.concurrent.ConcurrentHashMap.new }
|
let(:pipelines) { LogStash::PipelinesRegistry.new }
|
||||||
let(:agent) { double("agent") }
|
let(:agent) { double("agent") }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
|
@ -18,7 +19,7 @@ describe LogStash::PipelineAction::Create do
|
||||||
subject { described_class.new(pipeline_config, metric) }
|
subject { described_class.new(pipeline_config, metric) }
|
||||||
|
|
||||||
after do
|
after do
|
||||||
pipelines.each do |_, pipeline|
|
pipelines.running_pipelines do |_, pipeline|
|
||||||
pipeline.shutdown
|
pipeline.shutdown
|
||||||
pipeline.thread.join
|
pipeline.thread.join
|
||||||
end
|
end
|
||||||
|
@ -44,7 +45,7 @@ describe LogStash::PipelineAction::Create do
|
||||||
|
|
||||||
it "starts the pipeline" do
|
it "starts the pipeline" do
|
||||||
subject.execute(agent, pipelines)
|
subject.execute(agent, pipelines)
|
||||||
expect(pipelines[:main].running?).to be_truthy
|
expect(pipelines.get_pipeline(:main).running?).to be_truthy
|
||||||
end
|
end
|
||||||
|
|
||||||
it "returns a successful execution status" do
|
it "returns a successful execution status" do
|
||||||
|
@ -54,7 +55,7 @@ describe LogStash::PipelineAction::Create do
|
||||||
|
|
||||||
context "when the pipeline doesn't start" do
|
context "when the pipeline doesn't start" do
|
||||||
context "with a syntax error" do
|
context "with a syntax error" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { stdout ") } # bad syntax
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { stdout ") } # bad syntax
|
||||||
|
|
||||||
it "raises the exception upstream" do
|
it "raises the exception upstream" do
|
||||||
expect { subject.execute(agent, pipelines) }.to raise_error
|
expect { subject.execute(agent, pipelines) }.to raise_error
|
||||||
|
@ -62,7 +63,7 @@ describe LogStash::PipelineAction::Create do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with an error raised during `#register`" do
|
context "with an error raised during `#register`" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } filter { ruby { init => '1/0' code => '1+2' } } output { null {} }") }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } filter { ruby { init => '1/0' code => '1+2' } } output { null {} }") }
|
||||||
|
|
||||||
it "returns false" do
|
it "returns false" do
|
||||||
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
||||||
|
@ -71,8 +72,8 @@ describe LogStash::PipelineAction::Create do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when sorting create action" do
|
context "when sorting create action" do
|
||||||
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
|
let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { null {} }") }
|
||||||
let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { generator { id => '123' } } output { null {} }", { "pipeline.system" => true }) }
|
let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { dummyblockinginput { id => '123' } } output { null {} }", { "pipeline.system" => true }) }
|
||||||
|
|
||||||
it "should give higher priority to system pipeline" do
|
it "should give higher priority to system pipeline" do
|
||||||
action_user_pipeline = described_class.new(pipeline_config, metric)
|
action_user_pipeline = described_class.new(pipeline_config, metric)
|
||||||
|
|
|
@ -2,15 +2,16 @@
|
||||||
require "spec_helper"
|
require "spec_helper"
|
||||||
require_relative "../../support/helpers"
|
require_relative "../../support/helpers"
|
||||||
require_relative "../../support/matchers"
|
require_relative "../../support/matchers"
|
||||||
|
require "logstash/pipelines_registry"
|
||||||
require "logstash/pipeline_action/reload"
|
require "logstash/pipeline_action/reload"
|
||||||
|
|
||||||
describe LogStash::PipelineAction::Reload do
|
describe LogStash::PipelineAction::Reload do
|
||||||
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
|
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
|
||||||
let(:pipeline_id) { :main }
|
let(:pipeline_id) { :main }
|
||||||
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
|
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
|
||||||
let(:pipeline_config) { "input { generator {} } output { null {} }" }
|
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
|
||||||
let(:pipeline) { mock_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) }
|
let(:pipeline) { mock_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) }
|
||||||
let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[pipeline_id] = pipeline; chm }
|
let(:pipelines) { r = LogStash::PipelinesRegistry.new; r.create_pipeline(pipeline_id, pipeline) { true }; r }
|
||||||
let(:agent) { double("agent") }
|
let(:agent) { double("agent") }
|
||||||
|
|
||||||
subject { described_class.new(new_pipeline_config, metric) }
|
subject { described_class.new(new_pipeline_config, metric) }
|
||||||
|
@ -21,7 +22,7 @@ describe LogStash::PipelineAction::Reload do
|
||||||
end
|
end
|
||||||
|
|
||||||
after do
|
after do
|
||||||
pipelines.each do |_, pipeline|
|
pipelines.running_pipelines do |_, pipeline|
|
||||||
pipeline.shutdown
|
pipeline.shutdown
|
||||||
pipeline.thread.join
|
pipeline.thread.join
|
||||||
end
|
end
|
||||||
|
@ -38,12 +39,12 @@ describe LogStash::PipelineAction::Reload do
|
||||||
|
|
||||||
it "start the new pipeline" do
|
it "start the new pipeline" do
|
||||||
subject.execute(agent, pipelines)
|
subject.execute(agent, pipelines)
|
||||||
expect(pipelines[pipeline_id].running?).to be_truthy
|
expect(pipelines.get_pipeline(pipeline_id).running?).to be_truthy
|
||||||
end
|
end
|
||||||
|
|
||||||
it "run the new pipeline code" do
|
it "run the new pipeline code" do
|
||||||
subject.execute(agent, pipelines)
|
subject.execute(agent, pipelines)
|
||||||
expect(pipelines[pipeline_id].config_hash).to eq(new_pipeline_config.config_hash)
|
expect(pipelines.get_pipeline(pipeline_id).config_hash).to eq(new_pipeline_config.config_hash)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -58,7 +59,7 @@ describe LogStash::PipelineAction::Reload do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when the new pipeline is not reloadable" do
|
context "when the new pipeline is not reloadable" do
|
||||||
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
|
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
|
||||||
|
|
||||||
it "cannot successfully execute the action" do
|
it "cannot successfully execute the action" do
|
||||||
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
||||||
|
@ -66,7 +67,7 @@ describe LogStash::PipelineAction::Reload do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when the new pipeline has syntax errors" do
|
context "when the new pipeline has syntax errors" do
|
||||||
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
|
let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
|
||||||
|
|
||||||
it "cannot successfully execute the action" do
|
it "cannot successfully execute the action" do
|
||||||
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
expect(subject.execute(agent, pipelines)).not_to be_a_successful_action
|
||||||
|
@ -75,7 +76,7 @@ describe LogStash::PipelineAction::Reload do
|
||||||
|
|
||||||
context "when there is an error in the register" do
|
context "when there is an error in the register" do
|
||||||
before do
|
before do
|
||||||
allow_any_instance_of(LogStash::Inputs::Generator).to receive(:register).and_raise("Bad value")
|
allow_any_instance_of(LogStash::Inputs::DummyBlockingInput).to receive(:register).and_raise("Bad value")
|
||||||
end
|
end
|
||||||
|
|
||||||
it "cannot successfully execute the action" do
|
it "cannot successfully execute the action" do
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
require "spec_helper"
|
require "spec_helper"
|
||||||
require_relative "../../support/helpers"
|
require_relative "../../support/helpers"
|
||||||
|
require "logstash/pipelines_registry"
|
||||||
require "logstash/pipeline_action/stop"
|
require "logstash/pipeline_action/stop"
|
||||||
require "logstash/pipeline"
|
require "logstash/pipeline"
|
||||||
|
|
||||||
describe LogStash::PipelineAction::Stop do
|
describe LogStash::PipelineAction::Stop do
|
||||||
let(:pipeline_config) { "input { generator {} } output { null {} }" }
|
let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
|
||||||
let(:pipeline_id) { :main }
|
let(:pipeline_id) { :main }
|
||||||
let(:pipeline) { mock_pipeline_from_string(pipeline_config) }
|
let(:pipeline) { mock_pipeline_from_string(pipeline_config) }
|
||||||
let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[:main] = pipeline; chm }
|
let(:pipelines) { chm = LogStash::PipelinesRegistry.new; chm.create_pipeline(pipeline_id, pipeline) { true }; chm }
|
||||||
let(:agent) { double("agent") }
|
let(:agent) { double("agent") }
|
||||||
|
|
||||||
subject { described_class.new(pipeline_id) }
|
subject { described_class.new(pipeline_id) }
|
||||||
|
@ -31,6 +32,6 @@ describe LogStash::PipelineAction::Stop do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "removes the pipeline from the running pipelines" do
|
it "removes the pipeline from the running pipelines" do
|
||||||
expect { subject.execute(agent, pipelines) }.to change { pipelines.include?(pipeline_id) }.from(true).to(false)
|
expect { subject.execute(agent, pipelines) }.to change { pipelines.running_pipelines.keys }.from([:main]).to([])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
220
logstash-core/spec/logstash/pipelines_registry_spec.rb
Normal file
220
logstash-core/spec/logstash/pipelines_registry_spec.rb
Normal file
|
@ -0,0 +1,220 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "spec_helper"
|
||||||
|
require "logstash/pipelines_registry"
|
||||||
|
|
||||||
|
describe LogStash::PipelinesRegistry do
|
||||||
|
|
||||||
|
let(:pipeline_id) { "test" }
|
||||||
|
let(:pipeline) { double("Pipeline") }
|
||||||
|
let (:logger) { double("Logger") }
|
||||||
|
|
||||||
|
context "at object creation" do
|
||||||
|
it "should be empty" do
|
||||||
|
expect(subject.size).to eq(0)
|
||||||
|
expect(subject.empty?).to be_truthy
|
||||||
|
expect(subject.running_pipelines).to be_empty
|
||||||
|
expect(subject.non_running_pipelines).to be_empty
|
||||||
|
expect(subject.running_user_defined_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "creating a pipeline" do
|
||||||
|
context "without existing same pipeline id" do
|
||||||
|
it "registry should not have a state for pipeline_id" do
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to be_nil
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should return block return value" do
|
||||||
|
expect(subject.create_pipeline(pipeline_id, pipeline) { "dummy" }).to eq("dummy")
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should register the new pipeline upon successful create block" do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not register the new pipeline upon unsuccessful create block" do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { false }
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to be_nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "with existing pipeline id" do
|
||||||
|
before :each do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
end
|
||||||
|
|
||||||
|
it "registry should have a state for pipeline_id" do
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when existing pipeline is not terminated" do
|
||||||
|
before :each do
|
||||||
|
expect(pipeline).to receive(:finished_execution?).and_return(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should return false" do
|
||||||
|
expect(subject.create_pipeline(pipeline_id, pipeline) { "dummy" }).to be_falsey
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not call block and log error if pipeline is not terminated" do
|
||||||
|
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
|
||||||
|
expect(logger).to receive(:error)
|
||||||
|
expect { |b| subject.create_pipeline(pipeline_id, pipeline, &b) }.not_to yield_control
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when existing pipeline is terminated" do
|
||||||
|
let (:new_pipeline) { double("New Pipeline") }
|
||||||
|
|
||||||
|
before :each do
|
||||||
|
expect(pipeline).to receive(:finished_execution?).and_return(true)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should return block value" do
|
||||||
|
expect(subject.create_pipeline(pipeline_id, new_pipeline) { "dummy" }).to eq("dummy")
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should return block value" do
|
||||||
|
expect(subject.create_pipeline(pipeline_id, new_pipeline) { "dummy" }).to eq("dummy")
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should register new pipeline" do
|
||||||
|
subject.create_pipeline(pipeline_id, new_pipeline) { true }
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to eq(new_pipeline)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "terminating a pipeline" do
|
||||||
|
context "without existing pipeline id" do
|
||||||
|
it "should log error" do
|
||||||
|
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
|
||||||
|
expect(logger).to receive(:error)
|
||||||
|
subject.terminate_pipeline(pipeline_id) { "dummy" }
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not yield to block" do
|
||||||
|
expect { |b| subject.terminate_pipeline(pipeline_id, &b) }.not_to yield_control
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "with existing pipeline id" do
|
||||||
|
before :each do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should yield to block" do
|
||||||
|
expect { |b| subject.terminate_pipeline(pipeline_id, &b) }.to yield_control
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should keep pipeline id" do
|
||||||
|
subject.terminate_pipeline(pipeline_id) { "dummy" }
|
||||||
|
expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "reloading a pipeline" do
|
||||||
|
it "should log error with inexistent pipeline id" do
|
||||||
|
expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
|
||||||
|
expect(logger).to receive(:error)
|
||||||
|
subject.reload_pipeline(pipeline_id) { }
|
||||||
|
end
|
||||||
|
|
||||||
|
context "with existing pipeline id" do
|
||||||
|
before :each do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should return block value" do
|
||||||
|
expect(subject.reload_pipeline(pipeline_id) { ["dummy", pipeline] }).to eq("dummy")
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not be terminated while reloading" do
|
||||||
|
expect(pipeline).to receive(:finished_execution?).and_return(false, true, true)
|
||||||
|
|
||||||
|
# 1st call: finished_execution? is false
|
||||||
|
expect(subject.running_pipelines).not_to be_empty
|
||||||
|
|
||||||
|
# 2nd call: finished_execution? is true
|
||||||
|
expect(subject.running_pipelines).to be_empty
|
||||||
|
|
||||||
|
|
||||||
|
queue = Queue.new # threadsafe queue
|
||||||
|
in_block = Concurrent::AtomicBoolean.new(false)
|
||||||
|
|
||||||
|
thread = Thread.new(subject, pipeline_id, pipeline, queue, in_block) do |subject, pipeline_id, pipeline, queue, in_block|
|
||||||
|
subject.reload_pipeline(pipeline_id) do
|
||||||
|
in_block.make_true
|
||||||
|
queue.pop
|
||||||
|
[true, pipeline]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# make sure we entered the block executioin
|
||||||
|
wait(10).for {in_block.true?}.to be_truthy
|
||||||
|
|
||||||
|
# at this point the thread is suspended waiting on queue
|
||||||
|
|
||||||
|
# since in reloading state, running_pipelines is not empty
|
||||||
|
expect(subject.running_pipelines).not_to be_empty
|
||||||
|
|
||||||
|
# unblock thread
|
||||||
|
queue.push(:dummy)
|
||||||
|
thread.join
|
||||||
|
|
||||||
|
# 3rd call: finished_execution? is true
|
||||||
|
expect(subject.running_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "pipelines collections" do
|
||||||
|
context "with a non terminated pipelines" do
|
||||||
|
before :each do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
expect(pipeline).to receive(:finished_execution?).and_return(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should find running pipelines" do
|
||||||
|
expect(subject.running_pipelines).not_to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not find non_running pipelines" do
|
||||||
|
expect(subject.non_running_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should find running_user_defined_pipelines" do
|
||||||
|
expect(pipeline).to receive(:system?).and_return(false)
|
||||||
|
expect(subject.running_user_defined_pipelines).not_to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not find running_user_defined_pipelines" do
|
||||||
|
expect(pipeline).to receive(:system?).and_return(true)
|
||||||
|
expect(subject.running_user_defined_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "with a terminated pipelines" do
|
||||||
|
before :each do
|
||||||
|
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||||
|
expect(pipeline).to receive(:finished_execution?).and_return(true)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not find running pipelines" do
|
||||||
|
expect(subject.running_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should find non_running pipelines" do
|
||||||
|
expect(subject.non_running_pipelines).not_to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should not find running_user_defined_pipelines" do
|
||||||
|
expect(subject.running_user_defined_pipelines).to be_empty
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
end
|
|
@ -18,17 +18,17 @@ describe LogStash::StateResolver do
|
||||||
|
|
||||||
after do
|
after do
|
||||||
# ensure that the the created pipeline are closed
|
# ensure that the the created pipeline are closed
|
||||||
running_pipelines.each { |_, pipeline| pipeline.close }
|
pipelines.running_pipelines.each { |_, pipeline| pipeline.close }
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when no pipeline is running" do
|
context "when no pipeline is running" do
|
||||||
let(:running_pipelines) { {} }
|
let(:pipelines) { LogStash::PipelinesRegistry.new }
|
||||||
|
|
||||||
context "no pipeline configs is received" do
|
context "no pipeline configs is received" do
|
||||||
let(:pipeline_configs) { [] }
|
let(:pipeline_configs) { [] }
|
||||||
|
|
||||||
it "returns no action" do
|
it "returns no action" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs).size).to eq(0)
|
expect(subject.resolve(pipelines, pipeline_configs).size).to eq(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ describe LogStash::StateResolver do
|
||||||
let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
|
let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
|
||||||
|
|
||||||
it "returns some actions" do
|
it "returns some actions" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:create, :hello_world],
|
[:create, :hello_world],
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
@ -47,13 +47,17 @@ describe LogStash::StateResolver do
|
||||||
context "when a pipeline is running" do
|
context "when a pipeline is running" do
|
||||||
let(:main_pipeline) { mock_pipeline(:main) }
|
let(:main_pipeline) { mock_pipeline(:main) }
|
||||||
let(:main_pipeline_config) { main_pipeline.pipeline_config }
|
let(:main_pipeline_config) { main_pipeline.pipeline_config }
|
||||||
let(:running_pipelines) { { :main => main_pipeline } }
|
let(:pipelines) do
|
||||||
|
r = LogStash::PipelinesRegistry.new
|
||||||
|
r.create_pipeline(:main, main_pipeline) { true }
|
||||||
|
r
|
||||||
|
end
|
||||||
|
|
||||||
context "when the pipeline config contains a new one and the existing" do
|
context "when the pipeline config contains a new one and the existing" do
|
||||||
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }
|
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }
|
||||||
|
|
||||||
it "creates the new one and keep the other one" do
|
it "creates the new one and keep the other one" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:create, :hello_world],
|
[:create, :hello_world],
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
@ -62,7 +66,7 @@ describe LogStash::StateResolver do
|
||||||
let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
|
let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
|
||||||
|
|
||||||
it "creates the new one and stop the old one one" do
|
it "creates the new one and stop the old one one" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:create, :hello_world],
|
[:create, :hello_world],
|
||||||
[:stop, :main]
|
[:stop, :main]
|
||||||
)
|
)
|
||||||
|
@ -73,7 +77,7 @@ describe LogStash::StateResolver do
|
||||||
let(:pipeline_configs) { [] }
|
let(:pipeline_configs) { [] }
|
||||||
|
|
||||||
it "stops the old one one" do
|
it "stops the old one one" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:stop, :main]
|
[:stop, :main]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
@ -83,7 +87,7 @@ describe LogStash::StateResolver do
|
||||||
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }
|
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }
|
||||||
|
|
||||||
it "reloads the old one one" do
|
it "reloads the old one one" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:reload, :main]
|
[:reload, :main]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
@ -92,21 +96,21 @@ describe LogStash::StateResolver do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when we have a lot of pipeline running" do
|
context "when we have a lot of pipeline running" do
|
||||||
let(:running_pipelines) do
|
let(:pipelines) do
|
||||||
{
|
r = LogStash::PipelinesRegistry.new
|
||||||
:main1 => mock_pipeline(:main1),
|
r.create_pipeline(:main1, mock_pipeline(:main1)) { true }
|
||||||
:main2 => mock_pipeline(:main2),
|
r.create_pipeline(:main2, mock_pipeline(:main2)) { true }
|
||||||
:main3 => mock_pipeline(:main3),
|
r.create_pipeline(:main3, mock_pipeline(:main3)) { true }
|
||||||
:main4 => mock_pipeline(:main4),
|
r.create_pipeline(:main4, mock_pipeline(:main4)) { true }
|
||||||
:main5 => mock_pipeline(:main5),
|
r.create_pipeline(:main5, mock_pipeline(:main5)) { true }
|
||||||
:main6 => mock_pipeline(:main6),
|
r.create_pipeline(:main6, mock_pipeline(:main6)) { true }
|
||||||
}
|
r
|
||||||
end
|
end
|
||||||
|
|
||||||
context "without system pipeline" do
|
context "without system pipeline" do
|
||||||
let(:pipeline_configs) do
|
let(:pipeline_configs) do
|
||||||
[
|
[
|
||||||
running_pipelines[:main1].pipeline_config,
|
pipelines.get_pipeline(:main1).pipeline_config,
|
||||||
mock_pipeline_config(:main9),
|
mock_pipeline_config(:main9),
|
||||||
mock_pipeline_config(:main5, "input { generator {}}"),
|
mock_pipeline_config(:main5, "input { generator {}}"),
|
||||||
mock_pipeline_config(:main3, "input { generator {}}"),
|
mock_pipeline_config(:main3, "input { generator {}}"),
|
||||||
|
@ -115,7 +119,7 @@ describe LogStash::StateResolver do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "generates actions required to converge" do
|
it "generates actions required to converge" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:create, :main7],
|
[:create, :main7],
|
||||||
[:create, :main9],
|
[:create, :main9],
|
||||||
[:reload, :main3],
|
[:reload, :main3],
|
||||||
|
@ -130,7 +134,7 @@ describe LogStash::StateResolver do
|
||||||
context "with system pipeline" do
|
context "with system pipeline" do
|
||||||
let(:pipeline_configs) do
|
let(:pipeline_configs) do
|
||||||
[
|
[
|
||||||
running_pipelines[:main1].pipeline_config,
|
pipelines.get_pipeline(:main1).pipeline_config,
|
||||||
mock_pipeline_config(:main9),
|
mock_pipeline_config(:main9),
|
||||||
mock_pipeline_config(:main5, "input { generator {}}"),
|
mock_pipeline_config(:main5, "input { generator {}}"),
|
||||||
mock_pipeline_config(:main3, "input { generator {}}"),
|
mock_pipeline_config(:main3, "input { generator {}}"),
|
||||||
|
@ -140,7 +144,7 @@ describe LogStash::StateResolver do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "creates the system pipeline before user defined pipelines" do
|
it "creates the system pipeline before user defined pipelines" do
|
||||||
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
|
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
|
||||||
[:create, :monitoring],
|
[:create, :monitoring],
|
||||||
[:create, :main7],
|
[:create, :main7],
|
||||||
[:create, :main9],
|
[:create, :main9],
|
||||||
|
|
|
@ -51,44 +51,50 @@ end
|
||||||
|
|
||||||
RSpec::Matchers.define :have_pipeline? do |pipeline_config|
|
RSpec::Matchers.define :have_pipeline? do |pipeline_config|
|
||||||
match do |agent|
|
match do |agent|
|
||||||
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
pipeline = nil
|
||||||
expect(pipeline).to_not be_nil
|
try(30) do
|
||||||
|
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
||||||
|
expect(pipeline).to_not be_nil
|
||||||
|
end
|
||||||
expect(pipeline.config_str).to eq(pipeline_config.config_string)
|
expect(pipeline.config_str).to eq(pipeline_config.config_string)
|
||||||
|
expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
|
||||||
end
|
end
|
||||||
|
|
||||||
match_when_negated do |agent|
|
match_when_negated do |agent|
|
||||||
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
pipeline = nil
|
||||||
pipeline.nil? || pipeline.config_str != pipeline_config.config_string
|
try(30) do
|
||||||
|
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
||||||
|
expect(pipeline).to_not be_nil
|
||||||
|
end
|
||||||
|
# either the pipeline_id is not in the running pipelines OR it is but have different configurations
|
||||||
|
expect(!agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s) || pipeline.config_str != pipeline_config.config_string).to be_truthy
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|
|
RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|
|
||||||
match do |agent|
|
match do |agent|
|
||||||
Stud.try(10.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
|
pipeline = nil
|
||||||
|
try(30) do
|
||||||
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
||||||
expect(pipeline).to_not be_nil
|
expect(pipeline).to_not be_nil
|
||||||
expect(pipeline.config_str).to eq(pipeline_config.config_string)
|
|
||||||
expect(pipeline.running?).to be_truthy
|
|
||||||
end
|
end
|
||||||
|
expect(pipeline.config_str).to eq(pipeline_config.config_string)
|
||||||
|
expect(pipeline.running?).to be_truthy
|
||||||
|
expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
|
||||||
end
|
end
|
||||||
|
|
||||||
failure_message do |agent|
|
failure_message do |agent|
|
||||||
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
|
||||||
|
|
||||||
if pipeline.nil?
|
if pipeline.nil?
|
||||||
"Expected pipeline to exist and running, be we cannot find `#{pipeline_config.pipeline_id}` in the running pipelines `#{agent.pipelines.keys.join(",")}`"
|
"Expected pipeline to exist and running, be we cannot find '#{pipeline_config.pipeline_id.to_s}' in the running pipelines '#{agent.running_pipelines.keys.join(",")}'"
|
||||||
else
|
else
|
||||||
if pipeline.running? == false
|
if !pipeline.running?
|
||||||
"Found `#{pipeline_config.pipeline_id}` in the list of pipelines but its not running"
|
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines but its not running"
|
||||||
elsif pipeline.config_str != pipeline_config.config_string
|
elsif pipeline.config_str != pipeline_config.config_string
|
||||||
"Found `#{pipeline_config.pipeline_id}` in the list of pipelines and running, but the config_string doesn't match,
|
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines and running, but the config_string doesn't match,\nExpected:\n#{pipeline_config.config_string}\n\ngot:\n#{pipeline.config_str}"
|
||||||
Expected:
|
|
||||||
#{pipeline_config.config_string}
|
|
||||||
|
|
||||||
got:
|
|
||||||
#{pipeline.config_str}"
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
match_when_negated do
|
match_when_negated do
|
||||||
|
|
|
@ -26,12 +26,12 @@ shared_context "api setup" do
|
||||||
@agent.execute
|
@agent.execute
|
||||||
pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }")
|
pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }")
|
||||||
pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
|
pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
|
||||||
@pipelines = java.util.concurrent.ConcurrentHashMap.new
|
@pipelines_registry = LogStash::PipelinesRegistry.new
|
||||||
expect(pipeline_creator.execute(@agent, @pipelines)).to be_truthy
|
expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy
|
||||||
end
|
end
|
||||||
|
|
||||||
after :all do
|
after :all do
|
||||||
@pipelines.each do |_, pipeline|
|
@pipelines_registry.running_pipelines.each do |_, pipeline|
|
||||||
pipeline.shutdown
|
pipeline.shutdown
|
||||||
pipeline.thread.join
|
pipeline.thread.join
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.jruby.anno.JRubyMethod;
|
||||||
import org.jruby.runtime.ThreadContext;
|
import org.jruby.runtime.ThreadContext;
|
||||||
import org.jruby.runtime.builtin.IRubyObject;
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
@ -51,6 +52,15 @@ public final class HooksRegistryExt extends RubyObject {
|
||||||
return syncHooks(context);
|
return syncHooks(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JRubyMethod(name = "remove_hooks")
|
||||||
|
public IRubyObject remove_hooks(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject callback) {
|
||||||
|
final List<IRubyObject> callbacks = registeredHooks.get(emitterScope);
|
||||||
|
if (callbacks == null) {
|
||||||
|
return context.fals;
|
||||||
|
}
|
||||||
|
return callbacks.removeAll(Collections.singleton(callback)) ? context.tru : context.fals;
|
||||||
|
}
|
||||||
|
|
||||||
@JRubyMethod(name = "emitters_count")
|
@JRubyMethod(name = "emitters_count")
|
||||||
public IRubyObject emittersCount(final ThreadContext context) {
|
public IRubyObject emittersCount(final ThreadContext context) {
|
||||||
return RubyFixnum.newFixnum(context.runtime, registeredEmitters.size());
|
return RubyFixnum.newFixnum(context.runtime, registeredEmitters.size());
|
||||||
|
|
|
@ -65,18 +65,28 @@ module LogStash module Inputs
|
||||||
@timer_task.add_observer(TimerTaskLogger.new)
|
@timer_task.add_observer(TimerTaskLogger.new)
|
||||||
end
|
end
|
||||||
|
|
||||||
def run(arg_queue)
|
def run(arg_queue)
|
||||||
@logger.debug("Metric: input started")
|
@logger.debug("Metric: input started")
|
||||||
@queue = arg_queue
|
@queue = arg_queue
|
||||||
|
|
||||||
configure_snapshot_poller
|
configure_snapshot_poller
|
||||||
|
|
||||||
# This must be invoked here because we need a queue to store the data
|
# This hook registration was originally set here to act on pipeline_started dispatcher event
|
||||||
LogStash::PLUGIN_REGISTRY.hooks.register_hooks(LogStash::Agent, self)
|
# from the Agent using the pipeline_started method here which sends events to the pipeline queue
|
||||||
|
# which is only available here in the run method.
|
||||||
|
#
|
||||||
|
# There are 2 things to know with this strategy:
|
||||||
|
# - The initial pipeline creation preceding this plugin invocation will not be catched by our
|
||||||
|
# hook here because it is added after the initial pipeline creations.
|
||||||
|
#
|
||||||
|
# - The below remove_hooks was added because not removing it was causing problems in tests where
|
||||||
|
# multiple instances of this plugin would be created and added in the global static PLUGIN_REGISTRY
|
||||||
|
# leading to calling the pipeline_started method multiple times leading to weird problems.
|
||||||
|
LogStash::PLUGIN_REGISTRY.hooks.register_hooks(LogStash::Agent, self)
|
||||||
|
|
||||||
exec_timer_task
|
exec_timer_task
|
||||||
sleep_till_stop
|
sleep_till_stop
|
||||||
end
|
end
|
||||||
|
|
||||||
def exec_timer_task
|
def exec_timer_task
|
||||||
@timer_task.execute
|
@timer_task.execute
|
||||||
|
@ -90,6 +100,7 @@ module LogStash module Inputs
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
@logger.debug("Metrics input: stopped")
|
@logger.debug("Metrics input: stopped")
|
||||||
|
LogStash::PLUGIN_REGISTRY.hooks.remove_hooks(LogStash::Agent, self)
|
||||||
@timer_task.shutdown if @timer_task
|
@timer_task.shutdown if @timer_task
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -128,7 +139,7 @@ module LogStash module Inputs
|
||||||
time_for_update = @last_states_update.nil? || @last_states_update < (Time.now - 60*10)
|
time_for_update = @last_states_update.nil? || @last_states_update < (Time.now - 60*10)
|
||||||
|
|
||||||
pipeline_hashes = []
|
pipeline_hashes = []
|
||||||
agent.pipelines.each do |pipeline_id, pipeline|
|
agent.running_pipelines.each do |pipeline_id, pipeline|
|
||||||
if time_for_update || !@last_updated_pipeline_hashes.include?(pipeline.hash)
|
if time_for_update || !@last_updated_pipeline_hashes.include?(pipeline.hash)
|
||||||
update_pipeline_state(pipeline)
|
update_pipeline_state(pipeline)
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,7 +9,7 @@ module LogStash; module Inputs; class Metrics; module StatsEvent;
|
||||||
# metrics pipelines. This prevents race conditions as pipeline stats may be
|
# metrics pipelines. This prevents race conditions as pipeline stats may be
|
||||||
# populated before the agent has it in its own pipelines state
|
# populated before the agent has it in its own pipelines state
|
||||||
stats = metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines]
|
stats = metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines]
|
||||||
agent.pipelines.map do |pipeline_id, pipeline|
|
agent.running_pipelines.map do |pipeline_id, pipeline|
|
||||||
p_stats = stats[pipeline_id]
|
p_stats = stats[pipeline_id]
|
||||||
# Don't record stats for system pipelines
|
# Don't record stats for system pipelines
|
||||||
next nil if pipeline.system?
|
next nil if pipeline.system?
|
||||||
|
|
|
@ -38,36 +38,34 @@ describe LogStash::Inputs::Metrics do
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
context "integration" do
|
context "integration" do
|
||||||
|
|
||||||
shared_examples_for 'events are added to the queue' do
|
shared_examples_for 'events are added to the queue' do
|
||||||
it 'should add a stats events to the queue' do
|
it 'should add a stats events to the queue' do
|
||||||
expect(stats_events.size).to eq(1)
|
wait(60).for { stats_events.size }.to eq(1)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'should add two state events to the queue' do
|
it 'should add two state events to the queue' do
|
||||||
# Triggered event plus the one from `update`
|
# Triggered event plus the one from `update`
|
||||||
expect(state_events.size).to eq(2)
|
wait(60).for { state_events.size }.to eq(2)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
shared_examples_for 'events are not added to the queue' do
|
shared_examples_for 'events are not added to the queue' do
|
||||||
it 'should not add a stats events to the queue' do
|
it 'should not add a stats events to the queue' do
|
||||||
expect(stats_events.size).to eq(0)
|
wait(60).for { stats_events.size }.to eq(0)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'should not add a state events to the queue' do
|
it 'should not add a state events to the queue' do
|
||||||
# Triggered event plus the one from `update`
|
# Triggered event plus the one from `update`
|
||||||
expect(state_events.size).to eq(0)
|
wait(60).for { state_events.size }.to eq(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:schemas_path) { File.join("spec", "monitoring", "schemas") }
|
let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "spec", "monitoring", "schemas") }
|
||||||
let(:queue) { [] }
|
let(:queue) { Concurrent::Array.new }
|
||||||
|
|
||||||
let(:number_of_events) { 20 }
|
let(:config) { "input { dummyblockinginput { } } output { null { } }" }
|
||||||
let(:config) { "input { generator { count => #{number_of_events} } } output { null { } }" }
|
|
||||||
|
|
||||||
let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({
|
let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({
|
||||||
"pipeline.id" => "main",
|
"pipeline.id" => "main",
|
||||||
|
@ -77,6 +75,7 @@ describe LogStash::Inputs::Metrics do
|
||||||
let(:agent) { LogStash::Agent.new(pipeline_settings) }
|
let(:agent) { LogStash::Agent.new(pipeline_settings) }
|
||||||
let(:metric) { agent.metric }
|
let(:metric) { agent.metric }
|
||||||
let(:collector) { metric.collector }
|
let(:collector) { metric.collector }
|
||||||
|
let(:agent_task) { start_agent(agent) }
|
||||||
|
|
||||||
# Can't use let because this value can change over time
|
# Can't use let because this value can change over time
|
||||||
def stats_events
|
def stats_events
|
||||||
|
@ -92,74 +91,68 @@ describe LogStash::Inputs::Metrics do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
before :each do
|
context "with pipeline execution" do
|
||||||
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
|
||||||
end
|
|
||||||
|
|
||||||
def setup_pipeline
|
|
||||||
agent.execute
|
|
||||||
|
|
||||||
100.times do
|
|
||||||
sleep 0.1
|
|
||||||
break if main_pipeline
|
|
||||||
end
|
|
||||||
raise "No main pipeline registered!" unless main_pipeline
|
|
||||||
|
|
||||||
subject.metric = metric
|
|
||||||
|
|
||||||
subject.register
|
|
||||||
subject.run(queue)
|
|
||||||
subject.pipeline_started(agent, main_pipeline)
|
|
||||||
end
|
|
||||||
|
|
||||||
def main_pipeline
|
|
||||||
agent.get_pipeline(:main)
|
|
||||||
end
|
|
||||||
|
|
||||||
after :each do
|
|
||||||
agent.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'after the pipeline is setup' do
|
|
||||||
before do
|
|
||||||
allow(subject).to receive(:exec_timer_task)
|
|
||||||
allow(subject).to receive(:sleep_till_stop)
|
|
||||||
setup_pipeline
|
|
||||||
end
|
|
||||||
it "should store the agent" do
|
|
||||||
expect(subject.agent).to eq(agent)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "#update" do
|
|
||||||
before :each do
|
before :each do
|
||||||
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
||||||
allow(subject).to receive(:exec_timer_task)
|
allow(subject).to receive(:exec_timer_task)
|
||||||
allow(subject).to receive(:sleep_till_stop)
|
allow(subject).to receive(:sleep_till_stop)
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
agent
|
||||||
|
agent_task
|
||||||
|
|
||||||
|
wait(60).for { agent.get_pipeline(:main) }.to_not be_nil
|
||||||
|
|
||||||
|
subject.metric = metric
|
||||||
|
|
||||||
|
subject.register
|
||||||
|
subject.run(queue)
|
||||||
|
subject.pipeline_started(agent, agent.get_pipeline(:main))
|
||||||
end
|
end
|
||||||
|
|
||||||
it_behaves_like 'events are added to the queue'
|
after :each do
|
||||||
|
subject.stop
|
||||||
|
agent.shutdown
|
||||||
|
agent_task.wait
|
||||||
|
end
|
||||||
|
|
||||||
describe "state event" do
|
context 'after the pipeline is setup' do
|
||||||
let(:schema_file) { File.join(schemas_path, "states_document_schema.json") }
|
it "should store the agent" do
|
||||||
let(:event) { state_events.first }
|
expect(subject.agent).to eq(agent)
|
||||||
|
|
||||||
it "should validate against the schema" do
|
|
||||||
expect(event).to be_a(LogStash::Event)
|
|
||||||
expect(JSON::Validator.fully_validate(schema_file, event.to_json)).to be_empty
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#build_event" do
|
describe "#update" do
|
||||||
let(:schema_file) { File.join(schemas_path, "monitoring_document_schema.json") }
|
before :each do
|
||||||
|
# collector.snapshot_metric is timing dependant and if fired too fast will miss some metrics.
|
||||||
|
# after some tests a correct metric_store.size is 72 but when it is incomplete it is lower.
|
||||||
|
# I guess this 72 is dependant on the metrics we collect and there is probably a better
|
||||||
|
# way to make sure no metrics are missing without forcing a hard sleep but this is what is
|
||||||
|
# easily observable, feel free to refactor with a better "timing" test here.
|
||||||
|
wait(60).for { collector.snapshot_metric.metric_store.size >= 72 }.to be_truthy
|
||||||
|
|
||||||
describe "data event" do
|
subject.update(collector.snapshot_metric)
|
||||||
let(:event) { stats_events.first }
|
end
|
||||||
it "has the correct schema" do
|
|
||||||
expect(event).to be_a(LogStash::Event) # Check that we actually have an event...
|
it_behaves_like 'events are added to the queue'
|
||||||
expect(JSON::Validator.fully_validate(schema_file, event.to_json)).to be_empty
|
|
||||||
|
describe "state event" do
|
||||||
|
let(:schema_file) { File.join(schemas_path, "states_document_schema.json") }
|
||||||
|
|
||||||
|
it "should validate against the schema" do
|
||||||
|
wait(60).for { state_events.empty? }.to be_falsey
|
||||||
|
expect(JSON::Validator.fully_validate(schema_file, state_events.first.to_json)).to be_empty
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "#build_event" do
|
||||||
|
let(:schema_file) { File.join(schemas_path, "monitoring_document_schema.json") }
|
||||||
|
|
||||||
|
describe "data event" do
|
||||||
|
it "has the correct schema" do
|
||||||
|
wait(60).for { stats_events.empty? }.to be_falsey
|
||||||
|
expect(JSON::Validator.fully_validate(schema_file, stats_events.first.to_json)).to be_empty
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -173,6 +166,10 @@ describe LogStash::Inputs::Metrics do
|
||||||
allow(subject).to receive(:queue).and_return(queue)
|
allow(subject).to receive(:queue).and_return(queue)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
after :each do
|
||||||
|
subject.stop
|
||||||
|
end
|
||||||
|
|
||||||
describe "#update_pipeline_state" do
|
describe "#update_pipeline_state" do
|
||||||
let(:pipeline) { double("pipeline") }
|
let(:pipeline) { double("pipeline") }
|
||||||
let(:state_event) { double("state event") }
|
let(:state_event) { double("state event") }
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
# or more contributor license agreements. Licensed under the Elastic License;
|
# or more contributor license agreements. Licensed under the Elastic License;
|
||||||
# you may not use this file except in compliance with the Elastic License.
|
# you may not use this file except in compliance with the Elastic License.
|
||||||
|
|
||||||
|
require "stud/task"
|
||||||
|
|
||||||
# Settings' TimeValue is using nanos seconds as the default unit
|
# Settings' TimeValue is using nanos seconds as the default unit
|
||||||
def time_value(time)
|
def time_value(time)
|
||||||
LogStash::Util::TimeValue.from_value(time).to_nanos
|
LogStash::Util::TimeValue.from_value(time).to_nanos
|
||||||
|
@ -21,7 +23,6 @@ def define_settings(settings_options)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
def apply_settings(settings_values, settings = nil)
|
def apply_settings(settings_values, settings = nil)
|
||||||
settings = settings.nil? ? LogStash::SETTINGS.clone : settings
|
settings = settings.nil? ? LogStash::SETTINGS.clone : settings
|
||||||
|
|
||||||
|
@ -31,3 +32,35 @@ def apply_settings(settings_values, settings = nil)
|
||||||
|
|
||||||
settings
|
settings
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def start_agent(agent)
|
||||||
|
agent_task = Stud::Task.new do
|
||||||
|
begin
|
||||||
|
agent.execute
|
||||||
|
rescue => e
|
||||||
|
raise "Start Agent exception: #{e}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
wait(30).for { agent.running? }.to be(true)
|
||||||
|
agent_task
|
||||||
|
end
|
||||||
|
|
||||||
|
module LogStash
|
||||||
|
module Inputs
|
||||||
|
class DummyBlockingInput < LogStash::Inputs::Base
|
||||||
|
config_name "dummyblockinginput"
|
||||||
|
milestone 2
|
||||||
|
|
||||||
|
def register
|
||||||
|
end
|
||||||
|
|
||||||
|
def run(_)
|
||||||
|
sleep(1) while !stop?
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue