Add metrics for dead letter queue

Add an initial set of metrics for the dead letter queue.

Metrics are supplied under the pipeline in the following format:

"dead_letter_queue": {
        "queue_size_in_bytes": ...,
      }

Metrics are populated via a new PeriodicPoller (PeriodicPoller::DeadLetterQueue).
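
For illustration, a minimal sketch of reading the new block back through the
monitoring API; the host, port, endpoint and pipeline id below are assumptions
for the example, not part of this change:

    # Hypothetical check against a locally running Logstash (default API port assumed).
    require "net/http"
    require "json"
    require "uri"

    stats = JSON.parse(Net::HTTP.get(URI("http://localhost:9600/_node/stats")))
    dlq   = stats.dig("pipelines", "main", "dead_letter_queue")
    if dlq
      puts "queue_size_in_bytes: #{dlq["queue_size_in_bytes"]}"
    else
      puts "dead letter queue disabled (or stats not yet collected)"
    end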

Also fixed the calculation of currentQueueSize to account for the version
header written at the start of each segment file, which was previously being
skipped. As a result, a newly created, empty queue now reports a size of 1 byte.
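
To make the accounting concrete, a rough sketch of the corrected size
calculation (names mirror the Java code; the record header size is an assumed
stand-in, not the real constant's value):

    # Illustrative only: values are assumptions for the sketch.
    VERSION_HEADER_SIZE = 1    # single version byte at the start of every segment file
    RECORD_HEADER_SIZE  = 13   # assumed size of the per-record header

    def queue_size_in_bytes(segment_count, payload_sizes)
      segment_count * VERSION_HEADER_SIZE +
        payload_sizes.inject(0) { |sum, len| sum + RECORD_HEADER_SIZE + len }
    end

    queue_size_in_bytes(1, [])  # => 1, the value the specs expect for a new, empty queue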

Additionally, the pipelines API endpoint now reports whether the DLQ is
enabled and, if so, the path of the DLQ.
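
As a sketch, this is roughly where those two fields appear; the surrounding
values and the example path are illustrative assumptions, only the two
dead_letter_queue_* keys come from this change:

    "pipelines": {
        "main": {
            "workers": ...,
            "batch_size": ...,
            "dead_letter_queue_enabled": true,
            "dead_letter_queue_path": "/path/to/data/dead_letter_queue/main"
        }
    }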

Resolves #7287

Fixes #7338
Rob Bavey 2017-06-05 16:40:58 -04:00
parent 5b4a725a36
commit 2d2cee4d2e
10 changed files with 139 additions and 19 deletions

View file

@@ -27,8 +27,8 @@ module LogStash
def pipeline(pipeline_id)
extract_metrics(
[:stats, :pipelines, pipeline_id.to_sym, :config],
:workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval
)
:workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval, :dead_letter_queue_enabled, :dead_letter_queue_path
).reject{|_, v|v.nil?}
rescue
{}
end

View file

@@ -120,8 +120,8 @@ module LogStash
},
:reloads => stats[:reloads],
:queue => stats[:queue]
}
end
}.merge(stats[:dlq] ? {:dead_letter_queue => stats[:dlq]} : {})
end
end # module PluginsStats
end
end

View file

@@ -0,0 +1,19 @@
# encoding: utf-8
require 'logstash/instrument/periodic_poller/base'
module LogStash module Instrument module PeriodicPoller
class DeadLetterQueue < Base
def initialize(metric, agent, options = {})
super(metric, options)
@metric = metric
@agent = agent
end
def collect
_, pipeline = @agent.with_running_pipelines { |pipelines| pipelines.first }
unless pipeline.nil?
pipeline.collect_dlq_stats
end
end
end
end end end

View file

@@ -1,4 +1,5 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/dlq"
require "logstash/instrument/periodic_poller/os"
require "logstash/instrument/periodic_poller/jvm"
require "logstash/instrument/periodic_poller/pq"
@@ -14,7 +15,8 @@ module LogStash module Instrument
@metric = metric
@periodic_pollers = [PeriodicPoller::Os.new(metric),
PeriodicPoller::JVM.new(metric),
PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines)]
PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines),
PeriodicPoller::DeadLetterQueue.new(metric, pipelines)]
end
def start

View file

@@ -58,11 +58,7 @@ module LogStash; class BasePipeline
@outputs = nil
@agent = agent
if settings.get_value("dead_letter_queue.enable")
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"))
else
@dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
end
@dlq_writer = dlq_writer
grammar = LogStashConfigParser.new
parsed_config = grammar.parse(config_str)
@@ -85,6 +81,14 @@ module LogStash; class BasePipeline
end
end
def dlq_writer
if settings.get_value("dead_letter_queue.enable")
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"))
else
@dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
end
end
def compile_lir
source_with_metadata = SourceWithMetadata.new("str", "pipeline", self.config_str)
LogStash::Compiler.compile_sources(source_with_metadata)
@@ -245,6 +249,7 @@ module LogStash; class Pipeline < BasePipeline
# Since we start lets assume that the metric namespace is cleared
# this is useful in the context of pipeline reloading
collect_stats
collect_dlq_stats
@logger.debug("Starting pipeline", default_logging_keys)
@@ -383,6 +388,9 @@ module LogStash; class Pipeline < BasePipeline
config_metric.gauge(:batch_delay, batch_delay)
config_metric.gauge(:config_reload_automatic, @settings.get("config.reload.automatic"))
config_metric.gauge(:config_reload_interval, @settings.get("config.reload.interval"))
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
config_metric.gauge(:dead_letter_queue_path, @dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
@logger.info("Starting pipeline", default_logging_keys(
"pipeline.workers" => pipeline_workers,
@@ -416,6 +424,10 @@ module LogStash; class Pipeline < BasePipeline
end
end
def dlq_enabled?
@settings.get("dead_letter_queue.enable")
end
# Main body of what a worker thread does
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batch_size, batch_delay)
@@ -696,10 +708,16 @@ module LogStash; class Pipeline < BasePipeline
.each {|t| t.delete("status") }
end
def collect_dlq_stats
if dlq_enabled?
dlq_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :dlq])
dlq_metric.gauge(:queue_size_in_bytes, @dlq_writer.get_current_queue_size)
end
end
def collect_stats
pipeline_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
pipeline_metric.gauge(:type, settings.get("queue.type"))
if @queue.is_a?(LogStash::Util::WrappedAckedQueue) && @queue.queue.is_a?(LogStash::AckedQueue)
queue = @queue.queue
dir_path = queue.dir_path

View file

@@ -115,7 +115,8 @@ describe LogStash::Api::Modules::Node do
"batch_size" => Numeric,
"batch_delay" => Numeric,
"config_reload_automatic" => Boolean,
"config_reload_interval" => Numeric
"config_reload_interval" => Numeric,
"dead_letter_queue_enabled" => Boolean
}
},
"os" => {

View file

@@ -0,0 +1,17 @@
# encoding: utf-8
require "spec_helper"
require "logstash/instrument/periodic_poller/dlq"
require "logstash/instrument/collector"
describe LogStash::Instrument::PeriodicPoller::DeadLetterQueue do
subject { LogStash::Instrument::PeriodicPoller::DeadLetterQueue }
let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
let(:agent) { double("agent")}
let(:options) { {} }
subject(:dlq) { described_class.new(metric, agent, options) }
it "should initialize cleanly" do
expect { dlq }.not_to raise_error
end
end

View file

@@ -104,12 +104,19 @@ describe LogStash::Pipeline do
let(:worker_thread_count) { 5 }
let(:safe_thread_count) { 1 }
let(:override_thread_count) { 42 }
let(:dead_letter_queue_enabled) { false }
let(:dead_letter_queue_path) { }
let(:pipeline_settings_obj) { LogStash::SETTINGS }
let(:pipeline_settings) { {} }
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("pipeline.workers")
allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)
dlq_enabled_setting = LogStash::SETTINGS.get_setting("dead_letter_queue.enable")
allow(dlq_enabled_setting).to receive(:value).and_return(dead_letter_queue_enabled)
dlq_path_setting = LogStash::SETTINGS.get_setting("path.dead_letter_queue")
allow(dlq_path_setting).to receive(:value).and_return(dead_letter_queue_path)
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
@@ -840,6 +847,33 @@ describe LogStash::Pipeline do
expect(collected_metric[:stats][:pipelines][:main][:plugins][:filters][plugin_name][:name].value).to eq(LogStash::Filters::Multiline.config_name)
end
end
context 'when dlq is disabled' do
let (:collect_stats) { subject.collect_dlq_stats}
let (:collected_stats) { collected_metric[:stats][:pipelines][:main][:dlq]}
let (:available_stats) {[:path, :queue_size_in_bytes]}
it 'should not show any dlq stats' do
collect_stats
expect(collected_stats).to be_nil
end
end
context 'when dlq is enabled' do
let (:dead_letter_queue_enabled) { true }
let (:dead_letter_queue_path) { Stud::Temporary.directory }
let (:pipeline_dlq_path) { "#{dead_letter_queue_path}/#{pipeline_id}"}
let (:collect_stats) { subject.collect_dlq_stats }
let (:collected_stats) { collected_metric[:stats][:pipelines][:main][:dlq]}
it 'should show dlq stats' do
collect_stats
# A newly created dead letter queue with no entries will have a size of 1 (the version 'header')
expect(collected_stats[:queue_size_in_bytes].value).to eq(1)
end
end
end
end

View file

@@ -32,6 +32,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
@@ -45,10 +46,10 @@ public class DeadLetterQueueWriter {
static final String LOCK_FILE = ".lock";
private final long maxSegmentSize;
private final long maxQueueSize;
private LongAdder currentQueueSize;
private final Path queuePath;
private final FileLock lock;
private RecordIOWriter currentWriter;
private long currentQueueSize;
private int currentSegmentIndex;
private Timestamp lastEntryTimestamp;
private boolean open;
@@ -68,11 +69,11 @@
}
throw new RuntimeException("uh oh, someone else is writing to this dead-letter queue");
}
this.queuePath = queuePath;
this.maxSegmentSize = maxSegmentSize;
this.maxQueueSize = maxQueueSize;
this.currentQueueSize = getStartupQueueSize();
this.currentQueueSize = new LongAdder();
this.currentQueueSize.add(getStartupQueueSize());
currentSegmentIndex = getSegmentPaths(queuePath)
.map(s -> s.getFileName().toString().split("\\.")[0])
@@ -106,7 +107,9 @@
}
private RecordIOWriter nextWriter() throws IOException {
return new RecordIOWriter(queuePath.resolve(String.format(SEGMENT_FILE_PATTERN, ++currentSegmentIndex)));
RecordIOWriter recordIOWriter = new RecordIOWriter(queuePath.resolve(String.format(SEGMENT_FILE_PATTERN, ++currentSegmentIndex)));
currentQueueSize.increment();
return recordIOWriter;
}
static Stream<Path> getSegmentPaths(Path path) throws IOException {
@@ -130,17 +133,16 @@
private void innerWriteEntry(DLQEntry entry) throws IOException {
byte[] record = entry.serialize();
int eventPayloadSize = RECORD_HEADER_SIZE + record.length;
if (currentQueueSize + eventPayloadSize > maxQueueSize) {
if (currentQueueSize.longValue() + eventPayloadSize > maxQueueSize) {
logger.error("cannot write event to DLQ: reached maxQueueSize of " + maxQueueSize);
return;
} else if (currentWriter.getPosition() + eventPayloadSize > maxSegmentSize) {
currentWriter.close();
currentWriter = nextWriter();
}
currentQueueSize += currentWriter.writeEvent(record);
currentQueueSize.add(currentWriter.writeEvent(record));
}
public synchronized void close() throws IOException {
this.lock.release();
if (currentWriter != null) {
@@ -153,4 +155,12 @@
public boolean isOpen() {
return open;
}
public Path getPath(){
return queuePath;
}
public long getCurrentQueueSize() {
return currentQueueSize.longValue();
}
}

View file

@ -48,6 +48,25 @@ describe "Test Monitoring API" do
end
end
it 'can retrieve dlq stats' do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin
logstash_service.wait_for_logstash
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
# we use fetch here since we want failed fetches to raise an exception
# and trigger the retry block
queue_stats = result.fetch('pipelines').fetch('main')['dead_letter_queue']
if logstash_service.settings.get("dead_letter_queue.enable")
expect(queue_stats['queue_size_in_bytes']).not_to be_nil
else
expect(queue_stats).to be nil
end
end
end
it "can retrieve queue stats" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin