include pipeline id in queue path

create queue sub directory based on pipeline id

Fixes #6540
This commit is contained in:
Joao Duarte 2017-01-16 17:40:21 +00:00 committed by João Duarte
parent f2486324af
commit b4c7c97452
2 changed files with 20 additions and 2 deletions

View file

@ -1,4 +1,5 @@
# encoding: utf-8
require "fileutils"
require "logstash/event"
require "logstash/namespace"
require "logstash/util/wrapped_acked_queue"
@ -24,7 +25,8 @@ module LogStash
LogStash::Util::WrappedSynchronousQueue.new
when "persisted"
# persisted is the disk based acked queue
queue_path = settings.get("path.queue")
queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
FileUtils.mkdir_p(queue_path)
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
else
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'"

View file

@ -4,6 +4,7 @@ require "logstash/settings"
require "stud/temporary"
describe LogStash::QueueFactory do
let(:pipeline_id) { "my_pipeline" }
let(:settings_array) do
[
LogStash::Setting::WritableDirectory.new("path.queue", Stud::Temporary.pathname),
@ -13,7 +14,8 @@ describe LogStash::QueueFactory do
LogStash::Setting::Numeric.new("queue.max_events", 0),
LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000)
LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000),
LogStash::Setting::String.new("pipeline.id", pipeline_id)
]
end
@ -36,6 +38,20 @@ describe LogStash::QueueFactory do
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
end
describe "per pipeline id subdirectory creation" do
let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) }
after :each do
FileUtils.rmdir(queue_path)
end
it "creates a queue directory based on the pipeline id" do
expect(Dir.exist?(queue_path)).to be_falsey
queue = subject.create(settings)
expect(Dir.exist?(queue_path)).to be_truthy
end
end
end
context "when `queue.type` is `memory_acked`" do