mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
This commit changes the behavior of PQ size checking.
When it checks the size usage, instead of throwing exception that stops the pipeline,
it gives warning msg in every converge state if it fails the check
Fixed: #14257
Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
(cherry picked from commit c725aabb49
)
Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
This commit is contained in:
parent
6b2b4460a0
commit
30f020f5f6
2 changed files with 21 additions and 16 deletions
|
@ -22,13 +22,15 @@ java_import "java.nio.file.Files"
|
|||
|
||||
module LogStash
|
||||
class PersistedQueueConfigValidator
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
def initialize
|
||||
@last_check_pipeline_configs = Array.new
|
||||
@last_check_pass = false
|
||||
end
|
||||
|
||||
# Check the config of persistent queue. Raise BootstrapCheckError if fail
|
||||
# Check the config of persistent queue. Raise BootstrapCheckError if queue.page_capacity > queue.max_bytes
|
||||
# Print warning message if fail the space checking
|
||||
# @param running_pipelines [Hash pipeline_id (sym) => JavaPipeline]
|
||||
# @param pipeline_configs [Array PipelineConfig]
|
||||
def check(running_pipelines, pipeline_configs)
|
||||
|
@ -37,6 +39,7 @@ module LogStash
|
|||
@last_check_pipeline_configs = pipeline_configs
|
||||
return unless has_update
|
||||
|
||||
warn_msg = []
|
||||
err_msg = []
|
||||
queue_path_file_system = Hash.new # (String: queue path, String: file system)
|
||||
required_free_bytes = Hash.new # (String: file system, Integer: size)
|
||||
|
@ -53,7 +56,7 @@ module LogStash
|
|||
file_system = get_file_system(queue_path)
|
||||
|
||||
check_page_capacity(err_msg, pipeline_id, max_bytes, page_capacity)
|
||||
check_queue_usage(err_msg, pipeline_id, max_bytes, used_bytes)
|
||||
check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
|
||||
|
||||
queue_path_file_system[queue_path] = file_system
|
||||
if used_bytes < max_bytes
|
||||
|
@ -61,10 +64,11 @@ module LogStash
|
|||
end
|
||||
end
|
||||
|
||||
check_disk_space(err_msg, queue_path_file_system, required_free_bytes)
|
||||
check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
|
||||
|
||||
@last_check_pass = err_msg.empty?
|
||||
@last_check_pass = err_msg.empty? && warn_msg.empty?
|
||||
|
||||
logger.warn(warn_msg.flatten.join(" ")) unless warn_msg.empty?
|
||||
raise(LogStash::BootstrapCheckError, err_msg.flatten.join(" ")) unless err_msg.empty?
|
||||
end
|
||||
|
||||
|
@ -74,21 +78,21 @@ module LogStash
|
|||
end
|
||||
end
|
||||
|
||||
def check_queue_usage(err_msg, pipeline_id, max_bytes, used_bytes)
|
||||
def check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
|
||||
if used_bytes > max_bytes
|
||||
err_msg << "Pipeline #{pipeline_id} current queue size (#{used_bytes}) is greater than 'queue.max_bytes' (#{max_bytes})."
|
||||
warn_msg << "Pipeline #{pipeline_id} current queue size (#{used_bytes}) is greater than 'queue.max_bytes' (#{max_bytes})."
|
||||
end
|
||||
end
|
||||
|
||||
# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/ devices.
|
||||
# It uses the filesystem of the path and count the required bytes by filesystem
|
||||
def check_disk_space(err_msg, queue_path_file_system, required_free_bytes)
|
||||
disk_err_msg =
|
||||
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
|
||||
disk_warn_msg =
|
||||
queue_path_file_system
|
||||
.select { |queue_path, file_system| !FsUtil.hasFreeSpace(Paths.get(queue_path), required_free_bytes.fetch(file_system, 0)) }
|
||||
.map { |queue_path, file_system| "Persistent queue path #{queue_path} is unable to allocate #{required_free_bytes.fetch(file_system, 0)} more bytes on top of its current usage." }
|
||||
.map { |queue_path, file_system| "The persistent queue on path \"#{queue_path}\" won't fit in file system \"#{file_system}\" when full. Please free or allocate #{required_free_bytes.fetch(file_system, 0)} more bytes." }
|
||||
|
||||
err_msg << disk_err_msg unless disk_err_msg.empty?
|
||||
warn_msg << disk_warn_msg unless disk_warn_msg.empty?
|
||||
end
|
||||
|
||||
def get_file_system(queue_path)
|
||||
|
|
|
@ -48,9 +48,9 @@ describe LogStash::PersistedQueueConfigValidator do
|
|||
|
||||
context("'queue.max_bytes' = 0 which is less than 'queue.page_capacity'") do
|
||||
it "should not throw" do
|
||||
expect(pq_config_validator.logger).not_to receive(:warn)
|
||||
settings.set_value("queue.max_bytes", 0)
|
||||
expect { pq_config_validator.check({}, pipeline_configs) }
|
||||
.not_to raise_error
|
||||
pq_config_validator.check({}, pipeline_configs)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -69,9 +69,9 @@ describe LogStash::PersistedQueueConfigValidator do
|
|||
end
|
||||
|
||||
it "should throw" do
|
||||
expect(pq_config_validator.logger).to receive(:warn).once.with(/greater than 'queue.max_bytes'/)
|
||||
settings.set_value("queue.max_bytes", "1mb")
|
||||
expect { pq_config_validator.check({}, pipeline_configs) }
|
||||
.to raise_error(LogStash::BootstrapCheckError, /greater than 'queue.max_bytes'/)
|
||||
pq_config_validator.check({}, pipeline_configs)
|
||||
end
|
||||
|
||||
after do
|
||||
|
@ -96,8 +96,9 @@ describe LogStash::PersistedQueueConfigValidator do
|
|||
expect(required_free_bytes.values[0]).to eq(1024**5 * 1000 * 2) # require 2000pb
|
||||
end.and_call_original
|
||||
|
||||
expect { pq_config_validator.check({}, pipeline_configs) }
|
||||
.to raise_error(LogStash::BootstrapCheckError, /is unable to allocate/)
|
||||
expect(pq_config_validator.logger).to receive(:warn).once.with(/won't fit in file system/)
|
||||
|
||||
pq_config_validator.check({}, pipeline_configs)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue