Extract the creation of the queue into a factory

I've move the initialization code into a factory for future work on the
pipeline.

Fixes #6498
This commit is contained in:
Pier-Hugues Pellerin 2017-01-05 09:36:44 -05:00
parent c9391e964e
commit 8e57042e25
3 changed files with 96 additions and 29 deletions

View file

@ -5,8 +5,6 @@ require "concurrent"
require "logstash/namespace"
require "logstash/errors"
require "logstash-core/logstash-core"
require "logstash/util/wrapped_acked_queue"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/event"
require "logstash/config/file"
require "logstash/filters/base"
@ -21,6 +19,7 @@ require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/output_delegator"
require "logstash/filter_delegator"
require "logstash/queue_factory"
module LogStash; class Pipeline
include LogStash::Util::Loggable
@ -99,7 +98,8 @@ module LogStash; class Pipeline
rescue => e
raise
end
@queue = build_queue_from_settings
@queue = LogStash::QueueFactory.create(settings)
@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
@signal_queue = Queue.new
@ -120,32 +120,6 @@ module LogStash; class Pipeline
@flushing = Concurrent::AtomicReference.new(false)
end # def initialize
def build_queue_from_settings
queue_type = settings.get("queue.type")
queue_page_capacity = settings.get("queue.page_capacity")
queue_max_bytes = settings.get("queue.max_bytes")
queue_max_events = settings.get("queue.max_events")
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
if queue_type == "memory_acked"
# memory_acked is used in tests/specs
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes)
elsif queue_type == "memory"
# memory is the legacy and default setting
LogStash::Util::WrappedSynchronousQueue.new()
elsif queue_type == "persisted"
# persisted is the disk based acked queue
queue_path = settings.get("path.queue")
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
else
raise(ConfigurationError, "invalid queue.type setting")
end
end
private :build_queue_from_settings
def ready?
@ready.value
end

View file

@ -0,0 +1,33 @@
# encoding: utf-8
require "logstash/event"
require "logstash/namespace"
require "logstash/util/wrapped_acked_queue"
require "logstash/util/wrapped_synchronous_queue"
module LogStash
class QueueFactory
def self.create(settings)
queue_type = settings.get("queue.type")
queue_page_capacity = settings.get("queue.page_capacity")
queue_max_bytes = settings.get("queue.max_bytes")
queue_max_events = settings.get("queue.max_events")
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
if queue_type == "memory_acked"
# memory_acked is used in tests/specs
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes)
elsif queue_type == "memory"
# memory is the legacy and default setting
LogStash::Util::WrappedSynchronousQueue.new
elsif queue_type == "persisted"
# persisted is the disk based acked queue
queue_path = settings.get("path.queue")
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
else
raise ConfigurationError, "invalid queue.type setting"
end
end
end
end

View file

@ -0,0 +1,60 @@
# encoding: utf-8
require "logstash/queue_factory"
require "logstash/settings"
require "stud/temporary"
describe LogStash::QueueFactory do
let(:settings_array) do
[
LogStash::Setting::WritableDirectory.new("path.queue", Stud::Temporary.pathname),
LogStash::Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
LogStash::Setting::Bytes.new("queue.page_capacity", "250mb"),
LogStash::Setting::Bytes.new("queue.max_bytes", "1024mb"),
LogStash::Setting::Numeric.new("queue.max_events", 0),
LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000)
]
end
let(:settings) do
s = LogStash::Settings.new
settings_array.each do |setting|
s.register(setting)
end
s
end
subject { described_class }
context "when `queue.type` is `persisted`" do
before do
settings.set("queue.type", "persisted")
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
end
end
context "when `queue.type` is `memory_acked`" do
before do
settings.set("queue.type", "memory_acked")
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue)
end
end
context "when `queue.type` is `memory`" do
before do
settings.set("queue.type", "memory")
end
it "returns a `WrappedAckedQueue`" do
expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
end
end
end