mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
use queue path in memory acked queue to namespace .lock file
This commit is contained in:
parent
5f9a6b5cbf
commit
6838cdb588
1 changed files with 8 additions and 6 deletions
|
@ -16,18 +16,20 @@ module LogStash
|
||||||
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
|
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
|
||||||
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
|
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
|
||||||
|
|
||||||
|
queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
|
||||||
|
|
||||||
case queue_type
|
case queue_type
|
||||||
when "memory_acked"
|
when "memory_acked"
|
||||||
# memory_acked is used in tests/specs
|
# memory_acked is used in tests/specs
|
||||||
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes)
|
FileUtils.mkdir_p(queue_path)
|
||||||
|
LogStash::Util::WrappedAckedQueue.create_memory_based(queue_path, queue_page_capacity, queue_max_events, queue_max_bytes)
|
||||||
|
when "persisted"
|
||||||
|
# persisted is the disk based acked queue
|
||||||
|
FileUtils.mkdir_p(queue_path)
|
||||||
|
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
|
||||||
when "memory"
|
when "memory"
|
||||||
# memory is the legacy and default setting
|
# memory is the legacy and default setting
|
||||||
LogStash::Util::WrappedSynchronousQueue.new
|
LogStash::Util::WrappedSynchronousQueue.new
|
||||||
when "persisted"
|
|
||||||
# persisted is the disk based acked queue
|
|
||||||
queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
|
|
||||||
FileUtils.mkdir_p(queue_path)
|
|
||||||
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
|
|
||||||
else
|
else
|
||||||
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'"
|
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'"
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue