mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
support exclusive locking of PQ dir access
fix agent and pipeline and specs for queue exclusive access added comments and swapped all sleep 0.01 to 0.1 revert explicit pipeline close in specs using sample helper fix multiple pipelines specs use BasePipeline for config validation which does not instantiate a new queue review modifications improve queue exception message
This commit is contained in:
parent
bea468b375
commit
c6710cdbae
12 changed files with 351 additions and 143 deletions
|
@ -189,7 +189,22 @@ class LogStash::Agent
|
|||
end
|
||||
end
|
||||
|
||||
def close_pipeline(id)
|
||||
pipeline = @pipelines[id]
|
||||
if pipeline
|
||||
@logger.warn("closing pipeline", :id => id)
|
||||
pipeline.close
|
||||
end
|
||||
end
|
||||
|
||||
def close_pipelines
|
||||
@pipelines.each do |id, _|
|
||||
close_pipeline(id)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def start_webserver
|
||||
options = {:http_host => @http_host, :http_ports => @http_port, :http_environment => @http_environment }
|
||||
@webserver = LogStash::WebServer.new(@logger, self, options)
|
||||
|
@ -229,17 +244,17 @@ class LogStash::Agent
|
|||
@collect_metric
|
||||
end
|
||||
|
||||
def increment_reload_failures_metrics(id, config, exception)
|
||||
def increment_reload_failures_metrics(id, message, backtrace = nil)
|
||||
@instance_reload_metric.increment(:failures)
|
||||
@pipeline_reload_metric.namespace([id.to_sym, :reloads]).tap do |n|
|
||||
n.increment(:failures)
|
||||
n.gauge(:last_error, { :message => exception.message, :backtrace => exception.backtrace})
|
||||
n.gauge(:last_error, { :message => message, :backtrace =>backtrace})
|
||||
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
if @logger.debug?
|
||||
@logger.error("Cannot load an invalid configuration.", :reason => exception.message, :backtrace => exception.backtrace)
|
||||
@logger.error("Cannot load an invalid configuration", :reason => message, :backtrace => backtrace)
|
||||
else
|
||||
@logger.error("Cannot load an invalid configuration.", :reason => exception.message)
|
||||
@logger.error("Cannot load an invalid configuration", :reason => message)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -261,7 +276,7 @@ class LogStash::Agent
|
|||
begin
|
||||
LogStash::Pipeline.new(config, settings, metric)
|
||||
rescue => e
|
||||
increment_reload_failures_metrics(settings.get("pipeline.id"), config, e)
|
||||
increment_reload_failures_metrics(settings.get("pipeline.id"), e.message, e.backtrace)
|
||||
return nil
|
||||
end
|
||||
end
|
||||
|
@ -294,15 +309,14 @@ class LogStash::Agent
|
|||
begin
|
||||
pipeline_validator = LogStash::BasePipeline.new(new_config, old_pipeline.settings)
|
||||
rescue => e
|
||||
increment_reload_failures_metrics(id, new_config, e)
|
||||
increment_reload_failures_metrics(id, e.message, e.backtrace)
|
||||
return
|
||||
end
|
||||
|
||||
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort
|
||||
if !pipeline_validator.reloadable?
|
||||
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => id, :plugins => pipeline_validator.non_reloadable_plugins.map(&:class))
|
||||
# TODO: in the original code the failure metrics were not incremented, should we do it here?
|
||||
# increment_reload_failures_metrics(id, new_config, e)
|
||||
increment_reload_failures_metrics(id, "non reloadable pipeline")
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -331,20 +345,28 @@ class LogStash::Agent
|
|||
# this is a scenario where the configuration is valid (compilable) but the new pipeline refused to start
|
||||
# and at this point NO pipeline is running
|
||||
@logger.error("failed to create the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
|
||||
increment_reload_failures_metrics(pipeline_id, "failed to create the reloaded pipeline")
|
||||
return
|
||||
end
|
||||
|
||||
### at this point pipeline#close must be called if upgrade_pipeline does not succeed
|
||||
|
||||
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort. this should normally not
|
||||
# happen since the check should be done in reload_pipeline! prior to get here.
|
||||
if !new_pipeline.reloadable?
|
||||
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => pipeline_id, :plugins => new_pipeline.non_reloadable_plugins.map(&:class))
|
||||
increment_reload_failures_metrics(pipeline_id, "non reloadable pipeline")
|
||||
new_pipeline.close
|
||||
return
|
||||
end
|
||||
|
||||
# @pipelines[pipeline_id] must be initialized before #start_pipeline below which uses it
|
||||
@pipelines[pipeline_id] = new_pipeline
|
||||
|
||||
if !start_pipeline(pipeline_id)
|
||||
@logger.error("failed to start the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
|
||||
# do not call increment_reload_failures_metrics here since #start_pipeline already does it on failure
|
||||
new_pipeline.close
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -373,6 +395,8 @@ class LogStash::Agent
|
|||
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
@logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace)
|
||||
|
||||
# TODO: this is weird, why dont we return directly here? any reason we need to enter the while true loop below?!
|
||||
end
|
||||
end
|
||||
while true do
|
||||
|
|
|
@ -26,7 +26,7 @@ module LogStash; class BasePipeline
|
|||
|
||||
attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id
|
||||
|
||||
def initialize(config_str, settings)
|
||||
def initialize(config_str, settings = SETTINGS)
|
||||
@logger = self.logger
|
||||
@config_str = config_str
|
||||
@config_hash = Digest::SHA1.hexdigest(@config_str)
|
||||
|
@ -59,8 +59,7 @@ module LogStash; class BasePipeline
|
|||
begin
|
||||
eval(config_code)
|
||||
rescue => e
|
||||
# TODO: the original code rescue e but does nothing with it, should we re-raise to have original exception details!?
|
||||
raise
|
||||
raise e
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -139,7 +138,13 @@ module LogStash; class Pipeline < BasePipeline
|
|||
|
||||
super(config_str, settings)
|
||||
|
||||
@queue = LogStash::QueueFactory.create(settings)
|
||||
begin
|
||||
@queue = LogStash::QueueFactory.create(settings)
|
||||
rescue => e
|
||||
@logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace)
|
||||
raise e
|
||||
end
|
||||
|
||||
@input_queue_client = @queue.write_client
|
||||
@filter_queue_client = @queue.read_client
|
||||
@signal_queue = Queue.new
|
||||
|
@ -216,8 +221,7 @@ module LogStash; class Pipeline < BasePipeline
|
|||
shutdown_flusher
|
||||
shutdown_workers
|
||||
|
||||
@filter_queue_client.close
|
||||
@queue.close
|
||||
close
|
||||
|
||||
@logger.debug("Pipeline #{@pipeline_id} has been shutdown")
|
||||
|
||||
|
@ -225,6 +229,11 @@ module LogStash; class Pipeline < BasePipeline
|
|||
return 0
|
||||
end # def run
|
||||
|
||||
def close
|
||||
@filter_queue_client.close
|
||||
@queue.close
|
||||
end
|
||||
|
||||
def transition_to_running
|
||||
@running.make_true
|
||||
end
|
||||
|
@ -327,7 +336,8 @@ module LogStash; class Pipeline < BasePipeline
|
|||
# plugin.
|
||||
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
|
||||
"exception" => e, "backtrace" => e.backtrace)
|
||||
raise
|
||||
|
||||
raise e
|
||||
end
|
||||
|
||||
# Take an array of events and send them to the correct output
|
||||
|
|
|
@ -249,7 +249,7 @@ class LogStash::Runner < Clamp::StrictCommand
|
|||
config_loader = LogStash::Config::Loader.new(logger)
|
||||
config_str = config_loader.format_config(setting("path.config"), setting("config.string"))
|
||||
begin
|
||||
LogStash::Pipeline.new(config_str)
|
||||
LogStash::BasePipeline.new(config_str)
|
||||
puts "Configuration OK"
|
||||
logger.info "Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash"
|
||||
return 0
|
||||
|
|
|
@ -287,7 +287,7 @@ describe "conditionals in filter" do
|
|||
conditional "!([message] <= 'sample')" do
|
||||
sample("apple") { expect(subject.get("tags")).not_to include("success") }
|
||||
sample("zebra") { expect(subject.get("tags")).not_to include("failure") }
|
||||
sample("sample") { expect(subject.get("tags")).not_to include("success") }
|
||||
sample("sample") { expect(subject.get("tags")).not_to include("success")}
|
||||
end
|
||||
|
||||
conditional "!([message] >= 'sample')" do
|
||||
|
|
|
@ -16,7 +16,7 @@ describe LogStash::Agent do
|
|||
let(:config_file) { Stud::Temporary.pathname }
|
||||
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
|
||||
|
||||
subject { LogStash::Agent.new(agent_settings) }
|
||||
subject { LogStash::Agent.new(agent_settings) }
|
||||
|
||||
before :each do
|
||||
# This MUST run first, before `subject` is invoked to ensure clean state
|
||||
|
@ -52,6 +52,10 @@ describe LogStash::Agent do
|
|||
}
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
subject.close_pipelines
|
||||
end
|
||||
|
||||
it "should delegate settings to new pipeline" do
|
||||
expect(LogStash::Pipeline).to receive(:new) do |arg1, arg2|
|
||||
expect(arg1).to eq(config_string)
|
||||
|
@ -75,7 +79,7 @@ describe LogStash::Agent do
|
|||
end
|
||||
end
|
||||
|
||||
describe "#execute" do
|
||||
describe "#execute" do
|
||||
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
|
||||
|
||||
before :each do
|
||||
|
@ -105,7 +109,10 @@ describe LogStash::Agent do
|
|||
it "should not reload_state!" do
|
||||
expect(subject).to_not receive(:reload_state!)
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
subject.shutdown
|
||||
|
@ -118,11 +125,14 @@ describe LogStash::Agent do
|
|||
|
||||
it "does not upgrade the new config" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to_not receive(:upgrade_pipeline)
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
subject.shutdown
|
||||
|
@ -134,14 +144,16 @@ describe LogStash::Agent do
|
|||
|
||||
it "does upgrade the new config" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to receive(:upgrade_pipeline).once.and_call_original
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
||||
subject.shutdown
|
||||
end
|
||||
end
|
||||
|
@ -154,14 +166,16 @@ describe LogStash::Agent do
|
|||
|
||||
it "does not try to reload the pipeline" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
expect(subject).to_not receive(:reload_pipeline!)
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.reload_state!
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
||||
subject.shutdown
|
||||
end
|
||||
end
|
||||
|
@ -172,14 +186,16 @@ describe LogStash::Agent do
|
|||
|
||||
it "tries to reload the pipeline" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
expect(subject).to receive(:reload_pipeline!).once.and_call_original
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.reload_state!
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
||||
subject.shutdown
|
||||
end
|
||||
end
|
||||
|
@ -203,9 +219,12 @@ describe LogStash::Agent do
|
|||
it "should periodically reload_state" do
|
||||
allow(subject).to receive(:clean_state?).and_return(false)
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
expect(subject).to receive(:reload_state!).at_least(2).times
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
subject.shutdown
|
||||
|
@ -218,10 +237,13 @@ describe LogStash::Agent do
|
|||
|
||||
it "does not upgrade the new config" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
expect(subject).to_not receive(:upgrade_pipeline)
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
subject.shutdown
|
||||
|
@ -233,10 +255,13 @@ describe LogStash::Agent do
|
|||
|
||||
it "does upgrade the new config" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running?
|
||||
expect(subject).to receive(:upgrade_pipeline).once.and_call_original
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
sleep 0.1
|
||||
|
||||
# TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of
|
||||
# a bad test design or missing class functionality.
|
||||
sleep(0.1)
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
subject.shutdown
|
||||
|
@ -259,6 +284,10 @@ describe LogStash::Agent do
|
|||
subject.register_pipeline(pipeline_settings)
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
subject.close_pipelines
|
||||
end
|
||||
|
||||
context "when fetching a new state" do
|
||||
it "upgrades the state" do
|
||||
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
|
||||
|
@ -291,6 +320,7 @@ describe LogStash::Agent do
|
|||
|
||||
after :each do
|
||||
ENV["FOO"] = @foo_content
|
||||
subject.close_pipelines
|
||||
end
|
||||
|
||||
it "doesn't upgrade the state" do
|
||||
|
@ -314,14 +344,20 @@ describe LogStash::Agent do
|
|||
end
|
||||
|
||||
after(:each) do
|
||||
subject.shutdown
|
||||
# new pipelines will be created part of the upgrade process so we need
|
||||
# to close any initialized pipelines
|
||||
subject.close_pipelines
|
||||
end
|
||||
|
||||
context "when the upgrade fails" do
|
||||
before :each do
|
||||
allow(subject).to receive(:fetch_config).and_return(new_pipeline_config)
|
||||
allow(subject).to receive(:create_pipeline).and_return(nil)
|
||||
allow(subject).to receive(:stop_pipeline)
|
||||
allow(subject).to receive(:stop_pipeline) do |id|
|
||||
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
|
||||
# not call Pipeline#shutdown but Pipeline#close
|
||||
subject.close_pipeline(id)
|
||||
end
|
||||
end
|
||||
|
||||
it "leaves the state untouched" do
|
||||
|
@ -339,18 +375,29 @@ describe LogStash::Agent do
|
|||
|
||||
context "when the upgrade succeeds" do
|
||||
let(:new_config) { "input { generator { count => 1 } } output { }" }
|
||||
|
||||
before :each do
|
||||
allow(subject).to receive(:fetch_config).and_return(new_config)
|
||||
allow(subject).to receive(:stop_pipeline)
|
||||
allow(subject).to receive(:start_pipeline)
|
||||
allow(subject).to receive(:start_pipeline).and_return(true)
|
||||
allow(subject).to receive(:stop_pipeline) do |id|
|
||||
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
|
||||
# not call Pipeline#shutdown but Pipeline#close
|
||||
subject.close_pipeline(id)
|
||||
end
|
||||
end
|
||||
|
||||
it "updates the state" do
|
||||
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||
expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config)
|
||||
end
|
||||
|
||||
it "starts the pipeline" do
|
||||
expect(subject).to receive(:stop_pipeline)
|
||||
expect(subject).to receive(:start_pipeline)
|
||||
expect(subject).to receive(:start_pipeline).and_return(true)
|
||||
expect(subject).to receive(:stop_pipeline) do |id|
|
||||
# we register_pipeline but we never execute them so we have to mock #stop_pipeline to
|
||||
# not call Pipeline#shutdown but Pipeline#close
|
||||
subject.close_pipeline(id)
|
||||
end
|
||||
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||
end
|
||||
end
|
||||
|
@ -378,9 +425,9 @@ describe LogStash::Agent do
|
|||
end
|
||||
end
|
||||
|
||||
|
||||
context "metrics after config reloading" do
|
||||
let!(:config) { "input { generator { } } output { dummyoutput { } }" }
|
||||
|
||||
let!(:config_path) do
|
||||
f = Stud::Temporary.file
|
||||
f.write(config)
|
||||
|
@ -388,6 +435,7 @@ describe LogStash::Agent do
|
|||
f.close
|
||||
f.path
|
||||
end
|
||||
|
||||
let(:pipeline_args) do
|
||||
{
|
||||
"pipeline.workers" => 2,
|
||||
|
@ -411,6 +459,13 @@ describe LogStash::Agent do
|
|||
let!(:dummy_output) { LogStash::Outputs::DroppingDummyOutput.new }
|
||||
let!(:dummy_output2) { DummyOutput2.new }
|
||||
let(:initial_generator_threshold) { 1000 }
|
||||
let(:pipeline_thread) do
|
||||
Thread.new do
|
||||
subject.register_pipeline(pipeline_settings)
|
||||
subject.execute
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
before :each do
|
||||
allow(LogStash::Outputs::DroppingDummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output)
|
||||
|
@ -424,20 +479,17 @@ describe LogStash::Agent do
|
|||
@abort_on_exception = Thread.abort_on_exception
|
||||
Thread.abort_on_exception = true
|
||||
|
||||
@t = Thread.new do
|
||||
subject.register_pipeline(pipeline_settings)
|
||||
subject.execute
|
||||
end
|
||||
pipeline_thread
|
||||
|
||||
# wait for some events to reach the dummy_output
|
||||
sleep(0.01) until dummy_output.events_received > initial_generator_threshold
|
||||
sleep(0.1) until dummy_output.events_received > initial_generator_threshold
|
||||
end
|
||||
|
||||
after :each do
|
||||
begin
|
||||
subject.shutdown
|
||||
Stud.stop!(@t)
|
||||
@t.join
|
||||
Stud.stop!(pipeline_thread)
|
||||
pipeline_thread.join
|
||||
ensure
|
||||
Thread.abort_on_exception = @abort_on_exception
|
||||
end
|
||||
|
@ -446,8 +498,8 @@ describe LogStash::Agent do
|
|||
context "when reloading a good config" do
|
||||
let(:new_config_generator_counter) { 500 }
|
||||
let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput2 {} }" }
|
||||
before :each do
|
||||
|
||||
before :each do
|
||||
File.open(config_path, "w") do |f|
|
||||
f.write(new_config)
|
||||
f.fsync
|
||||
|
@ -496,12 +548,10 @@ describe LogStash::Agent do
|
|||
value = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads][:last_error].value
|
||||
expect(value).to be(nil)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
context "when reloading a bad config" do
|
||||
let(:new_config) { "input { generator { count => " }
|
||||
let(:new_config_generator_counter) { 500 }
|
||||
before :each do
|
||||
|
||||
File.open(config_path, "w") do |f|
|
||||
|
@ -546,7 +596,6 @@ describe LogStash::Agent do
|
|||
|
||||
context "when reloading a config that raises exception on pipeline.run" do
|
||||
let(:new_config) { "input { generator { count => 10000 } }" }
|
||||
let(:new_config_generator_counter) { 500 }
|
||||
|
||||
class BrokenGenerator < LogStash::Inputs::Generator
|
||||
def register
|
||||
|
@ -555,14 +604,12 @@ describe LogStash::Agent do
|
|||
end
|
||||
|
||||
before :each do
|
||||
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(BrokenGenerator)
|
||||
|
||||
File.open(config_path, "w") do |f|
|
||||
f.write(new_config)
|
||||
f.fsync
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
it "does not increase the successful reload count" do
|
||||
|
@ -573,7 +620,7 @@ describe LogStash::Agent do
|
|||
}
|
||||
end
|
||||
|
||||
it "increases the failured reload count" do
|
||||
it "increases the failures reload count" do
|
||||
expect { subject.send(:"reload_pipeline!", "main") }.to change {
|
||||
snapshot = subject.metric.collector.snapshot_metric
|
||||
reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads]
|
||||
|
|
|
@ -138,8 +138,8 @@ describe LogStash::Pipeline do
|
|||
Thread.abort_on_exception = true
|
||||
|
||||
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
t = Thread.new { pipeline.run }
|
||||
sleep(0.1) until pipeline.ready?
|
||||
wait(3).for do
|
||||
# give us a bit of time to flush the events
|
||||
# puts("*****" + output.events.map{|e| e.message}.to_s)
|
||||
|
@ -149,6 +149,7 @@ describe LogStash::Pipeline do
|
|||
expect(output.events[0].get("tags")).to eq(["notdropped"])
|
||||
expect(output.events[1].get("tags")).to eq(["notdropped"])
|
||||
pipeline.shutdown
|
||||
t.join
|
||||
|
||||
Thread.abort_on_exception = abort_on_exception_state
|
||||
end
|
||||
|
@ -192,12 +193,14 @@ describe LogStash::Pipeline do
|
|||
pipeline_settings_obj.set("config.debug", false)
|
||||
expect(logger).not_to receive(:debug).with(/Compiled pipeline/, anything)
|
||||
pipeline = TestPipeline.new(test_config_with_filters)
|
||||
pipeline.close
|
||||
end
|
||||
|
||||
it "should print the compiled code if config.debug is set to true" do
|
||||
pipeline_settings_obj.set("config.debug", true)
|
||||
expect(logger).to receive(:debug).with(/Compiled pipeline/, anything)
|
||||
pipeline = TestPipeline.new(test_config_with_filters, pipeline_settings_obj)
|
||||
pipeline.close
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -363,7 +366,6 @@ describe LogStash::Pipeline do
|
|||
|
||||
sample(["foo", "bar"]) do
|
||||
expect(subject.size).to eq(2)
|
||||
|
||||
expect(subject[0].get("message")).to eq("foo\nbar")
|
||||
expect(subject[0].get("type")).to be_nil
|
||||
expect(subject[1].get("message")).to eq("foo\nbar")
|
||||
|
@ -385,9 +387,14 @@ describe LogStash::Pipeline do
|
|||
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
|
||||
allow(logger).to receive(:warn)
|
||||
thread = Thread.new { pipeline.run }
|
||||
|
||||
# pipeline must be first called outside the thread context because it lazyly initialize and will create a
|
||||
# race condition if called in the thread
|
||||
p = pipeline
|
||||
t = Thread.new { p.run }
|
||||
sleep(0.1) until pipeline.ready?
|
||||
pipeline.shutdown
|
||||
thread.join
|
||||
t.join
|
||||
end
|
||||
|
||||
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
|
||||
|
@ -440,6 +447,10 @@ describe LogStash::Pipeline do
|
|||
let(:settings) { LogStash::SETTINGS.clone }
|
||||
subject { LogStash::Pipeline.new(config, settings, metric) }
|
||||
|
||||
after :each do
|
||||
subject.close
|
||||
end
|
||||
|
||||
context "when metric.collect is disabled" do
|
||||
before :each do
|
||||
settings.set("metric.collect", false)
|
||||
|
@ -528,9 +539,21 @@ describe LogStash::Pipeline do
|
|||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore)
|
||||
end
|
||||
|
||||
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
|
||||
before :each do
|
||||
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
|
||||
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
|
||||
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
|
||||
end
|
||||
|
||||
let(:pipeline1) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
|
||||
let(:pipeline2) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") }
|
||||
|
||||
after do
|
||||
pipeline1.close
|
||||
pipeline2.close
|
||||
end
|
||||
|
||||
it "should handle evaluating different config" do
|
||||
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
|
||||
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
|
||||
|
@ -573,8 +596,8 @@ describe LogStash::Pipeline do
|
|||
it "flushes the buffered contents of the filter" do
|
||||
Thread.abort_on_exception = true
|
||||
pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj)
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
t = Thread.new { pipeline.run }
|
||||
sleep(0.1) until pipeline.ready?
|
||||
wait(3).for do
|
||||
# give us a bit of time to flush the events
|
||||
output.events.empty?
|
||||
|
@ -582,6 +605,7 @@ describe LogStash::Pipeline do
|
|||
event = output.events.pop
|
||||
expect(event.get("message").count("\n")).to eq(99)
|
||||
pipeline.shutdown
|
||||
t.join
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -596,6 +620,13 @@ describe LogStash::Pipeline do
|
|||
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
|
||||
let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
|
||||
|
||||
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
|
||||
before :each do
|
||||
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
|
||||
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
|
||||
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
|
||||
end
|
||||
|
||||
it "should handle evaluating different config" do
|
||||
# When the functions are compiled from the AST it will generate instance
|
||||
# variables that are unique to the actual config, the intances are pointing
|
||||
|
@ -626,8 +657,14 @@ describe LogStash::Pipeline do
|
|||
|
||||
subject { described_class.new(config) }
|
||||
|
||||
it "returns nil when the pipeline isnt started" do
|
||||
expect(subject.started_at).to be_nil
|
||||
context "when the pipeline is not started" do
|
||||
after :each do
|
||||
subject.close
|
||||
end
|
||||
|
||||
it "returns nil when the pipeline isnt started" do
|
||||
expect(subject.started_at).to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
it "return when the pipeline started working" do
|
||||
|
@ -648,6 +685,10 @@ describe LogStash::Pipeline do
|
|||
subject { described_class.new(config) }
|
||||
|
||||
context "when the pipeline is not started" do
|
||||
after :each do
|
||||
subject.close
|
||||
end
|
||||
|
||||
it "returns 0" do
|
||||
expect(subject.uptime).to eq(0)
|
||||
end
|
||||
|
@ -655,10 +696,15 @@ describe LogStash::Pipeline do
|
|||
|
||||
context "when the pipeline is started" do
|
||||
it "return the duration in milliseconds" do
|
||||
t = Thread.new { subject.run }
|
||||
# subject must be first call outside the thread context because of lazy initialization
|
||||
s = subject
|
||||
t = Thread.new { s.run }
|
||||
sleep(0.1) until subject.ready?
|
||||
|
||||
sleep(0.1)
|
||||
expect(subject.uptime).to be > 0
|
||||
subject.shutdown
|
||||
t.join
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -704,6 +750,12 @@ describe LogStash::Pipeline do
|
|||
end
|
||||
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
|
||||
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
|
||||
let(:pipeline_thread) do
|
||||
# subject has to be called for the first time outside the thread because it will create a race condition
|
||||
# with the subject.ready? call since subject is lazily initialized
|
||||
s = subject
|
||||
Thread.new { s.run }
|
||||
end
|
||||
|
||||
before :each do
|
||||
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
|
||||
|
@ -712,7 +764,9 @@ describe LogStash::Pipeline do
|
|||
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
|
||||
|
||||
Thread.new { subject.run }
|
||||
pipeline_thread
|
||||
sleep(0.1) until subject.ready?
|
||||
|
||||
# make sure we have received all the generated events
|
||||
wait(3).for do
|
||||
# give us a bit of time to flush the events
|
||||
|
@ -722,6 +776,7 @@ describe LogStash::Pipeline do
|
|||
|
||||
after :each do
|
||||
subject.shutdown
|
||||
pipeline_thread.join
|
||||
end
|
||||
|
||||
context "global metric" do
|
||||
|
@ -787,6 +842,13 @@ describe LogStash::Pipeline do
|
|||
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
|
||||
let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
|
||||
|
||||
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
|
||||
before :each do
|
||||
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
|
||||
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
|
||||
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
|
||||
end
|
||||
|
||||
it "should not add ivars" do
|
||||
expect(pipeline1.instance_variables).to eq(pipeline2.instance_variables)
|
||||
end
|
||||
|
|
|
@ -36,7 +36,9 @@ describe LogStash::QueueFactory do
|
|||
end
|
||||
|
||||
it "returns a `WrappedAckedQueue`" do
|
||||
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
|
||||
queue = subject.create(settings)
|
||||
expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue)
|
||||
queue.close
|
||||
end
|
||||
|
||||
describe "per pipeline id subdirectory creation" do
|
||||
|
@ -50,6 +52,7 @@ describe LogStash::QueueFactory do
|
|||
expect(Dir.exist?(queue_path)).to be_falsey
|
||||
queue = subject.create(settings)
|
||||
expect(Dir.exist?(queue_path)).to be_truthy
|
||||
queue.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -60,7 +63,9 @@ describe LogStash::QueueFactory do
|
|||
end
|
||||
|
||||
it "returns a `WrappedAckedQueue`" do
|
||||
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
|
||||
queue = subject.create(settings)
|
||||
expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue)
|
||||
queue.close
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -70,7 +75,9 @@ describe LogStash::QueueFactory do
|
|||
end
|
||||
|
||||
it "returns a `WrappedAckedQueue`" do
|
||||
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
|
||||
queue = subject.create(settings)
|
||||
expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
|
||||
queue.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -87,7 +87,7 @@ public class FileLockFactory {
|
|||
LOCK_MAP.put(lock, realLockPath.toString());
|
||||
return lock;
|
||||
} else {
|
||||
throw new LockException("Lock held by another program: " + realLockPath);
|
||||
throw new LockException("Lock held by another program on lock path: " + realLockPath);
|
||||
}
|
||||
} finally {
|
||||
if (lock == null) { // not successful - clear up and move out
|
||||
|
@ -106,7 +106,7 @@ public class FileLockFactory {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
throw new LockException("Lock held by this virtual machine: " + realLockPath);
|
||||
throw new LockException("Lock held by this virtual machine on lock path: " + realLockPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ package org.logstash.ackedqueue;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.logstash.FileLockFactory;
|
||||
import org.logstash.LockException;
|
||||
import org.logstash.common.io.CheckpointIO;
|
||||
import org.logstash.common.io.PageIO;
|
||||
import org.logstash.common.io.PageIOFactory;
|
||||
|
@ -10,6 +12,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -66,6 +69,10 @@ public class Queue implements Closeable {
|
|||
private final Condition notFull = lock.newCondition();
|
||||
private final Condition notEmpty = lock.newCondition();
|
||||
|
||||
// exclusive dir access
|
||||
private FileLock dirLock;
|
||||
private final static String LOCK_NAME = ".lock";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(Queue.class);
|
||||
|
||||
public Queue(Settings settings) {
|
||||
|
@ -142,82 +149,94 @@ public class Queue implements Closeable {
|
|||
|
||||
if (!this.closed.get()) { throw new IOException("queue already opened"); }
|
||||
|
||||
Checkpoint headCheckpoint;
|
||||
lock.lock();
|
||||
try {
|
||||
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
|
||||
} catch (NoSuchFileException e) {
|
||||
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
|
||||
// verify exclusive access to the dirPath
|
||||
this.dirLock = FileLockFactory.getDefault().obtainLock(this.dirPath, LOCK_NAME);
|
||||
|
||||
this.seqNum = 0;
|
||||
headPageNum = 0;
|
||||
Checkpoint headCheckpoint;
|
||||
try {
|
||||
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
|
||||
} catch (NoSuchFileException e) {
|
||||
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
|
||||
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
this.closed.set(false);
|
||||
this.seqNum = 0;
|
||||
headPageNum = 0;
|
||||
|
||||
return;
|
||||
}
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
this.closed.set(false);
|
||||
|
||||
// at this point we have a head checkpoint to figure queue recovery
|
||||
|
||||
// reconstruct all tail pages state upto but excluding the head page
|
||||
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
|
||||
|
||||
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
|
||||
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
|
||||
|
||||
add(cp, new TailPage(cp, this, pageIO));
|
||||
}
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
|
||||
|
||||
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
|
||||
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
|
||||
|
||||
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
|
||||
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
|
||||
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
|
||||
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
|
||||
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
|
||||
firstUnackedSeqNum = pageIO.getMinSeqNum();
|
||||
return;
|
||||
}
|
||||
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
|
||||
// at this point we have a head checkpoint to figure queue recovery
|
||||
|
||||
// reconstruct all tail pages state upto but excluding the head page
|
||||
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
|
||||
|
||||
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
|
||||
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
|
||||
|
||||
add(cp, new TailPage(cp, this, pageIO));
|
||||
}
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
|
||||
|
||||
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
|
||||
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
|
||||
|
||||
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
|
||||
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
|
||||
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
|
||||
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
|
||||
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
|
||||
firstUnackedSeqNum = pageIO.getMinSeqNum();
|
||||
}
|
||||
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
}
|
||||
this.headPage = new HeadPage(headCheckpoint, this, pageIO);
|
||||
|
||||
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
|
||||
// head page is empty, let's keep it as-is
|
||||
|
||||
this.currentByteSize += pageIO.getCapacity();
|
||||
|
||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||
this.headPage.checkpoint();
|
||||
} else {
|
||||
// head page is non-empty, transform it into a tail page and create a new empty head page
|
||||
add(headCheckpoint, this.headPage.behead());
|
||||
|
||||
headPageNum = headCheckpoint.getPageNum() + 1;
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
|
||||
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
|
||||
if (headCheckpoint.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = headCheckpoint.maxSeqNum();
|
||||
}
|
||||
}
|
||||
|
||||
// only activate the first tail page
|
||||
if (tailPages.size() > 0) {
|
||||
this.tailPages.get(0).getPageIO().activate();
|
||||
}
|
||||
|
||||
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
|
||||
|
||||
this.closed.set(false);
|
||||
} catch (LockException e) {
|
||||
throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage());
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
this.headPage = new HeadPage(headCheckpoint, this, pageIO);
|
||||
|
||||
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
|
||||
// head page is empty, let's keep it as-is
|
||||
|
||||
this.currentByteSize += pageIO.getCapacity();
|
||||
|
||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||
this.headPage.checkpoint();
|
||||
} else {
|
||||
// head page is non-empty, transform it into a tail page and create a new empty head page
|
||||
add(headCheckpoint, this.headPage.behead());
|
||||
|
||||
headPageNum = headCheckpoint.getPageNum() + 1;
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
|
||||
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
|
||||
if (headCheckpoint.maxSeqNum() > this.seqNum) { this.seqNum = headCheckpoint.maxSeqNum(); }
|
||||
}
|
||||
|
||||
// only activate the first tail page
|
||||
if (tailPages.size() > 0) {
|
||||
this.tailPages.get(0).getPageIO().activate();
|
||||
}
|
||||
|
||||
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
|
||||
|
||||
this.closed.set(false);
|
||||
}
|
||||
|
||||
// add a read tail page into this queue structures but also verify that this tail page
|
||||
|
@ -597,7 +616,14 @@ public class Queue implements Closeable {
|
|||
// unblock blocked writes. a write is blocked *after* the write has been performed so
|
||||
// unblocking is safe and will return from the write call
|
||||
notFull.signalAll();
|
||||
|
||||
} finally {
|
||||
try {
|
||||
FileLockFactory.getDefault().releaseLock(this.dirLock);
|
||||
} catch (IOException e) {
|
||||
// log error and ignore
|
||||
logger.error("Queue close releaseLock failed, error={}", e.getMessage());
|
||||
}
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ public class HeadPageTest {
|
|||
assertThat(p.isFullyAcked(), is(false));
|
||||
assertThat(p.hasSpace(10), is(true));
|
||||
assertThat(p.hasSpace(100), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -43,6 +45,8 @@ public class HeadPageTest {
|
|||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(false));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -67,6 +71,8 @@ public class HeadPageTest {
|
|||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -91,6 +97,8 @@ public class HeadPageTest {
|
|||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
// disabled test until we figure what to do in this condition
|
||||
|
|
|
@ -42,6 +42,8 @@ public class QueueTest {
|
|||
q.open();
|
||||
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -57,6 +59,8 @@ public class QueueTest {
|
|||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -72,6 +76,8 @@ public class QueueTest {
|
|||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(2), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -95,6 +101,8 @@ public class QueueTest {
|
|||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString())));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -137,6 +145,8 @@ public class QueueTest {
|
|||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
assertThat(b, is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
|
||||
|
@ -178,6 +188,8 @@ public class QueueTest {
|
|||
b.close();
|
||||
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -263,6 +275,8 @@ public class QueueTest {
|
|||
assertThat(c.getMinSeqNum(), is(equalTo(3L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(1)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -304,6 +318,8 @@ public class QueueTest {
|
|||
}
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -353,6 +369,8 @@ public class QueueTest {
|
|||
|
||||
// since we did not ack and pages hold a single item
|
||||
assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -406,6 +424,8 @@ public class QueueTest {
|
|||
assertThat(q.getHeadPage().getElementCount() > 0L, is(true));
|
||||
assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L)));
|
||||
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -440,6 +460,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(true));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -483,6 +504,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -523,6 +545,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(true)); // queue should still be full
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -151,6 +151,7 @@ public class Concurent {
|
|||
|
||||
// gotta hate exception handling in lambdas
|
||||
consumers.forEach(c -> {try{c.join();} catch(InterruptedException e) {throw new RuntimeException(e);}});
|
||||
q.close();
|
||||
|
||||
Instant end = Instant.now();
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue