diff --git a/logstash-core/lib/logstash/queue_factory.rb b/logstash-core/lib/logstash/queue_factory.rb index a3a28cf03..70b215557 100644 --- a/logstash-core/lib/logstash/queue_factory.rb +++ b/logstash-core/lib/logstash/queue_factory.rb @@ -16,18 +16,20 @@ module LogStash checkpoint_max_writes = settings.get("queue.checkpoint.writes") checkpoint_max_interval = settings.get("queue.checkpoint.interval") + queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id")) + case queue_type when "memory_acked" # memory_acked is used in tests/specs - LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes) + FileUtils.mkdir_p(queue_path) + LogStash::Util::WrappedAckedQueue.create_memory_based(queue_path, queue_page_capacity, queue_max_events, queue_max_bytes) + when "persisted" + # persisted is the disk based acked queue + FileUtils.mkdir_p(queue_path) + LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes) when "memory" # memory is the legacy and default setting LogStash::Util::WrappedSynchronousQueue.new - when "persisted" - # persisted is the disk based acked queue - queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id")) - FileUtils.mkdir_p(queue_path) - LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes) else raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'" end