live timers: API boundary, implementation, opt-in (#14748)

* live timers: introduce API boundary

Introduces an API boundary for timers as a first-class metric, as described
in elastic/logstash#14675, and migrates all known internal timers to use the
new API boundary for tracked execution.

Please refer to the specification for details on motivations.

This commit is net zero change to behaviour, and introduces a single new
undocumented setting `metric.timers` to `logstash.yml`, which presently only
takes its default value `delayed` to indicate that delayed committing of
execution time is acceptable.

It implements the new `TimerMetric` API in a way that is also net-zero-change.
Tracked executions are still performed by marking a start time, performing
the tracked execution, and incrementing an underlying long-type counter with
the number of elapsed milliseconds _after_ execution has completed. This means
that long-running execution is still missing from the metric until it has
completed.

The new Timer API is available to both the Ruby- and the Java-based plugin APIs

* timer metrics: sub-package and add baseline tests

* WIP: move execution metric ownership out of queue

* noop: remove useless abstract method

Our `AbstractMetric` implements `Metric` and does not need to declare
an abstract override of `Metric#getType`. Doing so prevents interfaces
from providing a default override for all implementers.

* timer metric tests: extract util, refactor for reuse

* timers: accumulate milli-excess-nanos

* live timers: single-checkpoint implementation

* timer metric: use explicit type parameters to make intent clear

* remove unused imports

* use safe int conversion

* test fixup: use given name for tested metric

* test helper: TimerMetricFactory prefers nanotime supplier

* timers: flesh out test coverage, incl live-timers

* test: move validation of queue-read metrics to ObservedExecution

* flow: support non-moving denominator (±infinity)

* metrics: add metric config pass-through to env2yaml
This commit is contained in:
Ry Biesemeyer 2022-12-13 13:35:53 -08:00 committed by GitHub
parent f21edfb7ea
commit e121650e56
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
61 changed files with 1596 additions and 339 deletions

View file

@ -94,6 +94,8 @@ func normalizeSetting(setting string) (string, error) {
"log.level",
"log.format",
"modules",
"metric.collect",
"metric.timers",
"path.logs",
"path.plugins",
"api.auth.type",

View file

@ -101,6 +101,7 @@ tasks.register("javaTests", Test) {
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
exclude '/org/logstash/plugins/CounterMetricImplTest.class'
exclude '/org/logstash/plugins/factory/PluginFactoryExtTest.class'
exclude '/org/logstash/execution/ObservedExecutionTest.class'
}
tasks.register("rubyTests", Test) {
@ -117,6 +118,7 @@ tasks.register("rubyTests", Test) {
include '/org/logstash/plugins/NamespacedMetricImplTest.class'
include '/org/logstash/plugins/CounterMetricImplTest.class'
include '/org/logstash/plugins/factory/PluginFactoryExtTest.class'
include '/org/logstash/execution/ObservedExecutionTest.class'
}
test {

View file

@ -554,8 +554,8 @@ class LogStash::Agent
flow_metrics << create_flow_metric("input_throughput", get_counter(events_namespace, :in), uptime_precise_seconds)
flow_metrics << create_flow_metric("filter_throughput", get_counter(events_namespace, :out), uptime_precise_seconds)
flow_metrics << create_flow_metric("output_throughput", get_counter(events_namespace, :filtered), uptime_precise_seconds)
flow_metrics << create_flow_metric("queue_backpressure", get_counter(events_namespace, :queue_push_duration_in_millis), uptime_precise_millis)
flow_metrics << create_flow_metric("worker_concurrency", get_counter(events_namespace, :duration_in_millis), uptime_precise_millis)
flow_metrics << create_flow_metric("queue_backpressure", get_timer(events_namespace, :queue_push_duration_in_millis), uptime_precise_millis)
flow_metrics << create_flow_metric("worker_concurrency", get_timer(events_namespace, :duration_in_millis), uptime_precise_millis)
registered, unregistered = flow_metrics.partition do |flow_metric|
@metric.collector.register?([:stats,:flow], flow_metric.name.to_sym, flow_metric)
@ -574,6 +574,11 @@ class LogStash::Agent
end
private :get_counter
def get_timer(namespace, key)
org.logstash.instrument.metrics.timer.TimerMetric.fromRubyBase(namespace, key)
end
private :get_timer
def create_flow_metric(name, numerator_metric, denominator_metric)
org.logstash.instrument.metrics.FlowMetric.create(name, numerator_metric, denominator_metric)
end

View file

@ -34,12 +34,12 @@ module LogStash::Codecs
@encode_metric = __getobj__.metric.namespace(:encode)
@encode_metric.counter(:writes_in)
@encode_metric.report_time(:duration_in_millis, 0)
@encode_metric.timer(:duration_in_millis)
@decode_metric = __getobj__.metric.namespace(:decode)
@decode_metric.counter(:writes_in)
@decode_metric.counter(:out)
@decode_metric.report_time(:duration_in_millis, 0)
@decode_metric.timer(:duration_in_millis)
end
def encode(event)

View file

@ -52,6 +52,7 @@ module LogStash
Setting::Boolean.new("config.support_escapes", false),
Setting::String.new("config.field_reference.escape_style", "none", true, %w(none percent ampersand)),
Setting::Boolean.new("metric.collect", true),
Setting::String.new("metric.timers", "delayed", true, %w(delayed live)),
Setting::String.new("pipeline.id", "main"),
Setting::Boolean.new("pipeline.system", false),
Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum),
@ -127,6 +128,7 @@ module LogStash
java.lang.System.setProperty("ls.log.format", settings.get("log.format"))
java.lang.System.setProperty("ls.log.level", settings.get("log.level"))
java.lang.System.setProperty("ls.pipeline.separate_logs", settings.get("pipeline.separate_logs").to_s)
java.lang.System.setProperty("ls.metric.timers", settings.get("metric.timers"))
unless java.lang.System.getProperty("log4j.configurationFile")
log4j_config_location = ::File.join(settings.get("path.settings"), "log4j2.properties")

View file

@ -47,7 +47,10 @@ module LogStash module Instrument
#
def push(namespaces_path, key, type, *metric_type_params)
begin
get(namespaces_path, key, type).execute(*metric_type_params)
metric_proxy = get(namespaces_path, key, type)
return metric_proxy.execute(*metric_type_params) if metric_proxy.respond_to?(:execute)
logger.error("Collector: Cannot record metric action #{type}@#{metric_type_params.join('/')} on <#{metric_proxy}> at path #{namespaces_path.join('/')}/#{key}")
rescue MetricStore::NamespacesExpectedError => e
logger.error("Collector: Cannot record metric", :exception => e)
rescue NameError => e
@ -63,10 +66,15 @@ module LogStash module Instrument
def get(namespaces_path, key, type)
@metric_store.fetch_or_store(namespaces_path, key) do
LogStash::Instrument::MetricType.create(type, namespaces_path, key)
initialize_metric(type, namespaces_path, key)
end
end
# test injection, see MetricExtFactory
def initialize_metric(type, namespaces_path, key)
MetricType.create(type, namespaces_path, key)
end
##
# Ensures that a metric on the provided `namespaces_path` with the provided `key`
# is registered, using the provided `metric_instance` IFF it is not already present.

View file

@ -17,16 +17,9 @@
require "logstash/instrument/metric_type/counter"
require "logstash/instrument/metric_type/gauge"
require "logstash/instrument/metric_type/uptime"
module LogStash module Instrument
module MetricType
METRIC_TYPE_LIST = {
:counter => LogStash::Instrument::MetricType::Counter,
:gauge => LogStash::Instrument::MetricType::Gauge,
:uptime => LogStash::Instrument::MetricType::Uptime,
}.freeze
# Use the string to generate a concrete class for this metrics
#
# @param [String] The name of the class
@ -34,7 +27,13 @@ module LogStash module Instrument
# @param [String] The metric key
# @raise [NameError] If the class is not found
def self.create(type, namespaces, key)
METRIC_TYPE_LIST[type].new(namespaces, key)
case type
when :counter then return LogStash::Instrument::MetricType::Counter.new(namespaces, key)
when :gauge then return LogStash::Instrument::MetricType::Gauge.new(namespaces, key)
when :uptime then return org.logstash.instrument.metrics.UptimeMetric.new(key.to_s)
when :timer then return org.logstash.instrument.metrics.timer.TimerMetric::create(key.to_s)
else fail NameError, "Unknown Metric Type `#{type}`"
end
end
end
end; end

View file

@ -1,32 +0,0 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
java_import org.logstash.instrument.metrics.UptimeMetric
module LogStash module Instrument module MetricType
class Uptime < UptimeMetric
def initialize(namespaces, key)
super(key.to_s)
end
def execute(action, value = nil)
fail("Unsupported operation `action` on Uptime Metric")
end
end
end; end; end

View file

@ -50,6 +50,9 @@ module LogStash; class JavaPipeline < AbstractPipeline
@worker_threads = []
@worker_observer = org.logstash.execution.WorkerObserver.new(process_events_namespace_metric,
pipeline_events_namespace_metric)
@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == MEMORY
@events_filtered = java.util.concurrent.atomic.LongAdder.new
@ -578,15 +581,19 @@ module LogStash; class JavaPipeline < AbstractPipeline
def init_worker_loop
begin
org.logstash.execution.WorkerLoop.new(
lir_execution,
filter_queue_client,
@events_filtered,
@events_consumed,
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue,
@preserve_event_order)
filter_queue_client, # QueueReadClient
lir_execution, # CompiledPipeline
@worker_observer, # WorkerObserver
# pipeline reporter counters
@events_consumed, # LongAdder
@events_filtered, # LongAdder
# signaling channels
@flushRequested, # AtomicBoolean
@flushing, # AtomicBoolean
@shutdownRequested, # AtomicBoolean
# behaviour config pass-through
@drain_queue, # boolean
@preserve_event_order) # boolean
rescue => e
@logger.error(
"Worker loop initialization error",

View file

@ -488,7 +488,7 @@ module LogStash
def validate(value)
unless valid?(value)
raise ArgumentError.new("Invalid value \"#{value}, valid options are within the range of #{Port::VALID_PORT_RANGE.first}-#{Port::VALID_PORT_RANGE.last}")
raise ArgumentError.new("Invalid value \"#{name}: #{value}\", valid options are within the range of #{Port::VALID_PORT_RANGE.first}-#{Port::VALID_PORT_RANGE.last}")
end
end
end
@ -513,7 +513,7 @@ module LogStash
def validate(value)
super(value)
unless @possible_strings.empty? || @possible_strings.include?(value)
raise ArgumentError.new("Invalid value \"#{value}\". Options are: #{@possible_strings.inspect}")
raise ArgumentError.new("Invalid value \"#{name}: #{value}\". Options are: #{@possible_strings.inspect}")
end
end
end

View file

@ -19,7 +19,7 @@ require "logstash/instrument/collector"
require "spec_helper"
describe LogStash::Instrument::Collector do
subject { LogStash::Instrument::Collector.new }
subject(:metrics_collector) { LogStash::Instrument::Collector.new }
describe "#push" do
let(:namespaces_path) { [:root, :pipelines, :pipelines01] }
let(:key) { :my_key }
@ -57,6 +57,30 @@ describe LogStash::Instrument::Collector do
end
end
describe '#get' do
let(:namespaces_path) { [:root, :pipelines, :pipelines01] }
let(:key) { :my_key }
{
counter: LogStash::Instrument::MetricType::Counter,
gauge: LogStash::Instrument::MetricType::Gauge,
uptime: org.logstash.instrument.metrics.UptimeMetric,
timer: org.logstash.instrument.metrics.timer.TimerMetric,
}.each do |type, type_specific_implementation|
context "with (type: `#{type}`)" do
it "gets an instance of #{type_specific_implementation}" do
expect(metrics_collector.get(namespaces_path, key, type)).to be_a_kind_of(type_specific_implementation)
end
it 'gets a singleton instance from multiple consecutive calls' do
first = metrics_collector.get(namespaces_path, key, type)
second = metrics_collector.get(namespaces_path, key, type)
expect(second).to equal(first)
end
end
end
end
describe "#snapshot_metric" do
it "return a `LogStash::Instrument::MetricStore`" do
expect(subject.snapshot_metric).to be_kind_of(LogStash::Instrument::Snapshot)

View file

@ -20,20 +20,22 @@ require_relative "../../support/matchers"
require "spec_helper"
describe LogStash::Instrument::Metric do
let(:collector) { [] }
let(:collector) { LogStash::Instrument::Collector.new }
let(:namespace) { :root }
subject { LogStash::Instrument::Metric.new(collector) }
before(:each) { allow(collector).to receive(:push).and_call_original }
context "#increment" do
it "a counter by 1" do
metric = subject.increment(:root, :error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 1)
subject.increment(:root, :error_rate)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :increment, 1)
end
it "a counter by a provided value" do
metric = subject.increment(:root, :error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 20)
subject.increment(:root, :error_rate, 20)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :increment, 20)
end
it "raises an exception if the key is an empty string" do
@ -45,15 +47,15 @@ describe LogStash::Instrument::Metric do
end
end
context "#decrement" do
context "#decrement", skip: "LongCounter impl does not support decrement" do
it "a counter by 1" do
metric = subject.decrement(:root, :error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 1)
subject.decrement(:root, :error_rate)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :decrement, 1)
end
it "a counter by a provided value" do
metric = subject.decrement(:root, :error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 20)
subject.decrement(:root, :error_rate, 20)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :decrement, 20)
end
it "raises an exception if the key is an empty string" do
@ -67,8 +69,8 @@ describe LogStash::Instrument::Metric do
context "#gauge" do
it "set the value of a key" do
metric = subject.gauge(:root, :size_queue, 20)
expect(collector).to be_a_metric_event([:root, :size_queue], :gauge, :set, 20)
subject.gauge(:root, :size_queue, 20)
expect(collector).to have_received(:push).with([:root], :size_queue, :gauge, :set, 20)
end
it "raises an exception if the key is an empty string" do
@ -87,10 +89,8 @@ describe LogStash::Instrument::Metric do
it "records the duration" do
subject.time(:root, :duration_ms) { sleep(sleep_time) }
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 5)
expect(collector[0]).to match(:root)
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:counter)
timer = subject.timer(:root, :duration_ms)
expect(timer.value).to be_within(50).of(sleep_time_ms)
end
it "returns the value of the executed block" do
@ -100,13 +100,14 @@ describe LogStash::Instrument::Metric do
it "return a TimedExecution" do
execution = subject.time(:root, :duration_ms)
sleep(sleep_time)
timer = subject.timer(:root, :duration_ms)
expect(timer.value).to eq(0) # no live tracking without a block
execution_time = execution.stop
expect(execution_time).to eq(collector.last)
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 0.1)
expect(collector[0]).to match(:root)
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:counter)
expect(execution_time).to be_within(50).of(sleep_time_ms)
expect(timer.value).to be_within(50).of(sleep_time_ms)
end
end
@ -119,8 +120,8 @@ describe LogStash::Instrument::Metric do
it "uses the same collector as the creator class" do
child = subject.namespace(sub_key)
metric = child.increment(:error_rate)
expect(collector).to be_a_metric_event([sub_key, :error_rate], :counter, :increment, 1)
child.increment(:error_rate)
expect(collector).to have_received(:push).with([:my_sub_key], :error_rate, :counter, :increment, 1)
end
end
end

View file

@ -21,11 +21,13 @@ require "spec_helper"
describe LogStash::Instrument::NamespacedMetric do
let(:namespace) { :root }
let(:collector) { [] }
let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
subject { described_class.new(metric, namespace) }
before(:each) { allow(collector).to receive(:push).and_call_original }
it "defines the same interface as `Metric`" do
expect(described_class).to implement_interface_of(LogStash::Instrument::Metric)
end
@ -47,32 +49,32 @@ describe LogStash::Instrument::NamespacedMetric do
context "#increment" do
it "a counter by 1" do
metric = subject.increment(:error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 1)
subject.increment(:error_rate)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :increment, 1)
end
it "a counter by a provided value" do
metric = subject.increment(:error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 20)
subject.increment(:error_rate, 20)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :increment, 20)
end
end
context "#decrement" do
context "#decrement", skip: "LongCounter impl does not support decrement" do
it "a counter by 1" do
metric = subject.decrement(:error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 1)
subject.decrement(:error_rate)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :decrement, 1)
end
it "a counter by a provided value" do
metric = subject.decrement(:error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 20)
subject.decrement(:error_rate, 20)
expect(collector).to have_received(:push).with([:root], :error_rate, :counter, :decrement, 20)
end
end
context "#gauge" do
it "set the value of a key" do
metric = subject.gauge(:size_queue, 20)
expect(collector).to be_a_metric_event([:root, :size_queue], :gauge, :set, 20)
subject.gauge(:size_queue, 20)
expect(collector).to have_received(:push).with([:root], :size_queue, :gauge, :set, 20)
end
end
@ -83,22 +85,36 @@ describe LogStash::Instrument::NamespacedMetric do
it "records the duration" do
subject.time(:duration_ms) { sleep(sleep_time) }
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 5)
expect(collector[0]).to match([:root])
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:counter)
timer = metric.timer(namespace, :duration_ms)
expect(timer.value).to be_within(50).of(sleep_time_ms)
end
it "return a TimedExecution" do
execution = subject.time(:duration_ms)
sleep(sleep_time)
timer = metric.timer(namespace, :duration_ms)
expect(timer.value).to eq(0) # no live tracking without a block
execution_time = execution.stop
expect(execution_time).to eq(collector.last)
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 0.1)
expect(collector[0]).to match([:root])
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:counter)
expect(execution_time).to be_within(50).of(sleep_time_ms)
expect(timer.value).to be_within(50).of(sleep_time_ms)
end
end
context "#namespace" do
let(:namespace) { [:deeply, :nested] }
let(:sub_key) { [:even, :deeper] }
it "creates a new metric object and append the `sub_key` to the `base_key`" do
expect(subject.namespace(sub_key).namespace_name).to eq(namespace + sub_key)
end
it "uses the same collector as the creator class" do
child = subject.namespace(sub_key)
child.increment(:error_rate)
expect(collector).to have_received(:push).with((namespace + sub_key), :error_rate, :counter, :increment, 1)
end
end

View file

@ -50,9 +50,9 @@ describe LogStash::FilterDelegator do
counter
}
let(:counter_time) {
counter = metric.counter(:duration_in_millis)
counter.increment(0)
counter
timer = metric.timer(:duration_in_millis)
timer.report_untracked_millis(0)
timer
}
let(:events) { [LogStash::Event.new, LogStash::Event.new] }

View file

@ -39,65 +39,6 @@ describe LogStash::WrappedSynchronousQueue do
let(:write_client) { subject.write_client }
let(:read_client) { subject.read_client }
context "when reading from the queue" do
let(:collector) { LogStash::Instrument::Collector.new }
before do
read_client.set_events_metric(LogStash::Instrument::Metric.new(collector).namespace(:events))
read_client.set_pipeline_metric(LogStash::Instrument::Metric.new(collector).namespace(:pipeline))
end
context "when the queue is empty" do
it "doesnt record the `duration_in_millis`" do
batch = read_client.read_batch
read_client.close_batch(batch)
store = collector.snapshot_metric.metric_store
expect(store.get_shallow(:events, :out).value).to eq(0)
expect(store.get_shallow(:events, :out)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:events, :filtered).value).to eq(0)
expect(store.get_shallow(:events, :filtered)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:events, :duration_in_millis).value).to eq(0)
expect(store.get_shallow(:events, :duration_in_millis)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to eq(0)
expect(store.get_shallow(:pipeline, :duration_in_millis)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:pipeline, :out).value).to eq(0)
expect(store.get_shallow(:pipeline, :out)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:pipeline, :filtered).value).to eq(0)
expect(store.get_shallow(:pipeline, :filtered)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
end
end
context "when we have item in the queue" do
it "records the `duration_in_millis`" do
batch = []
5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))}
write_client.push_batch(batch)
read_batch = read_client.read_batch.to_java
sleep(0.1) # simulate some work for the `duration_in_millis`
# TODO: this interaction should be cleaned in an upcoming PR,
# This is what the current pipeline does.
read_client.add_filtered_metrics(read_batch.filteredSize)
read_client.add_output_metrics(read_batch.filteredSize)
read_client.close_batch(read_batch)
store = collector.snapshot_metric.metric_store
expect(store.get_shallow(:events, :out).value).to eq(5)
expect(store.get_shallow(:events, :filtered).value).to eq(5)
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
expect(store.get_shallow(:pipeline, :out).value).to eq(5)
expect(store.get_shallow(:pipeline, :filtered).value).to eq(5)
end
end
end
context "when writing to the queue" do
before :each do
read_client.set_events_metric(LogStash::Instrument::NamespacedNullMetric.new(nil, :null))

View file

@ -19,14 +19,6 @@ require "rspec"
require "rspec/expectations"
require "stud/try"
RSpec::Matchers.define :be_a_metric_event do |namespace, type, *args|
match do
namespace == Array(actual[0]).concat(Array(actual[1])) &&
type == actual[2] &&
args == actual[3..-1]
end
end
# Match to test `NullObject` pattern
RSpec::Matchers.define :implement_interface_of do |type, key, value|
match do |actual|

View file

@ -37,7 +37,7 @@ shared_examples "metrics commons operations" do
end
end
describe "#decrement" do
describe "#decrement", skip: "LongCounter impl does not support decrement" do
it "allows to decrement a key with no amount" do
expect { subject.decrement(key, 100) }.not_to raise_error
end
@ -100,7 +100,6 @@ shared_examples "metrics commons operations" do
expect { execution.stop }.not_to raise_error
end
it "raises an exception if the key is an empty string" do
expect { subject.time("") {} }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end

View file

@ -43,6 +43,14 @@ public interface NamespacedMetric extends Metric {
*/
CounterMetric counter(String metric);
/**
* Creates a timer with the name {@code metric}.
*
* @param metric name of the counter
* @return an instance tracking a counter metric allowing easy incrementing and resetting
*/
TimerMetric timer(String metric);
/**
* Increment the {@code metric} metric by 1.
*

View file

@ -0,0 +1,49 @@
package co.elastic.logstash.api;
import java.io.IOException;
import java.util.function.Supplier;
/**
* This {@code TimerMetric} is a write-only interface for timing execution.
*
* <p>It includes two primary methods of tracking timed execution:
* <dl>
* <dt>{@link TimerMetric#time}</dt>
* <dd>Track the execution time of the provided block or closure.
* This is the preferred method, as it requires no math or
* external time-tracking.</dd>
* <dt>{@link TimerMetric#reportUntrackedMillis}</dt>
* <dd>Report milliseconds elapsed that were <em>NOt</em> tracked.
* This method requires provisioning your own time source
* (typically {@link System#nanoTime()}) and performing your
* own time conversion math.</dd>
* </dl>
*
* A namespaced instance of {@code TimerMetric} can be acquired by plugins
* using {@link NamespacedMetric#timer(String)}, or can be invoked directly
* from a metric namespace with {@link NamespacedMetric#time(String, Supplier)}
* or {@link NamespacedMetric#reportTime(String, long)}.
*/
public interface TimerMetric {
<T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E;
void reportUntrackedMillis(final long untrackedMillis);
default <E extends Throwable> void time(final ExceptionalRunnable<E> exceptionalRunnable) throws E {
this.<Void, E>time(() -> {
exceptionalRunnable.run();
return null;
});
}
@FunctionalInterface
interface ExceptionalSupplier<T,E extends Throwable> {
T get() throws E;
}
@FunctionalInterface
interface ExceptionalRunnable<E extends Throwable> {
void run() throws E;
}
}

View file

@ -353,12 +353,20 @@ public final class CompiledPipeline {
}
}
public interface Execution <QB extends QueueBatch> {
/**
* @return the number of events that was processed, could be less o greater than batch.size(), depending if
* the pipeline drops or clones events during the filter stage.
* */
int compute(final QB batch, final boolean flush, final boolean shutdown);
}
/**
* Instances of this class represent a fully compiled pipeline execution. Note that this class
* has a separate lifecycle from {@link CompiledPipeline} because it holds per (worker-thread)
* state and thus needs to be instantiated once per thread.
*/
public abstract class CompiledExecution {
public abstract class CompiledExecution implements Execution<QueueBatch> {
/**
* Compiled {@link IfVertex, indexed by their ID as returned by {@link Vertex#getId()}.
@ -379,12 +387,6 @@ public final class CompiledPipeline {
compiledOutputs = compileOutputs();
}
/**
* @return the number of events that was processed, could be less o greater than batch.size(), depending if
* the pipeline drops or clones events during the filter stage.
* */
public abstract int compute(final QueueBatch batch, final boolean flush, final boolean shutdown);
public abstract int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown);
/**

View file

@ -34,10 +34,10 @@ import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@JRubyClass(name = "AbstractFilterDelegator")
public abstract class AbstractFilterDelegatorExt extends RubyObject {
@ -52,7 +52,7 @@ public abstract class AbstractFilterDelegatorExt extends RubyObject {
protected transient LongCounter eventMetricIn;
protected transient LongCounter eventMetricTime;
protected transient TimerMetric eventMetricTime;
public AbstractFilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
@ -65,7 +65,7 @@ public abstract class AbstractFilterDelegatorExt extends RubyObject {
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
eventMetricTime = LongCounter.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY);
eventMetricTime = TimerMetric.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY);
namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context));
}
}
@ -130,9 +130,7 @@ public abstract class AbstractFilterDelegatorExt extends RubyObject {
public RubyArray multiFilter(final IRubyObject input) {
RubyArray batch = (RubyArray) input;
eventMetricIn.increment((long) batch.size());
final long start = System.nanoTime();
final RubyArray result = doMultiFilter(batch);
eventMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
final RubyArray result = eventMetricTime.time(() -> doMultiFilter(batch));
int count = 0;
for (final JrubyEventExtLibrary.RubyEvent event : (Collection<JrubyEventExtLibrary.RubyEvent>) result) {
if (!event.getEvent().isCancelled()) {

View file

@ -21,7 +21,7 @@
package org.logstash.config.ir.compiler;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
@ -36,6 +36,7 @@ import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
@JRubyClass(name = "AbstractOutputDelegator")
@ -57,7 +58,7 @@ public abstract class AbstractOutputDelegatorExt extends RubyObject {
private transient LongCounter eventMetricIn;
private transient LongCounter eventMetricTime;
private transient TimerMetric eventMetricTime;
public AbstractOutputDelegatorExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
@ -117,9 +118,7 @@ public abstract class AbstractOutputDelegatorExt extends RubyObject {
final RubyArray batch = (RubyArray) events;
final int count = batch.size();
eventMetricIn.increment((long) count);
final long start = System.nanoTime();
doOutput(batch);
eventMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
eventMetricTime.time(() -> doOutput(batch));
eventMetricOut.increment((long) count);
return this;
}
@ -134,7 +133,7 @@ public abstract class AbstractOutputDelegatorExt extends RubyObject {
namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context));
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
eventMetricTime = LongCounter.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY);
eventMetricTime = TimerMetric.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY);
}
}

View file

@ -33,6 +33,7 @@ import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.timer.NullTimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import java.util.UUID;
@ -70,7 +71,7 @@ public final class FilterDelegatorExt extends AbstractFilterDelegatorExt {
public FilterDelegatorExt initForTesting(final IRubyObject filter, RubyObject configNameDouble) {
eventMetricOut = LongCounter.DUMMY_COUNTER;
eventMetricIn = LongCounter.DUMMY_COUNTER;
eventMetricTime = LongCounter.DUMMY_COUNTER;
eventMetricTime = NullTimerMetric.getInstance();
this.filter = filter;
filterMethod = filter.getMetaClass().searchMethod(FILTER_METHOD_NAME);
flushes = filter.respondsTo("flush");

View file

@ -24,22 +24,16 @@ import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.CounterMetric;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Metric;
import co.elastic.logstash.api.NamespacedMetric;
import co.elastic.logstash.api.PluginConfigSpec;
import org.jruby.RubySymbol;
import org.jruby.runtime.ThreadContext;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import co.elastic.logstash.api.TimerMetric;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class JavaCodecDelegator implements Codec {
@ -52,13 +46,13 @@ public class JavaCodecDelegator implements Codec {
protected final CounterMetric encodeMetricIn;
protected final CounterMetric encodeMetricTime;
protected final TimerMetric encodeMetricTime;
protected final CounterMetric decodeMetricIn;
protected final CounterMetric decodeMetricOut;
protected final CounterMetric decodeMetricTime;
protected final TimerMetric decodeMetricTime;
public JavaCodecDelegator(final Context context, final Codec codec) {
@ -71,12 +65,12 @@ public class JavaCodecDelegator implements Codec {
final NamespacedMetric encodeMetric = metric.namespace(ENCODE_KEY);
encodeMetricIn = encodeMetric.counter(IN_KEY);
encodeMetricTime = encodeMetric.counter(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
encodeMetricTime = encodeMetric.timer(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
final NamespacedMetric decodeMetric = metric.namespace(DECODE_KEY);
decodeMetricIn = decodeMetric.counter(IN_KEY);
decodeMetricOut = decodeMetric.counter(MetricKeys.OUT_KEY.asJavaString());
decodeMetricTime = decodeMetric.counter(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
decodeMetricTime = decodeMetric.timer(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
}
}
@ -84,39 +78,27 @@ public class JavaCodecDelegator implements Codec {
public void decode(final ByteBuffer buffer, final Consumer<Map<String, Object>> eventConsumer) {
decodeMetricIn.increment();
final long start = System.nanoTime();
codec.decode(buffer, (event) -> {
decodeMetricTime.time(() -> codec.decode(buffer, (event) -> {
decodeMetricOut.increment();
eventConsumer.accept(event);
});
decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
}));
}
@Override
public void flush(final ByteBuffer buffer, final Consumer<Map<String, Object>> eventConsumer) {
decodeMetricIn.increment();
final long start = System.nanoTime();
codec.flush(buffer, (event) -> {
decodeMetricTime.time(() -> codec.flush(buffer, (event) -> {
decodeMetricOut.increment();
eventConsumer.accept(event);
});
decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
}));
}
@Override
public void encode(final Event event, final OutputStream out) throws IOException {
encodeMetricIn.increment();
final long start = System.nanoTime();
codec.encode(event, out);
decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
encodeMetricTime.time(() -> codec.encode(event, out));
}
@Override

View file

@ -78,14 +78,15 @@ import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginFactoryExt;
@ -276,6 +277,16 @@ public class AbstractPipelineExt extends RubyBasicObject {
return context.nil;
}
@JRubyMethod(name = "process_events_namespace_metric")
public final IRubyObject processEventsNamespaceMetric(final ThreadContext context) {
return metric.namespace(context, EVENTS_METRIC_NAMESPACE);
}
@JRubyMethod(name = "pipeline_events_namespace_metric")
public final IRubyObject pipelineEventsNamespaceMetric(final ThreadContext context) {
return metric.namespace(context, pipelineNamespacedPath(EVENTS_KEY));
}
@JRubyMethod(name = "filter_queue_client")
public final QueueReadClientBase filterQueueClient() {
return filterQueueClient;
@ -511,12 +522,12 @@ public class AbstractPipelineExt extends RubyBasicObject {
this.flowMetrics.add(outputThroughput);
storeMetric(context, flowNamespace, outputThroughput);
final LongCounter queuePushWaitInMillis = initOrGetCounterMetric(context, eventsNamespace, PUSH_DURATION_KEY);
final TimerMetric queuePushWaitInMillis = initOrGetTimerMetric(context, eventsNamespace, PUSH_DURATION_KEY);
final FlowMetric backpressureFlow = createFlowMetric(QUEUE_BACKPRESSURE_KEY, queuePushWaitInMillis, uptimeInPreciseMillis);
this.flowMetrics.add(backpressureFlow);
storeMetric(context, flowNamespace, backpressureFlow);
final LongCounter durationInMillis = initOrGetCounterMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY);
final TimerMetric durationInMillis = initOrGetTimerMetric(context, eventsNamespace, DURATION_IN_MILLIS_KEY);
final FlowMetric concurrencyFlow = createFlowMetric(WORKER_CONCURRENCY_KEY, durationInMillis, uptimeInPreciseMillis);
this.flowMetrics.add(concurrencyFlow);
storeMetric(context, flowNamespace, concurrencyFlow);
@ -568,6 +579,16 @@ public class AbstractPipelineExt extends RubyBasicObject {
return retrievedMetric.toJava(LongCounter.class);
}
private TimerMetric initOrGetTimerMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {
final IRubyObject collector = this.metric.collector(context);
final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath);
final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("timer")});
return retrievedMetric.toJava(TimerMetric.class);
}
private Optional<NumberGauge> initOrGetNumberGaugeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {

View file

@ -0,0 +1,19 @@
package org.logstash.execution;
import org.logstash.config.ir.CompiledPipeline;
class ObservedExecution<QB extends QueueBatch> implements CompiledPipeline.Execution<QB> {
private final WorkerObserver workerObserver;
private final CompiledPipeline.Execution<QB> execution;
public ObservedExecution(final WorkerObserver workerObserver,
final CompiledPipeline.Execution<QB> execution) {
this.workerObserver = workerObserver;
this.execution = execution;
}
@Override
public int compute(QB batch, boolean flush, boolean shutdown) {
return workerObserver.observeExecutionComputation(batch, () -> execution.compute(batch, flush, shutdown));
}
}

View file

@ -20,6 +20,8 @@
package org.logstash.execution;
import co.elastic.logstash.api.TimerMetric;
import java.io.IOException;
/**
@ -32,5 +34,10 @@ public interface QueueReadClient {
void addOutputMetrics(int filteredSize);
void addFilteredMetrics(int filteredSize);
void closeBatch(QueueBatch batch) throws IOException;
public <V, E extends Exception> V executeWithTimers(final TimerMetric.ExceptionalSupplier<V,E> supplier) throws E;
public <E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;
boolean isEmpty();
}

View file

@ -28,11 +28,13 @@ import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import java.io.IOException;
@ -52,14 +54,12 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
protected long waitForMillis = 50;
private final ConcurrentHashMap<Long, QueueBatch> inflightBatches = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> inflightClocks = new ConcurrentHashMap<>();
private transient LongCounter eventMetricOut;
private transient LongCounter eventMetricFiltered;
private transient LongCounter eventMetricTime;
private transient TimerMetric eventMetricTime;
private transient LongCounter pipelineMetricOut;
private transient LongCounter pipelineMetricFiltered;
private transient LongCounter pipelineMetricTime;
private transient TimerMetric pipelineMetricTime;
protected QueueReadClientBase(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
@ -78,7 +78,7 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
synchronized(namespacedMetric.getMetric()) {
eventMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
eventMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
eventMetricTime = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
eventMetricTime = TimerMetric.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
}
return this;
}
@ -89,7 +89,7 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
synchronized(namespacedMetric.getMetric()) {
pipelineMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
pipelineMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
pipelineMetricTime = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
pipelineMetricTime = TimerMetric.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
}
return this;
}
@ -131,13 +131,6 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
public void closeBatch(QueueBatch batch) throws IOException {
batch.close();
inflightBatches.remove(Thread.currentThread().getId());
Long startTime = inflightClocks.remove(Thread.currentThread().getId());
if (startTime != null && batch.filteredSize() > 0) {
// stop timer and record metrics iff the batch is non-empty.
long elapsedTimeMillis = (System.nanoTime() - startTime) / 1_000_000;
eventMetricTime.increment(elapsedTimeMillis);
pipelineMetricTime.increment(elapsedTimeMillis);
}
}
/**
@ -196,7 +189,6 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
public void startMetrics(QueueBatch batch) {
long threadId = Thread.currentThread().getId();
inflightBatches.put(threadId, batch);
inflightClocks.put(threadId, System.nanoTime());
}
@Override
@ -211,5 +203,21 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
pipelineMetricOut.increment(filteredSize);
}
@Override
public <V, E extends Exception> V executeWithTimers(final co.elastic.logstash.api.TimerMetric.ExceptionalSupplier<V,E> supplier) throws E {
return eventMetricTime.time(() -> pipelineMetricTime.time(supplier));
}
@Override
public <E extends Exception> void executeWithTimers(co.elastic.logstash.api.TimerMetric.ExceptionalRunnable<E> runnable) throws E {
eventMetricTime.time(() -> pipelineMetricTime.time(runnable));
}
@JRubyMethod(name = "execute_with_timers")
public IRubyObject executeWithTimersRuby(final ThreadContext context,
final Block block) {
return executeWithTimers(() -> block.call(context));
}
public abstract void close() throws IOException;
}

View file

@ -21,6 +21,7 @@ package org.logstash.execution;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.config.ir.CompiledPipeline;
@ -33,7 +34,7 @@ public final class WorkerLoop implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
private final CompiledPipeline.CompiledExecution execution;
private final ObservedExecution<QueueBatch> execution;
private final QueueReadClient readClient;
@ -49,28 +50,26 @@ public final class WorkerLoop implements Runnable {
private final boolean drainQueue;
private final boolean preserveEventOrder;
public WorkerLoop(
final CompiledPipeline pipeline,
final QueueReadClient readClient,
final LongAdder filteredCounter,
final LongAdder consumedCounter,
final AtomicBoolean flushRequested,
final AtomicBoolean flushing,
final AtomicBoolean shutdownRequested,
final boolean drainQueue,
final boolean preserveEventOrder)
final QueueReadClient readClient,
final CompiledPipeline compiledPipeline,
final WorkerObserver workerObserver,
final LongAdder consumedCounter,
final LongAdder filteredCounter,
final AtomicBoolean flushRequested,
final AtomicBoolean flushing,
final AtomicBoolean shutdownRequested,
final boolean drainQueue,
final boolean preserveEventOrder)
{
this.execution = workerObserver.ofExecution(compiledPipeline.buildExecution(preserveEventOrder));
this.readClient = readClient;
this.consumedCounter = consumedCounter;
this.filteredCounter = filteredCounter;
this.execution = pipeline.buildExecution(preserveEventOrder);
this.drainQueue = drainQueue;
this.readClient = readClient;
this.flushRequested = flushRequested;
this.flushing = flushing;
this.shutdownRequested = shutdownRequested;
this.preserveEventOrder = preserveEventOrder;
}
@Override
@ -83,13 +82,10 @@ public final class WorkerLoop implements Runnable {
final boolean isFlush = flushRequested.compareAndSet(true, false);
if (batch.filteredSize() > 0 || isFlush) {
consumedCounter.add(batch.filteredSize());
readClient.startMetrics(batch);
final int outputCount = execution.compute(batch, isFlush, false);
int filteredCount = batch.filteredSize();
filteredCounter.add(filteredCount);
readClient.addOutputMetrics(outputCount);
readClient.addFilteredMetrics(filteredCount);
execution.compute(batch, isFlush, false);
filteredCounter.add(batch.filteredSize());
readClient.closeBatch(batch);
if (isFlush) {
flushing.set(false);
}
@ -98,7 +94,6 @@ public final class WorkerLoop implements Runnable {
//we are shutting down, queue is drained if it was required, now perform a final flush.
//for this we need to create a new empty batch to contain the final flushed events
final QueueBatch batch = readClient.newBatch();
readClient.startMetrics(batch);
execution.compute(batch, true, true);
readClient.closeBatch(batch);
} catch (final Exception ex) {

View file

@ -0,0 +1,63 @@
package org.logstash.execution;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.timer.TimerMetric;
public class WorkerObserver {
private final transient LongCounter processEventsFilteredMetric;
private final transient LongCounter processEventsOutMetric;
private final transient TimerMetric processEventsDurationMetric;
private final transient LongCounter pipelineEventsFilteredMetric;
private final transient LongCounter pipelineEventsOutMetric;
private final transient TimerMetric pipelineEventsDurationMetric;
public WorkerObserver(final AbstractNamespacedMetricExt processEventsMetric,
final AbstractNamespacedMetricExt pipelineEventsMetric) {
synchronized(processEventsMetric.getMetric()) {
this.processEventsOutMetric = LongCounter.fromRubyBase(processEventsMetric, MetricKeys.OUT_KEY);
this.processEventsFilteredMetric = LongCounter.fromRubyBase(processEventsMetric, MetricKeys.FILTERED_KEY);
this.processEventsDurationMetric = TimerMetric.fromRubyBase(processEventsMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
}
synchronized(pipelineEventsMetric.getMetric()) {
this.pipelineEventsOutMetric = LongCounter.fromRubyBase(pipelineEventsMetric, MetricKeys.OUT_KEY);
this.pipelineEventsFilteredMetric = LongCounter.fromRubyBase(pipelineEventsMetric, MetricKeys.FILTERED_KEY);
this.pipelineEventsDurationMetric = TimerMetric.fromRubyBase(pipelineEventsMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
}
}
public <QB extends QueueBatch> ObservedExecution<QB> ofExecution(final CompiledPipeline.Execution<QB> execution) {
return new ObservedExecution<>(this, execution);
}
<E extends Exception> int observeExecutionComputation(final QueueBatch batch, final co.elastic.logstash.api.TimerMetric.ExceptionalSupplier<Integer,E> supplier) throws E {
return executeWithTimers(() -> {
final int outputCount = supplier.get();
final int filteredCount = batch.filteredSize();
incrementFilteredMetrics(filteredCount);
incrementOutMetrics(outputCount);
return outputCount;
});
}
public <V, E extends Exception> V executeWithTimers(final co.elastic.logstash.api.TimerMetric.ExceptionalSupplier<V,E> supplier) throws E {
return processEventsDurationMetric.time(() -> pipelineEventsDurationMetric.time(supplier));
}
private void incrementOutMetrics(final long amount) {
this.processEventsOutMetric.increment(amount);
this.pipelineEventsOutMetric.increment(amount);
}
private void incrementFilteredMetrics(final long amount) {
this.processEventsFilteredMetric.increment(amount);
this.pipelineEventsFilteredMetric.increment(amount);
}
}

View file

@ -22,7 +22,6 @@ package org.logstash.ext;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyArray;
@ -33,12 +32,12 @@ import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.timer.TimerMetric;
import static org.logstash.instrument.metrics.MetricKeys.*;
@ -51,15 +50,15 @@ public final class JRubyWrappedWriteClientExt extends RubyObject implements Queu
private transient LongCounter eventsMetricsCounter;
private transient LongCounter eventsMetricsTime;
private transient TimerMetric eventsMetricsTime;
private transient LongCounter pipelineMetricsCounter;
private transient LongCounter pipelineMetricsTime;
private transient TimerMetric pipelineMetricsTime;
private transient LongCounter pluginMetricsCounter;
private transient LongCounter pluginMetricsTime;
private transient TimerMetric pluginMetricsTime;
public JRubyWrappedWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
@ -87,45 +86,43 @@ public final class JRubyWrappedWriteClientExt extends RubyObject implements Queu
getMetric(metric, STATS_KEY, EVENTS_KEY);
eventsMetricsCounter = LongCounter.fromRubyBase(eventsMetrics, MetricKeys.IN_KEY);
eventsMetricsTime = LongCounter.fromRubyBase(eventsMetrics, MetricKeys.PUSH_DURATION_KEY);
eventsMetricsTime = TimerMetric.fromRubyBase(eventsMetrics, MetricKeys.PUSH_DURATION_KEY);
final AbstractNamespacedMetricExt pipelineEventMetrics =
getMetric(metric, STATS_KEY, PIPELINES_KEY, pipelineIdSym, EVENTS_KEY);
pipelineMetricsCounter = LongCounter.fromRubyBase(pipelineEventMetrics, MetricKeys.IN_KEY);
pipelineMetricsTime = LongCounter.fromRubyBase(pipelineEventMetrics, MetricKeys.PUSH_DURATION_KEY);
pipelineMetricsTime = TimerMetric.fromRubyBase(pipelineEventMetrics, MetricKeys.PUSH_DURATION_KEY);
final AbstractNamespacedMetricExt pluginMetrics =
getMetric(metric, STATS_KEY, PIPELINES_KEY, pipelineIdSym, PLUGINS_KEY, INPUTS_KEY, pluginIdSym, EVENTS_KEY);
pluginMetricsCounter =
LongCounter.fromRubyBase(pluginMetrics, MetricKeys.OUT_KEY);
pluginMetricsTime = LongCounter.fromRubyBase(pluginMetrics, MetricKeys.PUSH_DURATION_KEY);
pluginMetricsTime = TimerMetric.fromRubyBase(pluginMetrics, MetricKeys.PUSH_DURATION_KEY);
}
return this;
}
@JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject push(final ThreadContext context, final IRubyObject event)
throws InterruptedException {
final long start = System.nanoTime();
public IRubyObject push(final ThreadContext context,
final IRubyObject event) throws InterruptedException {
final JrubyEventExtLibrary.RubyEvent rubyEvent = (JrubyEventExtLibrary.RubyEvent) event;
incrementCounters(1L);
final IRubyObject res = writeClient.doPush(context, (JrubyEventExtLibrary.RubyEvent) event);
incrementTimers(start);
return res;
return executeWithTimers(() -> writeClient.doPush(context, rubyEvent));
}
@SuppressWarnings("unchecked")
@JRubyMethod(name = "push_batch", required = 1)
public IRubyObject pushBatch(final ThreadContext context, final IRubyObject batch)
throws InterruptedException {
final long start = System.nanoTime();
incrementCounters((long) ((Collection<IRubyObject>) batch).size());
final IRubyObject res = writeClient.doPushBatch(
context, (Collection<JrubyEventExtLibrary.RubyEvent>) batch
);
incrementTimers(start);
return res;
public IRubyObject pushBatch(final ThreadContext context,
final IRubyObject batch) throws InterruptedException {
final Collection<JrubyEventExtLibrary.RubyEvent> rubyEvents = (Collection<JrubyEventExtLibrary.RubyEvent>) batch;
incrementCounters(rubyEvents.size());
return executeWithTimers(() -> writeClient.doPushBatch(
context, rubyEvents
));
}
/**
@ -146,15 +143,14 @@ public final class JRubyWrappedWriteClientExt extends RubyObject implements Queu
pluginMetricsCounter.increment(count);
}
private void incrementTimers(final long start) {
final long increment = TimeUnit.MILLISECONDS.convert(
System.nanoTime() - start, TimeUnit.NANOSECONDS
);
eventsMetricsTime.increment(increment);
pipelineMetricsTime.increment(increment);
pluginMetricsTime.increment(increment);
private <V, E extends Exception> V executeWithTimers(final co.elastic.logstash.api.TimerMetric.ExceptionalSupplier<V,E> supplier) throws E {
return eventsMetricsTime.time(() -> pipelineMetricsTime.time(() -> pluginMetricsTime.time(supplier)));
}
private <E extends Exception> void executeWithTimers(final Runnable runnable) {
eventsMetricsTime.time(() -> pipelineMetricsTime.time(() -> pluginMetricsTime.time(runnable::run)));
}
private AbstractNamespacedMetricExt getMetric(final AbstractMetricExt base,
final RubySymbol... keys) {
@ -163,9 +159,7 @@ public final class JRubyWrappedWriteClientExt extends RubyObject implements Queu
@Override
public void push(Map<String, Object> event) {
final long start = System.nanoTime();
incrementCounters(1L);
writeClient.push(event);
incrementTimers(start);
executeWithTimers(() -> writeClient.push(event));
}
}

View file

@ -84,6 +84,4 @@ public final class JrubyMemoryReadClientExt extends QueueReadClientBase {
startMetrics(batch);
return batch;
}
}

View file

@ -42,9 +42,6 @@ public abstract class AbstractMetric<T> implements Metric<T> {
this.name = name;
}
@Override
public abstract MetricType getType();
@JsonValue
public abstract T getValue();

View file

@ -21,6 +21,7 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
@ -56,4 +57,19 @@ public abstract class AbstractMetricExt extends RubyObject {
);
protected abstract IRubyObject getCollector(ThreadContext context);
/**
* Normalize the namespace to an array
* @param namespaceName a ruby-object, which may be an array
* @return an array
*/
@SuppressWarnings("rawtypes")
protected RubyArray normalizeNamespace(final IRubyObject namespaceName) {
if (namespaceName instanceof RubyArray) {
return (RubyArray) namespaceName;
} else {
return RubyArray.newArray(namespaceName.getRuntime(), namespaceName);
}
}
}

View file

@ -48,6 +48,10 @@ public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt {
final IRubyObject value) {
return getGauge(context, key, value);
}
@JRubyMethod
public IRubyObject timer(final ThreadContext context, final IRubyObject key) {
return getTimer(context, key);
}
@JRubyMethod(required = 1, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
@ -89,6 +93,8 @@ public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt {
protected abstract IRubyObject getCounter(ThreadContext context, IRubyObject key);
protected abstract IRubyObject getTimer(ThreadContext context, IRubyObject key);
protected abstract IRubyObject doTime(ThreadContext context, IRubyObject key, Block block);
protected abstract IRubyObject doReportTime(ThreadContext context,

View file

@ -50,7 +50,14 @@ public abstract class AbstractSimpleMetricExt extends AbstractMetricExt {
@JRubyMethod
public IRubyObject gauge(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject value) {
return getGauge(context, namespace, key, value);
return getGauge(context, normalizeNamespace(namespace), key, value);
}
@JRubyMethod
public IRubyObject timer(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key) {
return getTimer(context, namespace, key);
}
@JRubyMethod(name = "report_time")
@ -60,8 +67,10 @@ public abstract class AbstractSimpleMetricExt extends AbstractMetricExt {
}
@JRubyMethod
public IRubyObject time(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final Block block) {
public IRubyObject time(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key,
final Block block) {
return doTime(context, namespace, key, block);
}
@ -72,6 +81,8 @@ public abstract class AbstractSimpleMetricExt extends AbstractMetricExt {
protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject namespace,
IRubyObject key, IRubyObject value);
protected abstract IRubyObject getTimer(ThreadContext context, IRubyObject namespace, IRubyObject key);
protected abstract IRubyObject doReportTime(ThreadContext context, IRubyObject namespace,
IRubyObject key, IRubyObject duration);

View file

@ -51,6 +51,8 @@ abstract class BaseFlowMetric extends AbstractMetric<Map<String, Double>> implem
final LongSupplier nanoTimeSupplier;
static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);
private static final OptionalDouble NEGATIVE_INFINITY_OPTIONAL_DOUBLE = OptionalDouble.of(Double.NEGATIVE_INFINITY);
private static final OptionalDouble POSITIVE_INFINITY_OPTIONAL_DOUBLE = OptionalDouble.of(Double.POSITIVE_INFINITY);
BaseFlowMetric(final LongSupplier nanoTimeSupplier,
final String name,
@ -118,7 +120,11 @@ abstract class BaseFlowMetric extends AbstractMetric<Map<String, Double>> implem
final BigDecimal deltaDenominator = current.denominator().subtract(baseline.denominator());
if (deltaDenominator.signum() == 0) {
return OptionalDouble.empty();
switch (deltaNumerator.signum()) {
case -1: return NEGATIVE_INFINITY_OPTIONAL_DOUBLE;
case 0: return OptionalDouble.empty();
case +1: return POSITIVE_INFINITY_OPTIONAL_DOUBLE;
}
}
final BigDecimal rate = deltaNumerator.divide(deltaDenominator, LIMITED_PRECISION);

View file

@ -21,6 +21,8 @@
package org.logstash.instrument.metrics;
import java.util.concurrent.TimeUnit;
import co.elastic.logstash.api.TimerMetric;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
@ -51,8 +53,9 @@ public final class MetricExt extends AbstractSimpleMetricExt {
private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement");
private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer");
private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set");
private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get");
private transient IRubyObject collector;
@ -91,7 +94,7 @@ public final class MetricExt extends AbstractSimpleMetricExt {
final IRubyObject key, final IRubyObject value) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(
context, "push", new IRubyObject[]{namespace, key, COUNTER, INCREMENT, value}
context, "push", new IRubyObject[]{normalizeNamespace(namespace), key, COUNTER, INCREMENT, value}
);
}
@ -104,7 +107,7 @@ public final class MetricExt extends AbstractSimpleMetricExt {
final IRubyObject key, final IRubyObject value) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(
context, "push", new IRubyObject[]{namespace, key, COUNTER, DECREMENT, value}
context, "push", new IRubyObject[]{normalizeNamespace(namespace), key, COUNTER, DECREMENT, value}
);
}
@ -136,34 +139,41 @@ public final class MetricExt extends AbstractSimpleMetricExt {
final IRubyObject key, final IRubyObject value) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(
context, "push", new IRubyObject[]{namespace, key, GAUGE, SET, value}
context, "push", new IRubyObject[]{normalizeNamespace(namespace), key, GAUGE, SET, value}
);
}
@Override
protected IRubyObject getTimer(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(context,
"get", new IRubyObject[]{normalizeNamespace(namespace), key, TIMER}
);
}
@Override
protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(
context, "push", new IRubyObject[]{namespace, key, COUNTER, INCREMENT, duration}
);
final TimerMetric timer = timer(context, namespace, key).toJava(TimerMetric.class);
timer.reportUntrackedMillis(duration.convertToInteger().getLongValue());
return context.nil;
}
@Override
protected IRubyObject doTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final Block block) {
protected IRubyObject doTime(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key,
final Block block) {
MetricExt.validateKey(context, null, key);
if (!block.isGiven()) {
return MetricExt.TimedExecution.create(this, namespace, key);
}
final long startTime = System.nanoTime();
final IRubyObject res = block.call(context);
this.reportTime(context, namespace, key, RubyFixnum.newFixnum(
context.runtime, TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS
)
));
return res;
final TimerMetric timer = timer(context, namespace, key).toJava(TimerMetric.class);
return timer.time(() -> block.call(context));
}
@Override
@ -172,7 +182,7 @@ public final class MetricExt extends AbstractSimpleMetricExt {
validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return NamespacedMetricExt.create(
this,
name instanceof RubyArray ? (RubyArray) name : RubyArray.newArray(context.runtime, name)
normalizeNamespace(name)
);
}

View file

@ -37,6 +37,12 @@ public enum MetricType {
* A counter backed by a {@link Number} type that includes decimal precision
*/
COUNTER_DECIMAL("counter/decimal"),
/**
* A timer backed by a {@link Long} type
*/
TIMER_LONG("timer/long"),
/**
* A gauge backed by a {@link String} type
*/

View file

@ -57,11 +57,7 @@ public final class NamespacedMetricExt extends AbstractNamespacedMetricExt {
public NamespacedMetricExt initialize(final ThreadContext context, final IRubyObject metric,
final IRubyObject namespaceName) {
this.metric = (MetricExt) metric;
if (namespaceName instanceof RubyArray) {
this.namespaceName = (RubyArray) namespaceName;
} else {
this.namespaceName = RubyArray.newArray(context.runtime, namespaceName);
}
this.namespaceName = normalizeNamespace(namespaceName);
return this;
}
@ -83,6 +79,11 @@ public final class NamespacedMetricExt extends AbstractNamespacedMetricExt {
return metric.gauge(context, namespaceName, key, value);
}
@Override
protected IRubyObject getTimer(ThreadContext context, IRubyObject key) {
return metric.timer(context, namespaceName, key);
}
@Override
protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 1) {
@ -122,9 +123,7 @@ public final class NamespacedMetricExt extends AbstractNamespacedMetricExt {
protected NamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
MetricExt.validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return create(this.metric, (RubyArray) namespaceName.op_plus(
name instanceof RubyArray ? name : RubyArray.newArray(context.runtime, name)
));
return create(this.metric, (RubyArray) namespaceName.op_plus(normalizeNamespace(name)));
}
@Override

View file

@ -31,12 +31,15 @@ import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.timer.NullTimerMetric;
@JRubyClass(name = "NullMetric")
public final class NullMetricExt extends AbstractSimpleMetricExt {
private static final long serialVersionUID = 1L;
private static final IRubyObject NULL_TIMER_METRIC = RubyUtil.toRubyObject(NullTimerMetric.getInstance());
private transient IRubyObject collector;
public static NullMetricExt create() {
@ -83,6 +86,14 @@ public final class NullMetricExt extends AbstractSimpleMetricExt {
return context.nil;
}
@Override
protected IRubyObject getTimer(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key) {
MetricExt.validateKey(context, null, key);
return NULL_TIMER_METRIC;
}
@Override
protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {

View file

@ -85,6 +85,11 @@ public final class NullNamespacedMetricExt extends AbstractNamespacedMetricExt {
return context.nil;
}
@Override
protected IRubyObject getTimer(ThreadContext context, IRubyObject key) {
return this.metric.getTimer(context, namespaceName, key);
}
@Override
protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
return context.nil;

View file

@ -0,0 +1,62 @@
package org.logstash.instrument.metrics.timer;
import org.logstash.instrument.metrics.AbstractMetric;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
import static org.logstash.instrument.metrics.timer.Util.subMilliExcessNanos;
import static org.logstash.instrument.metrics.timer.Util.wholeMillisFromNanos;
/**
* This {@code AfterCompletionTimerMetric} is based on a counter,
* which is incremented after tracked execution is complete.
*/
public class AfterCompletionTimerMetric extends AbstractMetric<Long> implements TimerMetric {
private final LongAdder millis = new LongAdder();
private final LongAdder excessNanos = new LongAdder();
private final LongSupplier nanoTimeSupplier;
protected AfterCompletionTimerMetric(String name) {
this(name, System::nanoTime);
}
AfterCompletionTimerMetric(final String name,
final LongSupplier nanoTimeSupplier) {
super(name);
this.nanoTimeSupplier = nanoTimeSupplier;
}
@Override
public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
final long startNanos = this.nanoTimeSupplier.getAsLong();
try {
return exceptionalSupplier.get();
} finally {
final long durationNanos = this.nanoTimeSupplier.getAsLong() - startNanos;
this.reportNanosElapsed(durationNanos);
}
}
@Override
public void reportUntrackedMillis(final long untrackedMillis) {
this.millis.add(untrackedMillis);
}
private void reportNanosElapsed(final long nanosElapsed) {
long wholeMillis = wholeMillisFromNanos(nanosElapsed);
long excessNanos = subMilliExcessNanos(nanosElapsed);
this.millis.add(wholeMillis);
this.excessNanos.add(excessNanos);
}
@Override
public Long getValue() {
final long wholeMillis = this.millis.sum();
final long millisFromNanos = wholeMillisFromNanos(this.excessNanos.sum());
return Math.addExact(wholeMillis, millisFromNanos);
}
}

View file

@ -0,0 +1,176 @@
package org.logstash.instrument.metrics.timer;
import org.logstash.instrument.metrics.AbstractMetric;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
import static org.logstash.instrument.metrics.timer.Util.subMilliExcessNanos;
import static org.logstash.instrument.metrics.timer.Util.wholeMillisFromNanos;
/**
* This {@code ConcurrentLiveTimerMetric} tracks live concurrent execution.
* It is concurrency-safe and lock-free.
*
* <p>It works by keeping track of a {@code TrackedMillisState}, which contains
* a timestamped checkpoint since which concurrency has been constant. From this
* checkpoint the cumulative concurrently-elapsed time can be calculated.
*
* <p>When concurrency increases or decreases, the checkpoint is atomically replaced.
*
* <p>It separately records untracked millis.</p>
*/
public class ConcurrentLiveTimerMetric extends AbstractMetric<Long> implements TimerMetric {
private final LongAdder untrackedMillis = new LongAdder();
private final AtomicReference<TrackedMillisState> trackedMillisState;
private final LongSupplier nanoTimeSupplier;
protected ConcurrentLiveTimerMetric(final String name) {
this(name, System::nanoTime);
}
ConcurrentLiveTimerMetric(final String name, final LongSupplier nanoTimeSupplier) {
super(name);
this.nanoTimeSupplier = Objects.requireNonNullElse(nanoTimeSupplier, System::nanoTime);
this.trackedMillisState = new AtomicReference<>(new StaticTrackedMillisState());
}
@Override
public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
try {
trackedMillisState.getAndUpdate(TrackedMillisState::withIncrementedConcurrency);
return exceptionalSupplier.get();
} finally {
trackedMillisState.getAndUpdate(TrackedMillisState::withDecrementedConcurrency);
}
}
@Override
public void reportUntrackedMillis(final long untrackedMillis) {
this.untrackedMillis.add(untrackedMillis);
}
@Override
public Long getValue() {
return Math.addExact(getUntrackedMillis(), getTrackedMillis());
}
private long getUntrackedMillis() {
return this.untrackedMillis.longValue();
}
private long getTrackedMillis() {
return this.trackedMillisState.getAcquire().getValue();
}
interface TrackedMillisState {
TrackedMillisState withIncrementedConcurrency();
TrackedMillisState withDecrementedConcurrency();
long getValue();
}
private class StaticTrackedMillisState implements TrackedMillisState {
private final long cumulativeMillis;
private final int excessNanos;
StaticTrackedMillisState(final long cumulativeMillis,
final int excessNanos) {
this.cumulativeMillis = cumulativeMillis;
this.excessNanos = excessNanos;
}
public StaticTrackedMillisState() {
this(0L, 0);
}
@Override
public TrackedMillisState withIncrementedConcurrency() {
return new DynamicTrackedMillisState(nanoTimeSupplier.getAsLong(), this.cumulativeMillis, this.excessNanos, 1);
}
@Override
public TrackedMillisState withDecrementedConcurrency() {
throw new IllegalStateException("TimerMetrics cannot track negative concurrency");
}
@Override
public long getValue() {
return cumulativeMillis;
}
}
private class DynamicTrackedMillisState implements TrackedMillisState {
private final long checkpointNanoTime;
private final long millisAtCheckpoint;
private final int excessNanosAtCheckpoint;
private final int concurrencySinceCheckpoint;
DynamicTrackedMillisState(long checkpointNanoTime,
long millisAtCheckpoint,
int excessNanosAtCheckpoint,
int concurrencySinceCheckpoint) {
this.checkpointNanoTime = checkpointNanoTime;
this.millisAtCheckpoint = millisAtCheckpoint;
this.excessNanosAtCheckpoint = excessNanosAtCheckpoint;
this.concurrencySinceCheckpoint = concurrencySinceCheckpoint;
}
@Override
public TrackedMillisState withIncrementedConcurrency() {
return withAdjustedConcurrency(Vector.INCREMENT);
}
@Override
public TrackedMillisState withDecrementedConcurrency() {
return withAdjustedConcurrency(Vector.DECREMENT);
}
@Override
public long getValue() {
final long nanoAdjustment = getNanoAdjustment(nanoTimeSupplier.getAsLong());
final long milliAdjustment = wholeMillisFromNanos(nanoAdjustment);
return Math.addExact(this.millisAtCheckpoint, milliAdjustment);
}
private TrackedMillisState withAdjustedConcurrency(final Vector concurrencyAdjustmentVector) {
final int newConcurrency = Math.addExact(this.concurrencySinceCheckpoint, concurrencyAdjustmentVector.value());
final long newCheckpointNanoTime = nanoTimeSupplier.getAsLong();
final long totalNanoAdjustment = getNanoAdjustment(newCheckpointNanoTime);
final long newCheckpointMillis = Math.addExact(this.millisAtCheckpoint, wholeMillisFromNanos(totalNanoAdjustment));
final int newCheckpointExcessNanos = subMilliExcessNanos(totalNanoAdjustment);
if (newConcurrency <= 0) {
return new StaticTrackedMillisState(newCheckpointMillis, newCheckpointExcessNanos);
} else {
return new DynamicTrackedMillisState(newCheckpointNanoTime, newCheckpointMillis, newCheckpointExcessNanos, newConcurrency);
}
}
private long getNanoAdjustment(final long checkpointNanoTime) {
final long deltaNanoTime = Math.subtractExact(checkpointNanoTime, this.checkpointNanoTime);
final long calculatedNanoAdjustment = Math.multiplyExact(deltaNanoTime, this.concurrencySinceCheckpoint);
return Math.addExact(calculatedNanoAdjustment, this.excessNanosAtCheckpoint);
}
}
/**
* This private enum is a type-safety guard for
* {@link DynamicTrackedMillisState#withAdjustedConcurrency(Vector)}.
*/
private enum Vector {
INCREMENT{ int value() { return +1; } },
DECREMENT{ int value() { return -1; } };
abstract int value();
}
}

View file

@ -0,0 +1,34 @@
package org.logstash.instrument.metrics.timer;
/**
* This {@code NullTimerMetric} is adheres to our internal {@link TimerMetric}
* interface, but does not keep track of execution time. It is used as a stand-in
* when metrics are disabled.
*/
public class NullTimerMetric implements TimerMetric {
private static final TimerMetric INSTANCE = new NullTimerMetric();
public static TimerMetric getInstance() { return INSTANCE; }
private NullTimerMetric() { }
@Override
public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
return exceptionalSupplier.get();
}
@Override
public void reportUntrackedMillis(long untrackedMillis) {
// no-op
}
@Override
public String getName() {
return "NULL";
}
@Override
public Long getValue() {
return 0L;
}
}

View file

@ -0,0 +1,40 @@
package org.logstash.instrument.metrics.timer;
import org.jruby.RubySymbol;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.MetricType;
/**
* The {@code TimerMetric} is a logstash-internal extension of its public
* {@link co.elastic.logstash.api.TimerMetric} counterpart that provides read-
* and introspection-access via {@code Metric<Long>}.
*/
public interface TimerMetric extends co.elastic.logstash.api.TimerMetric, Metric<Long> {
Long getValue();
@Override
default MetricType getType() {
return MetricType.TIMER_LONG;
}
static TimerMetric create(final String name) {
return TimerMetricFactory.INSTANCE.create(name);
}
static TimerMetric fromRubyBase(final AbstractNamespacedMetricExt metric,
final RubySymbol key) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
final IRubyObject timer = metric.timer(context, key);
final TimerMetric javaTimer;
if (TimerMetric.class.isAssignableFrom(timer.getJavaClass())) {
javaTimer = timer.toJava(TimerMetric.class);
} else {
javaTimer = NullTimerMetric.getInstance();
}
return javaTimer;
}
}

View file

@ -0,0 +1,24 @@
package org.logstash.instrument.metrics.timer;
import java.util.function.LongSupplier;
public class TimerMetricFactory {
static final TimerMetricFactory INSTANCE = new TimerMetricFactory();
private TimerMetricFactory() {
}
public TimerMetric create(final String name) {
return create(name, System::nanoTime);
}
TimerMetric create(final String name, final LongSupplier nanoTimeSupplier) {
// INTERNAL-ONLY system property escape hatch, set with `metric.timers` config in logstash.yml
final String timerType = System.getProperty("ls.metric.timers", "delayed");
switch (timerType) {
case "live" : return new ConcurrentLiveTimerMetric(name, nanoTimeSupplier);
case "delayed": return new AfterCompletionTimerMetric(name, nanoTimeSupplier);
default : throw new IllegalStateException(String.format("Unknown timer type `%s`", timerType));
}
}
}

View file

@ -0,0 +1,15 @@
package org.logstash.instrument.metrics.timer;
public class Util {
private Util() {}
private static final long NANOS_PER_MILLI = 1_000_000L;
static long wholeMillisFromNanos(final long excessNanos) {
return Math.floorDiv(excessNanos, NANOS_PER_MILLI);
}
static int subMilliExcessNanos(final long excessNanos) {
return Math.toIntExact(Math.floorMod(excessNanos, NANOS_PER_MILLI));
}
}

View file

@ -30,10 +30,10 @@ import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Rubyfier;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.timer.TimerMetric;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -60,6 +60,11 @@ public class NamespacedMetricImpl implements NamespacedMetric {
return new CounterMetricImpl(this.threadContext, this.metrics, metric);
}
@Override
public co.elastic.logstash.api.TimerMetric timer(final String metric) {
return TimerMetric.fromRubyBase(metrics, threadContext.getRuntime().newSymbol(metric));
}
@Override
public NamespacedMetric namespace(final String... key) {
final IRubyObject[] rubyfiedKeys = Stream.of(key)
@ -87,16 +92,12 @@ public class NamespacedMetricImpl implements NamespacedMetric {
@Override
public <T> T time(final String key, final Supplier<T> callable) {
final long start = System.nanoTime();
final T ret = callable.get();
final long end = System.nanoTime();
this.reportTime(key, TimeUnit.NANOSECONDS.toMillis(end - start));
return ret;
return timer(key).time(callable::get);
}
@Override
public void reportTime(final String key, final long duration) {
this.metrics.reportTime(this.threadContext, this.getSymbol(key), this.convert(duration));
timer(key).reportUntrackedMillis(duration);
}
@Override

View file

@ -0,0 +1,163 @@
package org.logstash.execution;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.runtime.ThreadContext;
import org.junit.Test;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.config.ir.RubyEnvTestCase;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.ManualAdvanceClock;
import org.logstash.instrument.metrics.MetricExt;
import org.logstash.instrument.metrics.MetricExtFactory;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.timer.TimerMetric;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class ObservedExecutionTest extends RubyEnvTestCase {
/**
* This test emulates events flowing through multiple workers in two pipelines to ensure
* that our {@link ObservedExecution} correctly records event counts for filtered and output,
* along with the timing of execution.
*/
@Test
public void compute() throws IOException {
final ManualAdvanceClock manualAdvanceClock = new ManualAdvanceClock(Instant.now());
final MetricExt rootMetric = MetricExtFactory.newMetricExtFromTestClock(manualAdvanceClock);
final MockCompiledExecution mockQueueBatchExecution = new MockCompiledExecution(manualAdvanceClock);
final AbstractNamespacedMetricExt processEventsNamespace = namespaceMetric(rootMetric, "events");
final AbstractNamespacedMetricExt pipelineAEventsNamespace = namespaceMetric(rootMetric, "pipelines", "a", "events");
final AbstractNamespacedMetricExt pipelineBEventsNamespace = namespaceMetric(rootMetric, "pipelines", "b", "events");
// we create two worker observers, one for each pipeline, connected to the relevant metric namespaces
final WorkerObserver pipelineAWorkerObserver = new WorkerObserver(processEventsNamespace, pipelineAEventsNamespace);
final WorkerObserver pipelineBWorkerObserver = new WorkerObserver(processEventsNamespace, pipelineBEventsNamespace);
// we create three observed executions to test, one for pipeline A, and two for pipeline B
final ObservedExecution<MockQueueBatch> executionPipelineAWorker1 = pipelineAWorkerObserver.ofExecution(mockQueueBatchExecution);
final ObservedExecution<MockQueueBatch> executionPipelineBWorker1 = pipelineBWorkerObserver.ofExecution(mockQueueBatchExecution);
final ObservedExecution<MockQueueBatch> executionPipelineBWorker2 = pipelineBWorkerObserver.ofExecution(mockQueueBatchExecution);
// in pipeline A, we take 110.9ms to filter 100 events and output 10 events
final MockQueueBatch mockQueueBatchA = new MockQueueBatch(100, 10, 110_900_000L);
final int eventsOutputA = executionPipelineAWorker1.compute(mockQueueBatchA, false, false);
assertThat(eventsOutputA, is(equalTo(10)));
// in pipeline B on worker 1, we take 1010.9ms to filter 1000 events and output 100 events
final MockQueueBatch mockQueueBatchB = new MockQueueBatch(1000, 100, 1_010_900_000L);
final int eventsOutputB = executionPipelineBWorker1.compute(mockQueueBatchB, false, false);
assertThat(eventsOutputB, is(equalTo(100)));
// again in pipeline B on worker 1, we take 10010.9ms to filter 1000 events and output 1000 events
final MockQueueBatch mockQueueBatchB2 = new MockQueueBatch(1000, 1000, 10_010_900_000L);
final int eventsOutputB2 = executionPipelineBWorker1.compute(mockQueueBatchB2, false, false);
assertThat(eventsOutputB2, is(equalTo(1000)));
// and in pipeline B on worker 2, we take 100010.9ms to filter 1000 events and output 10000 events
final MockQueueBatch mockQueueBatchB3 = new MockQueueBatch(1000, 10000, 100_010_900_000L);
final int eventsOutputB3 = executionPipelineBWorker2.compute(mockQueueBatchB3, false, false);
assertThat(eventsOutputB3, is(equalTo(10000)));
// validate that the inbound filter counts made it to our independent pipeline metrics and to the combined process
final LongCounter pipelineAEventsFilteredCounter = LongCounter.fromRubyBase(pipelineAEventsNamespace, MetricKeys.FILTERED_KEY);
final LongCounter pipelineBEventsFilteredCounter = LongCounter.fromRubyBase(pipelineBEventsNamespace, MetricKeys.FILTERED_KEY);
final LongCounter processEventsFilteredCounter = LongCounter.fromRubyBase(processEventsNamespace, MetricKeys.FILTERED_KEY);
assertThat(pipelineAEventsFilteredCounter.getValue(), is(equalTo(100L)));
assertThat(pipelineBEventsFilteredCounter.getValue(), is(equalTo(3000L)));
assertThat(processEventsFilteredCounter.getValue(), is(equalTo(3100L)));
// validate that the outbound execution counts made it to our independent pipeline metrics and to the combined process
final LongCounter pipelineAEventsOutCounter = LongCounter.fromRubyBase(pipelineAEventsNamespace, MetricKeys.OUT_KEY);
final LongCounter pipelineBEventsOutCounter = LongCounter.fromRubyBase(pipelineBEventsNamespace, MetricKeys.OUT_KEY);
final LongCounter processEventsOutCounter = LongCounter.fromRubyBase(processEventsNamespace, MetricKeys.OUT_KEY);
assertThat(pipelineAEventsOutCounter.getValue(), is(equalTo(10L)));
assertThat(pipelineBEventsOutCounter.getValue(), is(equalTo(11100L)));
assertThat(processEventsOutCounter.getValue(), is(equalTo(11110L)));
// validate that the timings were reported to our independent pipeline metrics and to the combined process
final TimerMetric pipelineADurationTimer = TimerMetric.fromRubyBase(pipelineAEventsNamespace, MetricKeys.DURATION_IN_MILLIS_KEY);
final TimerMetric pipelineBDurationTimer = TimerMetric.fromRubyBase(pipelineBEventsNamespace, MetricKeys.DURATION_IN_MILLIS_KEY);
final TimerMetric processDurationTimer = TimerMetric.fromRubyBase(processEventsNamespace, MetricKeys.DURATION_IN_MILLIS_KEY);
assertThat(pipelineADurationTimer.getValue(), is(equalTo(110L))); // 110.9 -> 110
assertThat(pipelineBDurationTimer.getValue(), is(equalTo(111032L))); // 1010.9 + 10010.9 + 100010.9 = 111032.7 -> 111032
assertThat(processDurationTimer.getValue(), is(equalTo(111143L))); // 110.9 + 101.9 + 1001.9 + 10001.9 = 111143.6 -> 111143
}
private AbstractNamespacedMetricExt namespaceMetric(final MetricExt metricExt, final String... namespaces) {
final Ruby runtime = metricExt.getRuntime();
final ThreadContext context = runtime.getCurrentContext();
return metricExt.namespace(context, runtime.newArray(Arrays.stream(namespaces).map(runtime::newSymbol).collect(Collectors.toList())));
}
/**
* This {@code MockCompiledExecution} is an implementation of {@link CompiledPipeline.Execution}
* whose behaviour for {@code compute} is determined by the {@link MockQueueBatch} it receives.
* it is instantiated with a {@link ManualAdvanceClock}, which it advances during execution by
* {@code MockQueueBatch.executionDurationNanos}, and each computation returns the provided
* {@code MockQueueBatch.computationOutputSize} as its result.
*/
static class MockCompiledExecution implements CompiledPipeline.Execution<MockQueueBatch> {
private final ManualAdvanceClock manualAdvanceClock;
public MockCompiledExecution(final ManualAdvanceClock manualAdvanceClock) {
this.manualAdvanceClock = manualAdvanceClock;
}
@Override
public int compute(MockQueueBatch batch, boolean flush, boolean shutdown) {
this.manualAdvanceClock.advance(Duration.ofNanos(batch.executionDurationNanos));
return batch.computationOutputSize;
}
}
/**
* A minimal implementation of {@code QueueBatch} exclusively for use with {@link MockCompiledExecution}
* and providing the minimum subset of {@code QueueBatch}'s interface to satisfy {@link ObservedExecution<MockQueueBatch>}.
*/
static class MockQueueBatch implements QueueBatch {
private final int initialSize;
private final int computationOutputSize;
private final long executionDurationNanos;
public MockQueueBatch(int initialSize, int computationOutputSize, long executionDurationNanos) {
this.initialSize = initialSize;
this.computationOutputSize = computationOutputSize;
this.executionDurationNanos = executionDurationNanos;
}
@Override
public int filteredSize() {
return this.initialSize;
}
@Override
public RubyArray<JrubyEventExtLibrary.RubyEvent> to_a() {
throw new IllegalStateException("Mock Batch `to_a` method is not defined.");
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> events() {
throw new IllegalStateException("Mock Batch `events` method is not defined.");
}
@Override
public void close() throws IOException {
// no-op
}
}
}

View file

@ -2,10 +2,13 @@ package org.logstash.instrument.metrics;
import org.junit.Test;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.AbstractGaugeMetric;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
@ -111,6 +114,74 @@ public class ExtendedFlowMetricTest {
assertThat(flowMetricValue, hasEntry("lifetime", 17.0));
}
// NOTE: in this test neither numerator nor denominator is tied to our clock,
// so our clock is ONLY used for retention and NOT factored in to the math for
// the rate of change of our numerator relative to our denominator.
// This is useful for clock-invert rates like time-per-event or for ratio
// rates like events-out-per-events-in where the denominator is not guaranteed
// to be constantly incrementing.
@Test
public void testNonMovingDenominator() {
final ManualAdvanceClock clock = new ManualAdvanceClock(Instant.now());
final NumberGauge numeratorMetric = new NumberGauge("numerator", 0);
final NumberGauge denominatorMetric = new NumberGauge("denominator", 0);
final ExtendedFlowMetric flowMetric = new ExtendedFlowMetric(clock::nanoTime, "flow", numeratorMetric, denominatorMetric);
assertThat(flowMetric.getValue(), is(anEmptyMap()));
clock.advance(Duration.ofSeconds(1));
numeratorMetric.set(17);
flowMetric.capture();
// our numerator has advanced, but our denominator has not.
// now: (17/0); baseline: (0/0); (17-0)/(0-0) -> (17/0)
// numerator has infinite growth relative to denominator
validateMetricValue(flowMetric, (flowMetricValue) -> {
assertThat(flowMetricValue, is(not(anEmptyMap())));
assertThat(flowMetricValue, hasEntry("current", Double.POSITIVE_INFINITY));
assertThat(flowMetricValue, hasEntry("lifetime", Double.POSITIVE_INFINITY));
});
// change denominator, advance clock, capture
clock.advance(Duration.ofSeconds(1));
denominatorMetric.set(13);
flowMetric.capture();
// both numerator and denominator have changed since baselines
// now: (17/13); baseline: (0/0); (17-0)/(13-0) -> 1.308 numerators per denominator
validateMetricValue(flowMetric, (flowMetricValue) -> {
assertThat(flowMetricValue, is(not(anEmptyMap())));
assertThat(flowMetricValue, hasEntry("current", 1.308));
assertThat(flowMetricValue, hasEntry("lifetime", 1.308));
});
// our denominator moves and we do a series of captures
denominatorMetric.set(12);
for (int i = 0; i < 20; i++) {
clock.advance(Duration.ofSeconds(1));
flowMetric.capture();
}
// our numerator moves _negative_ relative to the capture from 10s ago.
clock.advance(Duration.ofSeconds(1));
numeratorMetric.set(10);
flowMetric.capture();
// now: (10/12)
validateMetricValue(flowMetric, (flowMetricValue) -> {
assertThat(flowMetricValue, is(not(anEmptyMap())));
// current window baseline: (17/12); (17-10)/(12-12) -> (-7/0)
assertThat(flowMetricValue, hasEntry("current", Double.NEGATIVE_INFINITY));
// lifetime baseline: (0/0); (10-0)/(12-0) -> (10/12)
assertThat(flowMetricValue, hasEntry("lifetime", 0.8333));
});
}
private <T> void validateMetricValue(final Metric<T> metric, final Consumer<T> validator) {
validator.accept(metric.getValue());
}
private long maxRetentionPlusMinResolutionBuffer(final FlowMetricRetentionPolicy policy) {
return Math.addExact(policy.retentionNanos(), policy.resolutionNanos());

View file

@ -1,6 +1,5 @@
package org.logstash.instrument.metrics;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
@ -8,7 +7,7 @@ import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
class ManualAdvanceClock extends Clock {
public class ManualAdvanceClock extends TestClock {
private final ZoneId zoneId;
private final AtomicReference<Instant> currentInstant;
private final Instant zeroInstant;
@ -33,7 +32,7 @@ class ManualAdvanceClock extends Clock {
}
@Override
public Clock withZone(ZoneId zone) {
public TestClock withZone(ZoneId zone) {
return new ManualAdvanceClock(this.zeroInstant, this.currentInstant, zone);
}

View file

@ -0,0 +1,80 @@
package org.logstash.instrument.metrics;
import org.jruby.RubyClass;
import org.jruby.RubyModule;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.timer.TestTimerMetricFactory;
import java.util.function.Function;
import static org.logstash.RubyUtil.METRIC_CLASS;
/**
* This {@code MetricExtFactory} can be used from any tests that
* inherit from {@link org.logstash.config.ir.RubyEnvTestCase}, which ensures
* that the ruby-parts of Logstash are loaded and/or available.
*/
public class MetricExtFactory {
private final TestClock testClock;
public MetricExtFactory(final TestClock testClock) {
this.testClock = testClock;
}
private static final RubyClass COLLECTOR_CLASS = (RubyClass) RubyUtil.RUBY.evalScriptlet(
"require 'logstash/instrument/collector'\n" +
"::LogStash::Instrument::Collector");
private static final RubyClass INTERCEPTOR_MODULE_CLASS = (RubyClass) RubyUtil.RUBY.evalScriptlet(
String.join("\n",
"Class.new(Module) do",
" def initialize(intercept_type, metric_factory)",
" define_method(:initialize_metric) do |type, namespaces_path, key|",
" return super(type, namespaces_path, key) unless type == intercept_type",
" metric_factory.create(key)",
" end",
" end",
"end"
)
);
public static MetricExt newMetricExtFromTestClock(final TestClock testClock) {
return new MetricExtFactory(testClock).newRoot();
}
public MetricExt newRoot() {
final IRubyObject metricCollector = COLLECTOR_CLASS.callMethod("new");
rubyExtend(metricCollector, metricFactoryInterceptor("uptime", (new TestUptimeMetricFactory(testClock::nanoTime))::newUptimeMetric));
rubyExtend(metricCollector, metricFactoryInterceptor("timer", (new TestTimerMetricFactory(testClock::nanoTime))::newTimerMetric));
return (MetricExt)METRIC_CLASS.newInstance(RubyUtil.RUBY.getCurrentContext(), metricCollector, Block.NULL_BLOCK);
}
private RubyModule metricFactoryInterceptor(final String type, final Function<String,?> javaMetricFactory) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
final IRubyObject interceptType = context.runtime.newSymbol(type);
final IRubyObject metricFactory = JavaUtil.convertJavaToUsableRubyObject(context.runtime, MetricFactory.of(javaMetricFactory));
final IRubyObject interceptorModule = INTERCEPTOR_MODULE_CLASS.newInstance(context, interceptType, metricFactory, Block.NULL_BLOCK);
return (RubyModule) interceptorModule;
}
private static void rubyExtend(final IRubyObject base, final RubyModule module) {
base.callMethod(base.getRuntime().getCurrentContext(), "extend", module);
}
@FunctionalInterface
interface MetricFactory {
IRubyObject create(final IRubyObject key);
static MetricFactory of(final Function<String,?> javaMetricFactory) {
return key -> JavaUtil.convertJavaToUsableRubyObject(RubyUtil.RUBY, javaMetricFactory.apply(key.asJavaString()));
}
}
}

View file

@ -43,6 +43,7 @@ public class MetricTypeTest {
Map<MetricType, String> nameMap = new HashMap<>(EnumSet.allOf(MetricType.class).size());
nameMap.put(MetricType.COUNTER_LONG, "counter/long");
nameMap.put(MetricType.COUNTER_DECIMAL, "counter/decimal");
nameMap.put(MetricType.TIMER_LONG, "timer/long");
nameMap.put(MetricType.GAUGE_TEXT, "gauge/text");
nameMap.put(MetricType.GAUGE_BOOLEAN, "gauge/boolean");
nameMap.put(MetricType.GAUGE_NUMBER, "gauge/number");

View file

@ -0,0 +1,7 @@
package org.logstash.instrument.metrics;
import java.time.Clock;
public abstract class TestClock extends Clock {
abstract public long nanoTime();
}

View file

@ -0,0 +1,15 @@
package org.logstash.instrument.metrics;
import java.util.function.LongSupplier;
public class TestUptimeMetricFactory {
private final LongSupplier nanoTimeSupplier;
public TestUptimeMetricFactory(LongSupplier nanoTimeSupplier) {
this.nanoTimeSupplier = nanoTimeSupplier;
}
public UptimeMetric newUptimeMetric(final String name) {
return new UptimeMetric(name, nanoTimeSupplier);
}
}

View file

@ -0,0 +1,8 @@
package org.logstash.instrument.metrics.timer;
public class AfterCompletionTimerMetricTest extends TimerMetricTest {
@Override
TimerMetric initTimerMetric(final String name) {
return testTimerMetricFactory.newAfterCompletionTimerMetric(name);
}
}

View file

@ -0,0 +1,124 @@
package org.logstash.instrument.metrics.timer;
import com.google.common.util.concurrent.Monitor;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A {@code BlockingTask} is a test tool for coordinating sequential operations in what
* is normally asynchronous code. Its {@link Factory} provides methods for spawning tasks
* that block in an {@code ExecutorService} until they are released by your code.
* @param <T>
*/
class BlockingTask<T> {
public static class Factory {
private final ExecutorService executorService;
public Factory(final ExecutorService executorService) {
this.executorService = executorService;
}
/**
* Executes the provided {@code Consumer<ControlChannel>} in the executor service
* and blocks until control is yielded in the executing thread by sending
* {@link ControlChannel#markReadyAndBlockUntilRelease}.
*
* @param function your task, which <em>MUST</em> send {@link ControlChannel#markReadyAndBlockUntilRelease}.
* @return a {@code BlockingTask} for you to send {@link BlockingTask#complete()}
*
* @param <TT> the return-type of your function, which may be {@code Void}.
*/
public <TT> BlockingTask<TT> wrapping(final Function<ControlChannel, TT> function) {
final ControlChannel controlChannel = new ControlChannel();
final Future<TT> future = executorService.submit(() -> {
return function.apply(controlChannel);
});
controlChannel.blockUntilReady();
return new BlockingTask<TT>(controlChannel, future);
}
/**
* Spawns a task in the executor and blocks the current thread until the task is running.
*
* <p>Your deferred action will be executed <em>after</em> the task is released
* and <em>before</em> control is returned to the thread that releases it.
*
* @param supplier your code, which will be executed in the executor pool when this task is released.
* @return a {@code BlockingTask} waiting
*
* @param <TT> the return-type of your supplier, which may be {@code Void}.
*/
public <TT> BlockingTask<TT> deferUntilReleased(final Supplier<TT> supplier) {
return wrapping((controlChannel) -> {
controlChannel.markReadyAndBlockUntilRelease();
return supplier.get();
});
}
}
private final ControlChannel controlChannel;
private final Future<T> future;
private static final Duration SAFEGUARD = Duration.ofSeconds(10);
private BlockingTask(final ControlChannel controlChannel,
final Future<T> future) {
this.controlChannel = controlChannel;
this.future = future;
}
public T complete() throws ExecutionException, InterruptedException, TimeoutException {
controlChannel.release();
return future.get(SAFEGUARD.getSeconds(), TimeUnit.SECONDS);
}
public static class ControlChannel {
private volatile boolean isReady = false;
private volatile boolean isReleased = false;
private final Monitor monitor = new Monitor();
private final Monitor.Guard guardReady = monitor.newGuard(() -> isReady);
private final Monitor.Guard guardRelease = monitor.newGuard(() -> isReleased);
public void markReadyAndBlockUntilRelease() {
try {
monitor.enterInterruptibly(10, TimeUnit.SECONDS);
this.isReady = true;
monitor.waitFor(guardRelease, SAFEGUARD);
monitor.leave();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void blockUntilReady() {
try {
monitor.enterWhen(guardReady, SAFEGUARD);
monitor.leave();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void release() {
try {
monitor.enterInterruptibly(SAFEGUARD);
isReleased = true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
monitor.leave();
}
}
}
}

View file

@ -0,0 +1,15 @@
package org.logstash.instrument.metrics.timer;
import org.junit.Test;
public class ConcurrentLiveTimerMetricTest extends TimerMetricTest {
@Override
TimerMetric initTimerMetric(final String name) {
return testTimerMetricFactory.newConcurrentLiveTimerMetric(name);
}
@Test
public void testValueDuringConcurrentTrackedExecutions() throws Exception {
sharedTestWithConcurrentTrackedExecutions(true);
}
}

View file

@ -0,0 +1,37 @@
package org.logstash.instrument.metrics.timer;
import org.logstash.instrument.metrics.TestClock;
import java.util.function.LongSupplier;
/**
* This {@code TestTimerMetricFactory} provides factory methods for constructing implementations
* of {@link TimerMetric} for use in test that are connected to a nano-time supplier (typically
* {@link TestClock#nanoTime()} from {@link org.logstash.instrument.metrics.ManualAdvanceClock}).
*
* <p>The factory methods use the package-private constructors provided by the respective
* implementations, but are <em>public</em>, which makes them available to other test packages.
*/
public class TestTimerMetricFactory {
private final LongSupplier nanoTimeSupplier;
public TestTimerMetricFactory(TestClock testClock) {
this(testClock::nanoTime);
}
public TestTimerMetricFactory(final LongSupplier nanoTimeSupplier) {
this.nanoTimeSupplier = nanoTimeSupplier;
}
public AfterCompletionTimerMetric newAfterCompletionTimerMetric(final String name) {
return new AfterCompletionTimerMetric(name, this.nanoTimeSupplier);
}
public ConcurrentLiveTimerMetric newConcurrentLiveTimerMetric(final String name) {
return new ConcurrentLiveTimerMetric(name, this.nanoTimeSupplier);
}
public TimerMetric newTimerMetric(final String name) {
return TimerMetricFactory.INSTANCE.create(name, this.nanoTimeSupplier);
}
}

View file

@ -0,0 +1,208 @@
package org.logstash.instrument.metrics.timer;
import org.junit.Test;
import org.logstash.instrument.metrics.ManualAdvanceClock;
import org.logstash.instrument.metrics.MetricType;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* This {@code TimerMetricTest} is meant to be inherited by tests covering the
* implementations of {@link TimerMetric}, and includes baseline tests for guaranteeing
* that the value of the metric includes the duration of execution before control is
* given back to the caller.
*
* <p>At an <em>interface</em>-level, we only guarantee that tracked executions will
* be committed into the value <em>before</em> control is returned to the caller, which
* means that the value may or may not include "uncommitted" or mid-execution
* tracked timings. As a result, these shared tests can only validate the cumulative
* value when there are zero currently-tracked executions in-flight. Implementations
* that report live-tracking will need to validate mid-execution behaviour on their
* own.</p>
*/
public abstract class TimerMetricTest {
protected final ManualAdvanceClock manualAdvanceClock = new ManualAdvanceClock(Instant.now());
protected final TestTimerMetricFactory testTimerMetricFactory = new TestTimerMetricFactory(manualAdvanceClock);
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final BlockingTask.Factory blockingTaskFactory = new BlockingTask.Factory(executorService);
abstract TimerMetric initTimerMetric(String name);
@Test
public void testBaselineFunctionality() {
final TimerMetric timerMetric = initTimerMetric("duration_in_millis");
assertThat(timerMetric.getValue(), is(equalTo(0L)));
// nothing executing, no advance
manualAdvanceClock.advance(Duration.ofSeconds(30));
assertThat(timerMetric.getValue(), is(equalTo(0L)));
// single execution, advances whole millis
timerMetric.time(() -> {
manualAdvanceClock.advance(Duration.ofNanos(1234567890L));
});
assertThat(timerMetric.getValue(), is(equalTo(1234L)));
// nothing executing, no advance
manualAdvanceClock.advance(Duration.ofSeconds(30));
assertThat(timerMetric.getValue(), is(equalTo(1234L)));
// untracked execution, advances as expected
timerMetric.reportUntrackedMillis(7326L);
assertThat(timerMetric.getValue(), is(equalTo(8560L)));
}
@Test
public void testValueAfterConcurrentTrackedExecutions() throws Exception {
sharedTestWithConcurrentTrackedExecutions(false);
}
/**
* This shared test optionally validates the value of the timer metric after
* each state change, enabling additional validations for live timers.
*
* @param validateLiveTracking whether to validate the value of the timer after each clock change.
* @throws Exception
*/
void sharedTestWithConcurrentTrackedExecutions(final boolean validateLiveTracking) throws Exception {
final TimerMetric timerMetric = initTimerMetric("duration_in_millis");
// assert baseline timer is not incrementing when time is passing
assertThat(timerMetric.getValue(), is(equalTo(0L)));
manualAdvanceClock.advance(Duration.ofMillis(10_000_000_000L));
assertThat(timerMetric.getValue(), is(equalTo(0L)));
// methodology note: each state-change affects a single column in a decimal-formatted long,
// which gives us a bread-crumb for identifying the cause of issues when our expectation
// does not match
long expectedAdvance = 0L;
final BlockingTask<Void> taskOne = timedBlockingTask(timerMetric);
manualAdvanceClock.advance(Duration.ofMillis(1L));
expectedAdvance += Math.multiplyExact(1L, 1);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
final BlockingTask<Void> taskTwo = timedBlockingTask(timerMetric);
manualAdvanceClock.advance(Duration.ofMillis(10L));
expectedAdvance += Math.multiplyExact(10L, 2);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
taskOne.complete();
manualAdvanceClock.advance(Duration.ofMillis(100L));
expectedAdvance += Math.multiplyExact(100L, 1);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
final BlockingTask<Void> taskThree = timedBlockingTask(timerMetric);
manualAdvanceClock.advance(Duration.ofMillis(1_000L));
expectedAdvance += Math.multiplyExact(1_000L, 2);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
final BlockingTask<Void> taskFour = timedBlockingTask(timerMetric);
manualAdvanceClock.advance(Duration.ofMillis(10_000L));
expectedAdvance += Math.multiplyExact(10_000L, 3);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
taskThree.complete();
manualAdvanceClock.advance(Duration.ofMillis(100_000L));
expectedAdvance += Math.multiplyExact(100_000L, 2);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
taskTwo.complete();
manualAdvanceClock.advance(Duration.ofMillis(1_000_000L));
expectedAdvance += Math.multiplyExact(1_000_000L, 1);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
taskFour.complete();
manualAdvanceClock.advance(Duration.ofMillis(10_000_000L));
expectedAdvance += Math.multiplyExact(10_000_000L, 0);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
final BlockingTask<Void> taskFive = timedBlockingTask(timerMetric);
manualAdvanceClock.advance(Duration.ofMillis(100_000_000L));
expectedAdvance += Math.multiplyExact(100_000_000L, 1);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
taskFive.complete();
manualAdvanceClock.advance(Duration.ofMillis(1_000_000_000L));
expectedAdvance += Math.multiplyExact(1_000_000_000L, 0);
if (validateLiveTracking) { assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance))); }
// note: we assert both
assertThat(timerMetric.getValue(), is(equalTo(101_232_121L)));
assertThat(timerMetric.getValue(), is(equalTo(expectedAdvance)));
}
@Test
public void testReturnValue() {
final TimerMetric timerMetric = initTimerMetric("ok");
final Optional<String> original = Optional.of("hello");
final Optional<String> result = timerMetric.time(() -> original);
assertSame(original, result);
}
@Test
public void testName() {
final TimerMetric timerMetric = initTimerMetric("testing-timer-metric");
assertThat(timerMetric.getName(), equalTo("testing-timer-metric"));
}
@Test
public void testType() {
final TimerMetric timerMetric = initTimerMetric("testing-timer-metric-2");
assertThat(timerMetric.getType(), equalTo(MetricType.TIMER_LONG));
}
private static class ACheckedException extends Exception {
private static final long serialVersionUID = 1L;
public ACheckedException(String message) {
super(message);
}
}
@Test
public void testThrowing() {
final TimerMetric timerMetric = initTimerMetric("oh no");
final ACheckedException checkedException = new ACheckedException("gotcha");
try {
timerMetric.time(() -> { throw checkedException; });
} catch (ACheckedException chk) {
assertSame(checkedException, chk);
return;
}
fail("Checked exception not caught!");
}
@Test
public void testAccumulatesExcessNanos() {
final TimerMetric timerMetric = initTimerMetric("precisely");
for (int i = 0; i < 1000; i++) {
timerMetric.time(() -> manualAdvanceClock.advance(Duration.ofNanos(999_999L)));
}
assertThat(timerMetric.getValue(), is(equalTo(999L)));
}
private BlockingTask<Void> timedBlockingTask(final TimerMetric timerMetric) {
return blockingTaskFactory.wrapping((controlChannel) -> {
timerMetric.time(controlChannel::markReadyAndBlockUntilRelease);
return null;
});
}
}