add queue stats to node/stats api (#6331)

* add queue stats to node/stats api

example queue section:

```
"queue" : {
    "type" : "persisted",
    "capacity" : {
      "page_capacity_in_bytes" : 262144000,
      "max_queue_size_in_bytes" : 1073741824,
      "max_unread_events" : 0
    },
    "data" : {
      "free_space_in_bytes" : 33851523072,
      "current_size_in_bytes" : 262144000,
      "storage_type" : "hfs",
      "path" : "/logstash/data/queue"
    },
    "events" : {
      "acked_count" : 0,
      "unread_count" : 0,
      "unacked_count" : 0
    }
}
```
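For reference, the new section can be pulled from a running node; the sketch below assumes the default monitoring port (9600) and reads the same "pipeline"/"queue" keys shown above:

```
# Sketch: fetch the queue section from a running node's stats API.
# Assumes the default monitoring port 9600; the "pipeline"/"queue" keys
# match the example output above.
require "net/http"
require "json"

body  = Net::HTTP.get(URI("http://localhost:9600/_node/stats"))
queue = JSON.parse(body)["pipeline"]["queue"]
puts JSON.pretty_generate(queue)
```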

Closes #6182.

* migrate to use periodic metric pollers for storing queue stats per pipeline
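For context, a periodic poller is a small object whose `collect` method runs on its own thread at a fixed interval; the `PersistentQueue` poller added below hooks queue stats into that cycle. A minimal sketch of the pattern, with hypothetical names (the real base class lives in `logstash/instrument/periodic_poller/base`, which this diff does not touch):

```
# Minimal sketch of the periodic-poller pattern (hypothetical names).
class SketchPoller
  def initialize(metric, interval_seconds = 5)
    @metric   = metric
    @interval = interval_seconds
  end

  # Spin up a thread that invokes collect on every tick.
  def start
    @thread = Thread.new do
      loop do
        collect
        sleep @interval
      end
    end
  end

  def stop
    @thread.kill if @thread
  end

  # Subclasses override this to write gauges into the metric store.
  def collect
  end
end
```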
Tal Levy authored 2016-12-15 13:14:47 -08:00, committed by GitHub
parent 9c8bf203e2
commit 2b45a9b4ae
14 changed files with 283 additions and 10 deletions


@@ -87,6 +87,46 @@ public class JrubyAckedQueueExtLibrary implements Library {
return context.nil;
}
@JRubyMethod(name = "max_unread_events")
public IRubyObject ruby_max_unread_events(ThreadContext context) {
return context.runtime.newFixnum(queue.getMaxUnread());
}
@JRubyMethod(name = "max_size_in_bytes")
public IRubyObject ruby_max_size_in_bytes(ThreadContext context) {
return context.runtime.newFixnum(queue.getMaxBytes());
}
@JRubyMethod(name = "page_capacity")
public IRubyObject ruby_page_capacity(ThreadContext context) {
return context.runtime.newFixnum(queue.getPageCapacity());
}
@JRubyMethod(name = "dir_path")
public IRubyObject ruby_dir_path(ThreadContext context) {
return context.runtime.newString(queue.getDirPath());
}
@JRubyMethod(name = "current_byte_size")
public IRubyObject ruby_current_byte_size(ThreadContext context) {
return context.runtime.newFixnum(queue.getCurrentByteSize());
}
@JRubyMethod(name = "acked_count")
public IRubyObject ruby_acked_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getAckedCount());
}
@JRubyMethod(name = "unacked_count")
public IRubyObject ruby_unacked_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getUnackedCount());
}
@JRubyMethod(name = "unread_count")
public IRubyObject ruby_unread_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getUnreadCount());
}
@JRubyMethod(name = "open")
public IRubyObject ruby_open(ThreadContext context)
{

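Purely as an illustration, the accessors added above could be probed from a JRuby console; `q` is an assumed, already-open acked-queue instance (e.g. what `WrappedAckedQueue#queue` returns), not something this diff constructs:

```
# Hypothetical probe of the accessors added above; `q` is assumed to be an
# open Java-backed acked queue (e.g. WrappedAckedQueue#queue).
puts q.dir_path            # on-disk location of the queue pages
puts q.page_capacity       # bytes per page
puts q.max_size_in_bytes   # configured ceiling for the whole queue
puts q.unread_count        # written but not yet read
puts q.unacked_count       # written but not yet acknowledged
```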

@@ -81,6 +81,46 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library {
return context.nil;
}
@JRubyMethod(name = "max_unread_events")
public IRubyObject ruby_max_unread_events(ThreadContext context) {
return context.runtime.newFixnum(queue.getMaxUnread());
}
@JRubyMethod(name = "max_size_in_bytes")
public IRubyObject ruby_max_size_in_bytes(ThreadContext context) {
return context.runtime.newFixnum(queue.getMaxBytes());
}
@JRubyMethod(name = "page_capacity")
public IRubyObject ruby_page_capacity(ThreadContext context) {
return context.runtime.newFixnum(queue.getPageCapacity());
}
@JRubyMethod(name = "dir_path")
public IRubyObject ruby_dir_path(ThreadContext context) {
return context.runtime.newString(queue.getDirPath());
}
@JRubyMethod(name = "current_byte_size")
public IRubyObject ruby_current_byte_size(ThreadContext context) {
return context.runtime.newFixnum(queue.getCurrentByteSize());
}
@JRubyMethod(name = "acked_count")
public IRubyObject ruby_acked_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getAckedCount());
}
@JRubyMethod(name = "unread_count")
public IRubyObject ruby_unread_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getUnreadCount());
}
@JRubyMethod(name = "unacked_count")
public IRubyObject ruby_unacked_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getUnackedCount());
}
@JRubyMethod(name = "open")
public IRubyObject ruby_open(ThreadContext context)
{


@@ -178,6 +178,12 @@ class LogStash::Agent
@id_path ||= ::File.join(settings.get("path.data"), "uuid")
end
def running_pipelines
@upgrade_mutex.synchronize do
@pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }
end
end
def running_pipelines?
@upgrade_mutex.synchronize do
@pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }.any?
@@ -209,7 +215,9 @@ class LogStash::Agent
end
- @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric)
+ @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric,
+                                                               settings.get("queue.type"),
+                                                               self)
@periodic_pollers.start
end
@@ -325,8 +333,9 @@ class LogStash::Agent
def start_pipelines
@instance_reload_metric.increment(:successes, 0)
@instance_reload_metric.increment(:failures, 0)
- @pipelines.each do |id, _|
+ @pipelines.each do |id, pipeline|
start_pipeline(id)
pipeline.collect_stats
# no reloads yet, initialize all the reload metrics
init_pipeline_reload_metrics(id)
end


@@ -3,6 +3,9 @@ require "logstash/api/commands/base"
require 'logstash/util/thread_dump'
require_relative "hot_threads_reporter"
java_import java.nio.file.Files
java_import java.nio.file.Paths
module LogStash
module Api
module Commands
@@ -19,10 +22,10 @@ module LogStash
:uptime_in_millis => service.get_shallow(:jvm, :uptime_in_millis),
}
end
def reloads
service.get_shallow(:stats, :reloads)
end
end
def process
extract_metrics(
@@ -106,6 +109,7 @@ module LogStash
:outputs => plugin_stats(stats, :outputs)
},
:reloads => stats[:reloads],
:queue => stats[:queue]
}
end
end # module PluginsStats


@@ -31,7 +31,7 @@ module LogStash
def jvm_payload
@stats.jvm
end
def reloads
@stats.reloads
end


@@ -0,0 +1,20 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/base"
module LogStash module Instrument module PeriodicPoller
class PersistentQueue < Base
def initialize(metric, queue_type, agent, options = {})
super(metric, options)
@metric = metric
@queue_type = queue_type
@agent = agent
end
def collect
pipeline_id, pipeline = @agent.running_pipelines.first
unless pipeline.nil?
pipeline.collect_stats
end
end
end
end; end; end


@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/os"
require "logstash/instrument/periodic_poller/jvm"
require "logstash/instrument/periodic_poller/pq"
module LogStash module Instrument
# Each PeriodicPoller manages its own thread to do the polling
@@ -9,10 +10,11 @@ module LogStash module Instrument
class PeriodicPollers
attr_reader :metric
- def initialize(metric)
+ def initialize(metric, queue_type, pipelines)
@metric = metric
@periodic_pollers = [PeriodicPoller::Os.new(metric),
- PeriodicPoller::JVM.new(metric)]
+ PeriodicPoller::JVM.new(metric),
+ PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines)]
end
def start


@@ -40,7 +40,8 @@ module LogStash; class Pipeline
:settings,
:metric,
:filter_queue_client,
- :input_queue_client
+ :input_queue_client,
+ :queue
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
@@ -549,6 +550,35 @@ module LogStash; class Pipeline
end
end
def collect_stats
pipeline_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
pipeline_metric.gauge(:type, settings.get("queue.type"))
if @queue.is_a?(LogStash::Util::WrappedAckedQueue) && @queue.queue.is_a?(LogStash::AckedQueue)
queue = @queue.queue
dir_path = queue.dir_path
file_store = Files.get_file_store(Paths.get(dir_path))
pipeline_metric.namespace([:capacity]).tap do |n|
n.gauge(:page_capacity_in_bytes, queue.page_capacity)
n.gauge(:max_queue_size_in_bytes, queue.max_size_in_bytes)
n.gauge(:max_unread_events, queue.max_unread_events)
end
pipeline_metric.namespace([:data]).tap do |n|
n.gauge(:free_space_in_bytes, file_store.get_unallocated_space)
n.gauge(:current_size_in_bytes, queue.current_byte_size)
n.gauge(:storage_type, file_store.type)
n.gauge(:path, dir_path)
end
pipeline_metric.namespace([:events]).tap do |n|
n.gauge(:acked_count, queue.acked_count)
n.gauge(:unacked_count, queue.unacked_count)
n.gauge(:unread_count, queue.unread_count)
end
end
end
# Sometimes we log stuff that will dump the pipeline which may contain
# sensitive information (like the raw syntax tree which can contain passwords)
# We want to hide most of what's in here
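The `:data` gauges in `collect_stats` above come straight from `java.nio.file`; as a standalone sketch under JRuby, with the path borrowed from the example output at the top:

```
# Sketch: the same java.nio.file calls collect_stats uses for the :data gauges.
# Run under JRuby; the path is taken from the example JSON above.
java_import java.nio.file.Files
java_import java.nio.file.Paths

store = Files.get_file_store(Paths.get("/logstash/data/queue"))
puts store.get_unallocated_space   # -> free_space_in_bytes
puts store.type                    # -> storage_type (e.g. "hfs")
```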


@@ -33,6 +33,8 @@ module LogStash; module Util
private_class_method :new
attr_reader :queue
def with_queue(queue)
@queue = queue
@queue.open


@@ -106,6 +106,30 @@ public class Queue implements Closeable {
}
}
public String getDirPath() {
return this.dirPath;
}
public long getMaxBytes() {
return this.maxBytes;
}
public long getMaxUnread() {
return this.maxUnread;
}
public long getCurrentByteSize() {
return this.currentByteSize;
}
public int getPageCapacity() {
return this.pageCapacity;
}
public long getUnreadCount() {
return this.unreadCount;
}
// moved queue opening logic into the open() method until we have something in place to use in-memory checkpoints for testing
// because for now we need to pass a Queue instance to the Page and we don't want to trigger a Queue recovery when
// testing Page
@@ -585,6 +609,19 @@ public class Queue implements Closeable {
return this.tailPages.get(0).getPageNum();
}
public long getAckedCount() {
return headPage.ackedSeqNums.cardinality() + tailPages.stream()
.mapToLong(page -> page.ackedSeqNums.cardinality())
.sum();
}
public long getUnackedCount() {
long headPageCount = (headPage.getElementCount() - headPage.ackedSeqNums.cardinality());
long tailPagesCount = tailPages.stream()
.mapToLong(page -> (page.getElementCount() - page.ackedSeqNums.cardinality())).sum();
return headPageCount + tailPagesCount;
}
protected long nextSeqNum() {
return this.seqNum += 1;
}
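In `getUnackedCount` above, each page contributes `elementCount - ackedSeqNums.cardinality()`, summed over the head page and all tail pages. A toy check of that arithmetic, with made-up numbers:

```
# Toy check of the unacked-count arithmetic (hypothetical numbers).
pages = [
  { elements: 100, acked: 40 },  # a tail page: 60 still unacked
  { elements: 25,  acked: 0  },  # the head page: all 25 unacked
]
unacked = pages.map { |p| p[:elements] - p[:acked] }.reduce(:+)
puts unacked  # => 85
```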


@@ -1,11 +1,15 @@
package org.logstash.ackedqueue;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.common.io.ByteBufferPageIO;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -22,6 +26,13 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
public class QueueTest {
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
private String dataPath;
@Before
public void setUp() throws Exception {
dataPath = temporaryFolder.newFolder("data").getPath();
}
@Test
public void newQueue() throws IOException {
@@ -507,4 +518,53 @@ public class QueueTest {
executor.shutdown();
}
}
@Test
public void testAckedCount() throws IOException {
Settings settings = TestSettings.getSettingsCheckpointFilePageMemory(100, dataPath);
Queue q = new Queue(settings);
q.open();
Queueable element1 = new StringElement("foobarbaz");
Queueable element2 = new StringElement("wowza");
Queueable element3 = new StringElement("third");
long firstSeqNum = q.write(element1);
Batch b = q.nonBlockReadBatch(1);
assertThat(b.getElements().size(), is(equalTo(1)));
q.close();
q = new Queue(settings);
q.open();
long secondSeqNum = q.write(element2);
long thirdSeqNum = q.write(element3);
b = q.nonBlockReadBatch(1);
assertThat(b.getElements().size(), is(equalTo(1)));
assertThat(b.getElements().get(0), is(equalTo(element1)));
b = q.nonBlockReadBatch(2);
assertThat(b.getElements().size(), is(equalTo(2)));
assertThat(b.getElements().get(0), is(equalTo(element2)));
assertThat(b.getElements().get(1), is(equalTo(element3)));
q.ack(Collections.singletonList(firstSeqNum));
q.close();
q = new Queue(settings);
q.open();
b = q.nonBlockReadBatch(2);
assertThat(b.getElements().size(), is(equalTo(2)));
q.ack(Arrays.asList(secondSeqNum, thirdSeqNum));
assertThat(q.getAckedCount(), equalTo(0L));
assertThat(q.getUnackedCount(), equalTo(0L));
q.close();
}
}


@@ -4,6 +4,7 @@ import org.logstash.common.io.ByteBufferPageIO;
import org.logstash.common.io.CheckpointIOFactory;
import org.logstash.common.io.FileCheckpointIO;
import org.logstash.common.io.MemoryCheckpointIO;
import org.logstash.common.io.MmapPageIO;
import org.logstash.common.io.PageIOFactory;
public class TestSettings {
@@ -35,10 +36,11 @@ public class TestSettings {
public static Settings getSettingsCheckpointFilePageMemory(int capacity, String folder) {
Settings s = new FileSettings(folder);
- PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
+ PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
s.setCapacity(capacity);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointMaxWrites(1);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;


@@ -1,6 +1,8 @@
# Base class for a service like Kafka, ES, Logstash
class Service
attr_reader :settings
def initialize(name, settings)
@name = name
@settings = settings


@@ -47,4 +47,29 @@ describe "Test Monitoring API" do
end
end
it "can retrieve queue stats" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin
logstash_service.wait_for_logstash
Stud.try(max_retry.times, RSpec::Expectations::ExpectationNotMetError) do
result = logstash_service.monitoring_api.node_stats
expect(result["pipeline"]["queue"]).not_to be_nil
if logstash_service.settings.feature_flag == "persistent_queues"
expect(result["pipeline"]["queue"]["type"]).to eq "persisted"
expect(result["pipeline"]["queue"]["data"]["free_space_in_bytes"]).not_to be_nil
expect(result["pipeline"]["queue"]["data"]["current_size_in_bytes"]).not_to be_nil
expect(result["pipeline"]["queue"]["data"]["storage_type"]).not_to be_nil
expect(result["pipeline"]["queue"]["data"]["path"]).not_to be_nil
expect(result["pipeline"]["queue"]["events"]["acked_count"]).not_to be_nil
expect(result["pipeline"]["queue"]["events"]["unread_count"]).not_to be_nil
expect(result["pipeline"]["queue"]["events"]["unacked_count"]).not_to be_nil
expect(result["pipeline"]["queue"]["capacity"]["page_capacity_in_bytes"]).not_to be_nil
expect(result["pipeline"]["queue"]["capacity"]["max_queue_size_in_bytes"]).not_to be_nil
expect(result["pipeline"]["queue"]["capacity"]["max_unread_events"]).not_to be_nil
else
expect(result["pipeline"]["queue"]["type"]).to eq "memory"
end
end
end
end