support exclusive locking of PQ dir access

fix agent and pipeline and specs for queue exclusive access

added comments and swapped all sleep 0.01 to 0.1

revert explicit pipeline close in specs using sample helper

fix multiple pipelines specs

use BasePipeline for config validation which does not instantiate a new queue

review modifications

improve queue exception message
This commit is contained in:
Colin Surprenant 2017-02-01 14:48:00 -05:00
parent f594e64319
commit e784f0c8aa
12 changed files with 351 additions and 143 deletions

View file

@ -189,7 +189,22 @@ class LogStash::Agent
end
end
# Close the pipeline registered under +id+, if one exists.
# Emits a warn-level log entry before closing so operators can trace
# which pipelines were shut down; an unknown id is a silent no-op.
def close_pipeline(id)
  pipeline = @pipelines[id]
  return unless pipeline
  @logger.warn("closing pipeline", :id => id)
  pipeline.close
end
# Close every registered pipeline by delegating each id to #close_pipeline.
def close_pipelines
  @pipelines.each_key { |pipeline_id| close_pipeline(pipeline_id) }
end
private
def start_webserver
options = {:http_host => @http_host, :http_ports => @http_port, :http_environment => @http_environment }
@webserver = LogStash::WebServer.new(@logger, self, options)
@ -229,17 +244,17 @@ class LogStash::Agent
@collect_metric
end
def increment_reload_failures_metrics(id, config, exception)
def increment_reload_failures_metrics(id, message, backtrace = nil)
@instance_reload_metric.increment(:failures)
@pipeline_reload_metric.namespace([id.to_sym, :reloads]).tap do |n|
n.increment(:failures)
n.gauge(:last_error, { :message => exception.message, :backtrace => exception.backtrace})
n.gauge(:last_error, { :message => message, :backtrace =>backtrace})
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
end
if @logger.debug?
@logger.error("Cannot load an invalid configuration.", :reason => exception.message, :backtrace => exception.backtrace)
@logger.error("Cannot load an invalid configuration", :reason => message, :backtrace => backtrace)
else
@logger.error("Cannot load an invalid configuration.", :reason => exception.message)
@logger.error("Cannot load an invalid configuration", :reason => message)
end
end
@ -261,7 +276,7 @@ class LogStash::Agent
begin
LogStash::Pipeline.new(config, settings, metric)
rescue => e
increment_reload_failures_metrics(settings.get("pipeline.id"), config, e)
increment_reload_failures_metrics(settings.get("pipeline.id"), e.message, e.backtrace)
return nil
end
end
@ -294,15 +309,14 @@ class LogStash::Agent
begin
pipeline_validator = LogStash::BasePipeline.new(new_config, old_pipeline.settings)
rescue => e
increment_reload_failures_metrics(id, new_config, e)
increment_reload_failures_metrics(id, e.message, e.backtrace)
return
end
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort
if !pipeline_validator.reloadable?
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => id, :plugins => pipeline_validator.non_reloadable_plugins.map(&:class))
# TODO: in the original code the failure metrics were not incremented, should we do it here?
# increment_reload_failures_metrics(id, new_config, e)
increment_reload_failures_metrics(id, "non reloadable pipeline")
return
end
@ -331,20 +345,28 @@ class LogStash::Agent
# this is a scenario where the configuration is valid (compilable) but the new pipeline refused to start
# and at this point NO pipeline is running
@logger.error("failed to create the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
increment_reload_failures_metrics(pipeline_id, "failed to create the reloaded pipeline")
return
end
### at this point pipeline#close must be called if upgrade_pipeline does not succeed
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort. this should normally not
# happen since the check should be done in reload_pipeline! prior to get here.
if !new_pipeline.reloadable?
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => pipeline_id, :plugins => new_pipeline.non_reloadable_plugins.map(&:class))
increment_reload_failures_metrics(pipeline_id, "non reloadable pipeline")
new_pipeline.close
return
end
# @pipelines[pipeline_id] must be initialized before #start_pipeline below which uses it
@pipelines[pipeline_id] = new_pipeline
if !start_pipeline(pipeline_id)
@logger.error("failed to start the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
# do not call increment_reload_failures_metrics here since #start_pipeline already does it on failure
new_pipeline.close
return
end
@ -373,6 +395,8 @@ class LogStash::Agent
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
end
@logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace)
# TODO: this is weird, why don't we return directly here? any reason we need to enter the while true loop below?!
end
end
while true do

View file

@ -26,7 +26,7 @@ module LogStash; class BasePipeline
attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id
def initialize(config_str, settings)
def initialize(config_str, settings = SETTINGS)
@logger = self.logger
@config_str = config_str
@config_hash = Digest::SHA1.hexdigest(@config_str)
@ -59,8 +59,7 @@ module LogStash; class BasePipeline
begin
eval(config_code)
rescue => e
# TODO: the original code rescue e but does nothing with it, should we re-raise to have original exception details!?
raise
raise e
end
end
@ -139,7 +138,13 @@ module LogStash; class Pipeline < BasePipeline
super(config_str, settings)
@queue = LogStash::QueueFactory.create(settings)
begin
@queue = LogStash::QueueFactory.create(settings)
rescue => e
@logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace)
raise e
end
@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
@signal_queue = Queue.new
@ -216,8 +221,7 @@ module LogStash; class Pipeline < BasePipeline
shutdown_flusher
shutdown_workers
@filter_queue_client.close
@queue.close
close
@logger.debug("Pipeline #{@pipeline_id} has been shutdown")
@ -225,6 +229,11 @@ module LogStash; class Pipeline < BasePipeline
return 0
end # def run
# Release the pipeline's queue resources: the filter (read) client is
# closed first, then the queue itself.
def close
  [@filter_queue_client, @queue].each(&:close)
end
def transition_to_running
@running.make_true
end
@ -327,7 +336,8 @@ module LogStash; class Pipeline < BasePipeline
# plugin.
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
"exception" => e, "backtrace" => e.backtrace)
raise
raise e
end
# Take an array of events and send them to the correct output

View file

@ -249,7 +249,7 @@ class LogStash::Runner < Clamp::StrictCommand
config_loader = LogStash::Config::Loader.new(logger)
config_str = config_loader.format_config(setting("path.config"), setting("config.string"))
begin
LogStash::Pipeline.new(config_str)
LogStash::BasePipeline.new(config_str)
puts "Configuration OK"
logger.info "Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash"
return 0

View file

@ -287,7 +287,7 @@ describe "conditionals in filter" do
conditional "!([message] <= 'sample')" do
sample("apple") { expect(subject.get("tags")).not_to include("success") }
sample("zebra") { expect(subject.get("tags")).not_to include("failure") }
sample("sample") { expect(subject.get("tags")).not_to include("success") }
sample("sample") { expect(subject.get("tags")).not_to include("success")}
end
conditional "!([message] >= 'sample')" do

View file

@ -16,7 +16,7 @@ describe LogStash::Agent do
let(:config_file) { Stud::Temporary.pathname }
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
subject { LogStash::Agent.new(agent_settings) }
subject { LogStash::Agent.new(agent_settings) }
before :each do
# This MUST run first, before `subject` is invoked to ensure clean state
@ -52,6 +52,10 @@ describe LogStash::Agent do
}
end
after(:each) do
subject.close_pipelines
end
it "should delegate settings to new pipeline" do
expect(LogStash::Pipeline).to receive(:new) do |arg1, arg2|
expect(arg1).to eq(config_string)
@ -75,7 +79,7 @@ describe LogStash::Agent do
end
end
describe "#execute" do
describe "#execute" do
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
before :each do
@ -105,7 +109,10 @@ describe LogStash::Agent do
it "should not reload_state!" do
expect(subject).to_not receive(:reload_state!)
t = Thread.new { subject.execute }
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
@ -118,11 +125,14 @@ describe LogStash::Agent do
it "does not upgrade the new config" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to_not receive(:upgrade_pipeline)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.send(:"reload_pipeline!", "main")
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
@ -134,14 +144,16 @@ describe LogStash::Agent do
it "does upgrade the new config" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to receive(:upgrade_pipeline).once.and_call_original
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.send(:"reload_pipeline!", "main")
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
end
end
@ -154,14 +166,16 @@ describe LogStash::Agent do
it "does not try to reload the pipeline" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
expect(subject).to_not receive(:reload_pipeline!)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.reload_state!
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
end
end
@ -172,14 +186,16 @@ describe LogStash::Agent do
it "tries to reload the pipeline" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
expect(subject).to receive(:reload_pipeline!).once.and_call_original
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.reload_state!
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
end
end
@ -203,9 +219,12 @@ describe LogStash::Agent do
it "should periodically reload_state" do
allow(subject).to receive(:clean_state?).and_return(false)
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
expect(subject).to receive(:reload_state!).at_least(2).times
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
@ -218,10 +237,13 @@ describe LogStash::Agent do
it "does not upgrade the new config" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
expect(subject).to_not receive(:upgrade_pipeline)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
@ -233,10 +255,13 @@ describe LogStash::Agent do
it "does upgrade the new config" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
expect(subject).to receive(:upgrade_pipeline).once.and_call_original
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
sleep 0.1
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
# a bad test design or missing class functionality.
sleep(0.1)
Stud.stop!(t)
t.join
subject.shutdown
@ -259,6 +284,10 @@ describe LogStash::Agent do
subject.register_pipeline(pipeline_settings)
end
after(:each) do
subject.close_pipelines
end
context "when fetching a new state" do
it "upgrades the state" do
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
@ -291,6 +320,7 @@ describe LogStash::Agent do
after :each do
ENV["FOO"] = @foo_content
subject.close_pipelines
end
it "doesn't upgrade the state" do
@ -314,14 +344,20 @@ describe LogStash::Agent do
end
after(:each) do
subject.shutdown
# new pipelines will be created part of the upgrade process so we need
# to close any initialized pipelines
subject.close_pipelines
end
context "when the upgrade fails" do
before :each do
allow(subject).to receive(:fetch_config).and_return(new_pipeline_config)
allow(subject).to receive(:create_pipeline).and_return(nil)
allow(subject).to receive(:stop_pipeline)
allow(subject).to receive(:stop_pipeline) do |id|
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
# not call Pipeline#shutdown but Pipeline#close
subject.close_pipeline(id)
end
end
it "leaves the state untouched" do
@ -339,18 +375,29 @@ describe LogStash::Agent do
context "when the upgrade succeeds" do
let(:new_config) { "input { generator { count => 1 } } output { }" }
before :each do
allow(subject).to receive(:fetch_config).and_return(new_config)
allow(subject).to receive(:stop_pipeline)
allow(subject).to receive(:start_pipeline)
allow(subject).to receive(:start_pipeline).and_return(true)
allow(subject).to receive(:stop_pipeline) do |id|
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
# not call Pipeline#shutdown but Pipeline#close
subject.close_pipeline(id)
end
end
it "updates the state" do
subject.send(:"reload_pipeline!", default_pipeline_id)
expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config)
end
it "starts the pipeline" do
expect(subject).to receive(:stop_pipeline)
expect(subject).to receive(:start_pipeline)
expect(subject).to receive(:start_pipeline).and_return(true)
expect(subject).to receive(:stop_pipeline) do |id|
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
# not call Pipeline#shutdown but Pipeline#close
subject.close_pipeline(id)
end
subject.send(:"reload_pipeline!", default_pipeline_id)
end
end
@ -378,9 +425,9 @@ describe LogStash::Agent do
end
end
context "metrics after config reloading" do
let!(:config) { "input { generator { } } output { dummyoutput { } }" }
let!(:config_path) do
f = Stud::Temporary.file
f.write(config)
@ -388,6 +435,7 @@ describe LogStash::Agent do
f.close
f.path
end
let(:pipeline_args) do
{
"pipeline.workers" => 2,
@ -411,6 +459,13 @@ describe LogStash::Agent do
let!(:dummy_output) { LogStash::Outputs::DroppingDummyOutput.new }
let!(:dummy_output2) { DummyOutput2.new }
let(:initial_generator_threshold) { 1000 }
let(:pipeline_thread) do
Thread.new do
subject.register_pipeline(pipeline_settings)
subject.execute
end
end
before :each do
allow(LogStash::Outputs::DroppingDummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output)
@ -424,20 +479,17 @@ describe LogStash::Agent do
@abort_on_exception = Thread.abort_on_exception
Thread.abort_on_exception = true
@t = Thread.new do
subject.register_pipeline(pipeline_settings)
subject.execute
end
pipeline_thread
# wait for some events to reach the dummy_output
sleep(0.01) until dummy_output.events_received > initial_generator_threshold
sleep(0.1) until dummy_output.events_received > initial_generator_threshold
end
after :each do
begin
subject.shutdown
Stud.stop!(@t)
@t.join
Stud.stop!(pipeline_thread)
pipeline_thread.join
ensure
Thread.abort_on_exception = @abort_on_exception
end
@ -446,8 +498,8 @@ describe LogStash::Agent do
context "when reloading a good config" do
let(:new_config_generator_counter) { 500 }
let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput2 {} }" }
before :each do
before :each do
File.open(config_path, "w") do |f|
f.write(new_config)
f.fsync
@ -496,12 +548,10 @@ describe LogStash::Agent do
value = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads][:last_error].value
expect(value).to be(nil)
end
end
context "when reloading a bad config" do
let(:new_config) { "input { generator { count => " }
let(:new_config_generator_counter) { 500 }
before :each do
File.open(config_path, "w") do |f|
@ -546,7 +596,6 @@ describe LogStash::Agent do
context "when reloading a config that raises exception on pipeline.run" do
let(:new_config) { "input { generator { count => 10000 } }" }
let(:new_config_generator_counter) { 500 }
class BrokenGenerator < LogStash::Inputs::Generator
def register
@ -555,14 +604,12 @@ describe LogStash::Agent do
end
before :each do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(BrokenGenerator)
File.open(config_path, "w") do |f|
f.write(new_config)
f.fsync
end
end
it "does not increase the successful reload count" do
@ -573,7 +620,7 @@ describe LogStash::Agent do
}
end
it "increases the failured reload count" do
it "increases the failures reload count" do
expect { subject.send(:"reload_pipeline!", "main") }.to change {
snapshot = subject.metric.collector.snapshot_metric
reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads]

View file

@ -138,8 +138,8 @@ describe LogStash::Pipeline do
Thread.abort_on_exception = true
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
t = Thread.new { pipeline.run }
sleep(0.1) until pipeline.ready?
wait(3).for do
# give us a bit of time to flush the events
# puts("*****" + output.events.map{|e| e.message}.to_s)
@ -149,6 +149,7 @@ describe LogStash::Pipeline do
expect(output.events[0].get("tags")).to eq(["notdropped"])
expect(output.events[1].get("tags")).to eq(["notdropped"])
pipeline.shutdown
t.join
Thread.abort_on_exception = abort_on_exception_state
end
@ -192,12 +193,14 @@ describe LogStash::Pipeline do
pipeline_settings_obj.set("config.debug", false)
expect(logger).not_to receive(:debug).with(/Compiled pipeline/, anything)
pipeline = TestPipeline.new(test_config_with_filters)
pipeline.close
end
it "should print the compiled code if config.debug is set to true" do
pipeline_settings_obj.set("config.debug", true)
expect(logger).to receive(:debug).with(/Compiled pipeline/, anything)
pipeline = TestPipeline.new(test_config_with_filters, pipeline_settings_obj)
pipeline.close
end
end
@ -363,7 +366,6 @@ describe LogStash::Pipeline do
sample(["foo", "bar"]) do
expect(subject.size).to eq(2)
expect(subject[0].get("message")).to eq("foo\nbar")
expect(subject[0].get("type")).to be_nil
expect(subject[1].get("message")).to eq("foo\nbar")
@ -385,9 +387,14 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(logger).to receive(:warn)
thread = Thread.new { pipeline.run }
# pipeline must first be called outside the thread context because it lazily initializes and will create a
# race condition if called in the thread
p = pipeline
t = Thread.new { p.run }
sleep(0.1) until pipeline.ready?
pipeline.shutdown
thread.join
t.join
end
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
@ -440,6 +447,10 @@ describe LogStash::Pipeline do
let(:settings) { LogStash::SETTINGS.clone }
subject { LogStash::Pipeline.new(config, settings, metric) }
after :each do
subject.close
end
context "when metric.collect is disabled" do
before :each do
settings.set("metric.collect", false)
@ -528,9 +539,21 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore)
end
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
let(:pipeline1) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
let(:pipeline2) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") }
after do
pipeline1.close
pipeline2.close
end
it "should handle evaluating different config" do
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
@ -573,8 +596,8 @@ describe LogStash::Pipeline do
it "flushes the buffered contents of the filter" do
Thread.abort_on_exception = true
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
t = Thread.new { pipeline.run }
sleep(0.1) until pipeline.ready?
wait(3).for do
# give us a bit of time to flush the events
output.events.empty?
@ -582,6 +605,7 @@ describe LogStash::Pipeline do
event = output.events.pop
expect(event.get("message").count("\n")).to eq(99)
pipeline.shutdown
t.join
end
end
@ -596,6 +620,13 @@ describe LogStash::Pipeline do
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
it "should handle evaluating different config" do
# When the functions are compiled from the AST it will generate instance
variables that are unique to the actual config, the instances are pointing
@ -626,8 +657,14 @@ describe LogStash::Pipeline do
subject { described_class.new(config) }
it "returns nil when the pipeline isnt started" do
expect(subject.started_at).to be_nil
context "when the pipeline is not started" do
after :each do
subject.close
end
it "returns nil when the pipeline isnt started" do
expect(subject.started_at).to be_nil
end
end
it "return when the pipeline started working" do
@ -648,6 +685,10 @@ describe LogStash::Pipeline do
subject { described_class.new(config) }
context "when the pipeline is not started" do
after :each do
subject.close
end
it "returns 0" do
expect(subject.uptime).to eq(0)
end
@ -655,10 +696,15 @@ describe LogStash::Pipeline do
context "when the pipeline is started" do
it "return the duration in milliseconds" do
t = Thread.new { subject.run }
# subject must first be called outside the thread context because of lazy initialization
s = subject
t = Thread.new { s.run }
sleep(0.1) until subject.ready?
sleep(0.1)
expect(subject.uptime).to be > 0
subject.shutdown
t.join
end
end
end
@ -704,6 +750,12 @@ describe LogStash::Pipeline do
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
@ -712,7 +764,9 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
Thread.new { subject.run }
pipeline_thread
sleep(0.1) until subject.ready?
# make sure we have received all the generated events
wait(3).for do
# give us a bit of time to flush the events
@ -722,6 +776,7 @@ describe LogStash::Pipeline do
after :each do
subject.shutdown
pipeline_thread.join
end
context "global metric" do
@ -787,6 +842,13 @@ describe LogStash::Pipeline do
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
it "should not add ivars" do
expect(pipeline1.instance_variables).to eq(pipeline2.instance_variables)
end

View file

@ -36,7 +36,9 @@ describe LogStash::QueueFactory do
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
queue = subject.create(settings)
expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue)
queue.close
end
describe "per pipeline id subdirectory creation" do
@ -50,6 +52,7 @@ describe LogStash::QueueFactory do
expect(Dir.exist?(queue_path)).to be_falsey
queue = subject.create(settings)
expect(Dir.exist?(queue_path)).to be_truthy
queue.close
end
end
end
@ -60,7 +63,9 @@ describe LogStash::QueueFactory do
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
queue = subject.create(settings)
expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue)
queue.close
end
end
@ -70,7 +75,9 @@ describe LogStash::QueueFactory do
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
queue = subject.create(settings)
expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
queue.close
end
end
end

View file

@ -87,7 +87,7 @@ public class FileLockFactory {
LOCK_MAP.put(lock, realLockPath.toString());
return lock;
} else {
throw new LockException("Lock held by another program: " + realLockPath);
throw new LockException("Lock held by another program on lock path: " + realLockPath);
}
} finally {
if (lock == null) { // not successful - clear up and move out
@ -106,7 +106,7 @@ public class FileLockFactory {
}
}
} else {
throw new LockException("Lock held by this virtual machine: " + realLockPath);
throw new LockException("Lock held by this virtual machine on lock path: " + realLockPath);
}
}

View file

@ -2,6 +2,8 @@ package org.logstash.ackedqueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.FileLockFactory;
import org.logstash.LockException;
import org.logstash.common.io.CheckpointIO;
import org.logstash.common.io.PageIO;
import org.logstash.common.io.PageIOFactory;
@ -10,6 +12,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.channels.FileLock;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
@ -66,6 +69,10 @@ public class Queue implements Closeable {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
// exclusive dir access
private FileLock dirLock;
private final static String LOCK_NAME = ".lock";
private static final Logger logger = LogManager.getLogger(Queue.class);
public Queue(Settings settings) {
@ -142,82 +149,94 @@ public class Queue implements Closeable {
if (!this.closed.get()) { throw new IOException("queue already opened"); }
Checkpoint headCheckpoint;
lock.lock();
try {
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
} catch (NoSuchFileException e) {
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
// verify exclusive access to the dirPath
this.dirLock = FileLockFactory.getDefault().obtainLock(this.dirPath, LOCK_NAME);
this.seqNum = 0;
headPageNum = 0;
Checkpoint headCheckpoint;
try {
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
} catch (NoSuchFileException e) {
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
newCheckpointedHeadpage(headPageNum);
this.closed.set(false);
this.seqNum = 0;
headPageNum = 0;
return;
}
newCheckpointedHeadpage(headPageNum);
this.closed.set(false);
// at this point we have a head checkpoint to figure queue recovery
// reconstruct all tail pages state upto but excluding the head page
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
add(cp, new TailPage(cp, this, pageIO));
}
// transform the head page into a tail page only if the headpage is non-empty
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
firstUnackedSeqNum = pageIO.getMinSeqNum();
return;
}
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
// at this point we have a head checkpoint to figure queue recovery
// reconstruct all tail pages state upto but excluding the head page
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
add(cp, new TailPage(cp, this, pageIO));
}
// transform the head page into a tail page only if the headpage is non-empty
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
firstUnackedSeqNum = pageIO.getMinSeqNum();
}
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
}
this.headPage = new HeadPage(headCheckpoint, this, pageIO);
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
// head page is empty, let's keep it as-is
this.currentByteSize += pageIO.getCapacity();
// but checkpoint it to update the firstUnackedPageNum if it changed
this.headPage.checkpoint();
} else {
// head page is non-empty, transform it into a tail page and create a new empty head page
add(headCheckpoint, this.headPage.behead());
headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
if (headCheckpoint.maxSeqNum() > this.seqNum) {
this.seqNum = headCheckpoint.maxSeqNum();
}
}
// only activate the first tail page
if (tailPages.size() > 0) {
this.tailPages.get(0).getPageIO().activate();
}
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
this.closed.set(false);
} catch (LockException e) {
throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage());
} finally {
lock.unlock();
}
this.headPage = new HeadPage(headCheckpoint, this, pageIO);
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
// head page is empty, let's keep it as-is
this.currentByteSize += pageIO.getCapacity();
// but checkpoint it to update the firstUnackedPageNum if it changed
this.headPage.checkpoint();
} else {
// head page is non-empty, transform it into a tail page and create a new empty head page
add(headCheckpoint, this.headPage.behead());
headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
if (headCheckpoint.maxSeqNum() > this.seqNum) { this.seqNum = headCheckpoint.maxSeqNum(); }
}
// only activate the first tail page
if (tailPages.size() > 0) {
this.tailPages.get(0).getPageIO().activate();
}
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
this.closed.set(false);
}
// add a read tail page into this queue structures but also verify that this tail page
@ -597,7 +616,14 @@ public class Queue implements Closeable {
// unblock blocked writes. a write is blocked *after* the write has been performed so
// unblocking is safe and will return from the write call
notFull.signalAll();
} finally {
try {
FileLockFactory.getDefault().releaseLock(this.dirLock);
} catch (IOException e) {
// log error and ignore
logger.error("Queue close releaseLock failed, error={}", e.getMessage());
}
lock.unlock();
}
}

View file

@ -25,6 +25,8 @@ public class HeadPageTest {
assertThat(p.isFullyAcked(), is(false));
assertThat(p.hasSpace(10), is(true));
assertThat(p.hasSpace(100), is(false));
q.close();
}
@Test
@ -43,6 +45,8 @@ public class HeadPageTest {
assertThat(p.hasSpace(element.serialize().length), is(false));
assertThat(p.isFullyRead(), is(false));
assertThat(p.isFullyAcked(), is(false));
q.close();
}
@Test
@ -67,6 +71,8 @@ public class HeadPageTest {
assertThat(p.hasSpace(element.serialize().length), is(false));
assertThat(p.isFullyRead(), is(true));
assertThat(p.isFullyAcked(), is(false));
q.close();
}
@Test
@ -91,6 +97,8 @@ public class HeadPageTest {
assertThat(p.hasSpace(element.serialize().length), is(false));
assertThat(p.isFullyRead(), is(true));
assertThat(p.isFullyAcked(), is(false));
q.close();
}
// disabled test until we figure what to do in this condition

View file

@ -42,6 +42,8 @@ public class QueueTest {
q.open();
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
q.close();
}
@Test
@ -57,6 +59,8 @@ public class QueueTest {
assertThat(b.getElements().size(), is(equalTo(1)));
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
q.close();
}
@Test
@ -72,6 +76,8 @@ public class QueueTest {
assertThat(b.getElements().size(), is(equalTo(1)));
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
assertThat(q.nonBlockReadBatch(2), is(equalTo(null)));
q.close();
}
@Test
@ -95,6 +101,8 @@ public class QueueTest {
assertThat(b.getElements().size(), is(equalTo(1)));
assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString())));
q.close();
}
@Test
@ -137,6 +145,8 @@ public class QueueTest {
b = q.nonBlockReadBatch(10);
assertThat(b, is(equalTo(null)));
q.close();
}
@ -178,6 +188,8 @@ public class QueueTest {
b.close();
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true)));
q.close();
}
@Test
@ -263,6 +275,8 @@ public class QueueTest {
assertThat(c.getMinSeqNum(), is(equalTo(3L)));
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L)));
assertThat(c.getFirstUnackedPageNum(), is(equalTo(1)));
q.close();
}
@Test
@ -304,6 +318,8 @@ public class QueueTest {
}
assertThat(q.getTailPages().size(), is(equalTo(0)));
q.close();
}
}
@ -353,6 +369,8 @@ public class QueueTest {
// since we did not ack and pages hold a single item
assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT)));
q.close();
}
@Test
@ -406,6 +424,8 @@ public class QueueTest {
assertThat(q.getHeadPage().getElementCount() > 0L, is(true));
assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L)));
assertThat(q.unreadCount, is(equalTo(1L)));
q.close();
}
@Test(timeout = 5000)
@ -440,6 +460,7 @@ public class QueueTest {
assertThat(q.isFull(), is(true));
executor.shutdown();
q.close();
}
@Test(timeout = 5000)
@ -483,6 +504,7 @@ public class QueueTest {
assertThat(q.isFull(), is(false));
executor.shutdown();
q.close();
}
@Test(timeout = 5000)
@ -523,6 +545,7 @@ public class QueueTest {
assertThat(q.isFull(), is(true)); // queue should still be full
executor.shutdown();
q.close();
}
@Test

View file

@ -151,6 +151,7 @@ public class Concurent {
// gotta hate exception handling in lambdas
consumers.forEach(c -> {try{c.join();} catch(InterruptedException e) {throw new RuntimeException(e);}});
q.close();
Instant end = Instant.now();