Collect queue growth events and bytes metrics when PQ is enabled. (#14554)

* Collect growth events and bytes metrics if PQ is enabled: Java changes.

* Move queue flow under queue namespace.

* Pipeline level PQ flow metrics: add unit & integration tests.

* Include queue info in node stats sample.

* Apply suggestions from code review

Change the uptime precision for PQ growth metrics to seconds, since PQ events are based on seconds.

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Add safeguard when using lazy delegating gauge type.

* flow metrics: simplify generics of lazy implementation

Enables the interface method `FlowMetric::create` to take suppliers that _implement_
a `Metric<? extends Number>` instead of requiring them to be pre-cast, and
avoids unnecessary exposure of the metric's value-type into our lazy init.
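As a sketch of why the wildcard-based signature helps (using simplified stand-ins for Logstash's `Metric` and `NumberGauge` types, not the real classes): a `Supplier<NumberGauge>` can be passed where a `Supplier<? extends Metric<? extends Number>>` is expected, with no pre-cast and no value-type parameter leaking to the caller.

```java
import java.util.function.Supplier;

// Simplified stand-ins for illustration; the real Logstash types differ.
interface Metric<T> { T getValue(); }

class NumberGauge implements Metric<Long> {
    private final long value;
    NumberGauge(long value) { this.value = value; }
    @Override public Long getValue() { return value; }
}

public class WildcardSupplierSketch {
    // Accepts a supplier of ANY Metric whose value is a Number subtype,
    // so callers can pass Supplier<NumberGauge> directly (no pre-cast).
    static Number valueOf(Supplier<? extends Metric<? extends Number>> supplier) {
        return supplier.get().getValue();
    }

    public static void main(String[] args) {
        Supplier<NumberGauge> gaugeSupplier = () -> new NumberGauge(42L);
        System.out.println(valueOf(gaugeSupplier)); // 42
    }
}
```

With the invariant generic form `Supplier<Metric<N>>`, the same call would fail to compile unless the caller widened the supplier's type by hand.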

* flow metrics: use lazy init for PQ gauge-based metrics

* noop: use enum equality

Avoids routing two enum values through `MetricType#toString()`
and `String#equals()` when they can be compared directly.
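The difference can be sketched with a toy enum (hypothetical names standing in for the real `MetricType` values):

```java
public class EnumEqualitySketch {
    // Toy enum standing in for org.logstash.instrument.metrics.MetricType.
    enum MetricType { GAUGE_NUMBER, GAUGE_TEXT }

    public static void main(String[] args) {
        MetricType type = MetricType.GAUGE_NUMBER;

        // Before: both values routed through toString()/String#equals(),
        // allocating and comparing strings on every check.
        boolean viaString = type.toString().equals(MetricType.GAUGE_NUMBER.toString());

        // After: enum constants are singletons, so a direct comparison suffices.
        boolean direct = (type == MetricType.GAUGE_NUMBER);

        System.out.println(viaString + " " + direct); // true true
    }
}
```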

* Apply suggestions from code review

`Optional.ofNullable` is used for a safe return. Docs include real, tested expected metric values.

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>

* flow metrics: make lazy-init wrapper inherit from AbstractMetric

this allows the Jackson serialization annotations to work

* flow metrics: move pipeline queue-based flows into pipeline flow namespace

* Follow-up for moving PQ growth metrics under `pipeline.*.flow`.
- Unit and integration tests are added or fixed.
- Documentation added along with sample response data

* flow: pipeline pq flow rates docs

* Do not expect flow metrics in the queue section of the API; they moved to the flow section.

Update logstash-core/spec/logstash/api/commands/stats_spec.rb

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Integration test failure fix.

Mistake: `flow_status` should be `pipeline_flow_stats`

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Integration test failures fix.

`Number` should be `Numeric` in the Ruby specs.

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Make CI happy.

* api specs: use PQ only where needed

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
Co-authored-by: Ry Biesemeyer <ry.biesemeyer@elastic.co>
This commit is contained in:
Mashhur 2022-10-13 15:30:31 -07:00 committed by GitHub
parent db6a7bc619
commit f19e9cb647
12 changed files with 212 additions and 40 deletions


@@ -502,6 +502,10 @@ Example response:
Flow rates provide visibility into how a Logstash instance or an individual pipeline is _currently_ performing relative to _itself_ over time.
This allows us to attach _meaning_ to the cumulative-value metrics that are also presented by this API, and to determine whether an instance or pipeline is behaving better or worse than it has in the past.
The following flow rates are available for the logstash process as a whole and for each of its pipelines individually.
In addition, pipelines may have <<pipeline-flow-rates, additional flow rates>> depending on their configuration.
[%autowidth.stretch]
|===
|Flow Rate | Definition
@@ -533,7 +537,6 @@ It cannot be used to compare one pipeline to another or even one process to _its
A pipeline with only one single-threaded input may contribute up to 1.00, a pipeline whose inputs have hundreds of inbound connections may contribute much higher numbers to this combined value.
Additionally, some amount of back-pressure is both _normal_ and _expected_ for pipelines that are _pulling_ data, as this back-pressure allows them to slow down and pull data at a rate their downstream pipeline can tolerate.
|===
Each flow stat includes rates for one or more recent windows of time:
@@ -571,8 +574,7 @@ including:
* stats for each configured filter or output stage
* info about config reload successes and failures
(when <<reloading-config,config reload>> is enabled)
* info about the persistent queue (when
<<persistent-queues,persistent queues>> are enabled)
* info about the persistent queue (when <<persistent-queues,persistent queues>> are enabled)
[source,js]
--------------------------------------------------
@@ -613,6 +615,14 @@ Example response:
"worker_concurrency" : {
"current": 1.973,
"lifetime": 1.721
},
"queue_persisted_growth_bytes" : {
"current": 783100,
"lifetime": 17
},
"queue_persisted_growth_events" : {
"current": 11,
"lifetime": 0.003
}
},
"plugins" : {
@@ -771,6 +781,14 @@ Example response:
"worker_concurrency" : {
"current": 1.973,
"lifetime": 1.721
},
"queue_persisted_growth_bytes" : {
"current": 783100,
"lifetime": 17
},
"queue_persisted_growth_events" : {
"current": 11,
"lifetime": 0.003
}
},
"plugins" : {
@@ -821,13 +839,52 @@ Example response:
"failures" : 0
},
"queue": {
"type" : "memory"
"type" : "persisted",
"capacity": {
"max_unread_events": 0,
"page_capacity_in_bytes": 67108864,
"max_queue_size_in_bytes": 1073741824,
"queue_size_in_bytes": 3885
},
"data": {
"path": "/pipeline/queue/path",
"free_space_in_bytes": 936886480896,
"storage_type": "apfs"
},
"events": 0,
"events_count": 0,
"queue_size_in_bytes": 3885,
"max_queue_size_in_bytes": 1073741824
}
}
}
}
--------------------------------------------------
[discrete]
[[pipeline-flow-rates]]
===== Pipeline flow rates
Each pipeline's entry in the API response includes a number of pipeline-scoped <<flow-stats,_flow_ rates>> such as `input_throughput`, `worker_concurrency`, and `queue_backpressure` to provide visibility into the flow of events through the pipeline.
When configured with a <<persistent-queues,persistent queue>>, the pipeline's `flow` will include additional rates to provide visibility into the health of the pipeline's persistent queue:
[%autowidth.stretch]
|===
|Flow Rate | Definition
| `queue_persisted_growth_events` |
This metric is expressed in events-per-second, and is the rate of change of the number of unacknowledged events in the queue, relative to wall-clock time (`queue.events_count` / second).
A positive number indicates that the queue's event-count is growing, and a negative number indicates that the queue is shrinking.
| `queue_persisted_growth_bytes` |
This metric is expressed in bytes-per-second, and is the rate of change of the size of the persistent queue on disk, relative to wall-clock time (`queue.queue_size_in_bytes` / second).
A positive number indicates that the queue size-on-disk is growing, and a negative number indicates that the queue is shrinking.
NOTE: The size of a PQ on disk includes both unacknowledged events and previously-acknowledged events from pages that contain one or more unprocessed events.
This means it grows gradually as individual events are added, but shrinks in large chunks each time a whole page of processed events is reclaimed (read more: <<garbage-collection, PQ disk garbage collection>>).
|===
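Conceptually, each growth rate is the change in the underlying gauge divided by the change in uptime. A minimal sketch of that arithmetic, with made-up sample values rather than real API output:

```java
public class QueueGrowthSketch {
    // Rate of change of a gauge relative to elapsed wall-clock seconds.
    static double ratePerSecond(long earlierValue, long laterValue, double elapsedSeconds) {
        return (laterValue - earlierValue) / elapsedSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical queue.events_count sampled 10s apart: 1000 -> 1110 events.
        double growthEvents = ratePerSecond(1_000L, 1_110L, 10.0);
        System.out.println(growthEvents); // 11.0 -- queue gaining ~11 events/s

        // Hypothetical queue_size_in_bytes shrinking as a whole page is reclaimed:
        // the rate goes sharply negative even though events drained gradually.
        double growthBytes = ratePerSecond(70_000_000L, 2_891_136L, 10.0);
        System.out.println(growthBytes < 0); // true -- queue draining on disk
    }
}
```

A sustained positive `current` value on either rate is the signal to watch: it means the PQ is filling faster than the pipeline drains it.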
[discrete]
[[reload-stats]]
==== Reload stats


@@ -18,7 +18,8 @@
require "spec_helper"
describe LogStash::Api::Commands::Stats do
include_context "api setup"
# enable PQ to ensure PQ-related metrics are present
include_context "api setup", {"queue.type" => "persisted"}
let(:report_method) { :run }
let(:extended_pipeline) { nil }
@@ -178,9 +179,27 @@ describe LogStash::Api::Commands::Stats do
:filter_throughput,
:queue_backpressure,
:worker_concurrency,
:input_throughput
:input_throughput,
:queue_persisted_growth_bytes,
:queue_persisted_growth_events
)
end
it "returns queue metric information" do
expect(report[:main][:queue].keys).to include(
:capacity,
:events,
:type,
:data)
expect(report[:main][:queue][:capacity].keys).to include(
:page_capacity_in_bytes,
:max_queue_size_in_bytes,
:queue_size_in_bytes,
:max_unread_events)
expect(report[:main][:queue][:data].keys).to include(
:storage_type,
:path,
:free_space_in_bytes)
end
end
context "when using multiple pipelines" do
before(:each) do


@@ -21,7 +21,8 @@ require "sinatra"
require "logstash/api/modules/node_stats"
describe LogStash::Api::Modules::NodeStats do
include_context "api setup"
# enable PQ to ensure PQ-related metrics are present
include_context "api setup", {"queue.type" => "persisted"}
include_examples "not found"
extend ResourceDSLMethods
@@ -116,7 +117,9 @@ describe LogStash::Api::Modules::NodeStats do
"filter_throughput" => Hash,
"queue_backpressure" => Hash,
"worker_concurrency" => Hash,
"input_throughput" => Hash
"input_throughput" => Hash,
"queue_persisted_growth_bytes" => Hash,
"queue_persisted_growth_events" => Hash
},
"plugins" => {
"inputs" => Array,
@@ -124,6 +127,21 @@ describe LogStash::Api::Modules::NodeStats do
"filters" => Array,
"outputs" => Array,
},
"queue" => {
"capacity" => {
"page_capacity_in_bytes" => Numeric,
"max_queue_size_in_bytes" => Numeric,
"queue_size_in_bytes" => Numeric,
"max_unread_events" => Numeric
},
"events" => Numeric,
"type" => String,
"data" => {
"storage_type" => String,
"path" => String,
"free_space_in_bytes" => Numeric
}
}
}
},
"reloads" => {


@@ -35,7 +35,7 @@ shared_context "execution_context" do
end
end
shared_context "api setup" do
shared_context "api setup" do |settings_overrides={}|
##
# blocks until the condition returns true, or the limit has passed
@@ -56,7 +56,7 @@ shared_context "api setup" do
before :all do
clear_data_dir
settings = mock_settings("config.reload.automatic" => true)
settings = mock_settings({"config.reload.automatic" => true}.merge(settings_overrides))
config_source = make_config_source(settings)
config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }")


@@ -42,6 +42,21 @@ import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
@JRubyClass(name = "QueueFactory")
public final class QueueFactoryExt extends RubyBasicObject {
/**
* A static value to indicate Persistent Queue is enabled.
*/
public static String PERSISTED_TYPE = "persisted";
/**
* A static value to indicate Memory Queue is enabled.
*/
public static String MEMORY_TYPE = "memory";
/**
* A contextual name to expose the queue type.
*/
public static String CONTEXT_NAME = "queue.type";
private static final long serialVersionUID = 1L;
public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
@@ -51,8 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject {
@JRubyMethod(meta = true)
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, "queue.type").asJavaString();
if ("persisted".equals(type)) {
final String type = getSetting(context, settings, CONTEXT_NAME).asJavaString();
if (PERSISTED_TYPE.equals(type)) {
final Path queuePath = Paths.get(
getSetting(context, settings, "path.queue").asJavaString(),
getSetting(context, settings, "pipeline.id").asJavaString()
@@ -77,7 +92,7 @@ public final class QueueFactoryExt extends RubyBasicObject {
getSetting(context, settings, "queue.max_bytes")
}
);
} else if ("memory".equals(type)) {
} else if (MEMORY_TYPE.equals(type)) {
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
).initialize(


@@ -33,7 +33,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -76,9 +79,12 @@ import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
@@ -105,7 +111,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
private static final Logger LOGGER = LogManager.getLogger(AbstractPipelineExt.class);
private static final @SuppressWarnings("rawtypes") RubyArray CAPACITY_NAMESPACE =
RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("capacity"));
RubyArray.newArray(RubyUtil.RUBY, CAPACITY_KEY);
private static final @SuppressWarnings("rawtypes") RubyArray DATA_NAMESPACE =
RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("data"));
@@ -515,6 +521,23 @@ public class AbstractPipelineExt extends RubyBasicObject {
this.flowMetrics.add(concurrencyFlow);
storeMetric(context, flowNamespace, concurrencyFlow);
// collect the queue_persisted_growth_events & queue_persisted_growth_bytes metrics only if the persisted queue is enabled.
if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString()
.equals(QueueFactoryExt.PERSISTED_TYPE)) {
final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY);
final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY);
final Supplier<NumberGauge> eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null);
final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthEventsFlow);
storeMetric(context, flowNamespace, growthEventsFlow);
final Supplier<NumberGauge> queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null);
final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthBytesFlow);
storeMetric(context, flowNamespace, growthBytesFlow);
}
return context.nil;
}
@@ -529,6 +552,11 @@ public class AbstractPipelineExt extends RubyBasicObject {
final Metric<? extends Number> denominatorMetric) {
return FlowMetric.create(name.asJavaString(), numeratorMetric, denominatorMetric);
}
private static FlowMetric createFlowMetric(final RubySymbol name,
final Supplier<? extends Metric<? extends Number>> numeratorMetricSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorMetricSupplier) {
return FlowMetric.create(name.asJavaString(), numeratorMetricSupplier, denominatorMetricSupplier);
}
private LongCounter initOrGetCounterMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
@@ -540,6 +568,21 @@ public class AbstractPipelineExt extends RubyBasicObject {
return retrievedMetric.toJava(LongCounter.class);
}
private Optional<NumberGauge> initOrGetNumberGaugeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {
final IRubyObject collector = this.metric.collector(context);
final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath);
final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")});
LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class);
if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_NUMBER) {
return Optional.empty();
}
return Optional.of((NumberGauge) delegatingGauge.getMetric().get());
}
private UptimeMetric initOrGetUptimeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol uptimeMetricName) {


@@ -42,9 +42,9 @@ public interface FlowMetric extends Metric<Map<String,Double>> {
}
}
static <N extends Number, D extends Number> FlowMetric create(final String name,
final Supplier<Metric<N>> numeratorSupplier,
final Supplier<Metric<D>> denominatorSupplier) {
return new LazyInstantiatedFlowMetric<>(name, numeratorSupplier, denominatorSupplier);
static FlowMetric create(final String name,
final Supplier<? extends Metric<? extends Number>> numeratorSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorSupplier) {
return new LazyInstantiatedFlowMetric(name, numeratorSupplier, denominatorSupplier);
}
}


@@ -16,27 +16,22 @@ import java.util.function.Supplier;
* and fully initializes when <em>both</em> return non-null values.
*
* @see FlowMetric#create(String, Supplier, Supplier)
*
* @param <N> the numerator metric's value type
* @param <D> the denominator metric's value type
*/
public class LazyInstantiatedFlowMetric<N extends Number, D extends Number> implements FlowMetric {
public class LazyInstantiatedFlowMetric extends AbstractMetric<Map<String, Double>> implements FlowMetric {
static final Logger LOGGER = LogManager.getLogger(LazyInstantiatedFlowMetric.class);
private final String name;
private final AtomicReference<Supplier<Metric<N>>> numeratorSupplier;
private final AtomicReference<Supplier<Metric<D>>> denominatorSupplier;
private final AtomicReference<Supplier<? extends Metric<? extends Number>>> numeratorSupplier;
private final AtomicReference<Supplier<? extends Metric<? extends Number>>> denominatorSupplier;
private final SetOnceReference<FlowMetric> inner = SetOnceReference.unset();
private static final Map<String,Double> EMPTY_MAP = Map.of();
LazyInstantiatedFlowMetric(final String name,
final Supplier<Metric<N>> numeratorSupplier,
final Supplier<Metric<D>> denominatorSupplier) {
this.name = name;
final Supplier<? extends Metric<? extends Number>> numeratorSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorSupplier) {
super(name);
this.numeratorSupplier = new AtomicReference<>(numeratorSupplier);
this.denominatorSupplier = new AtomicReference<>(denominatorSupplier);
}
@@ -46,11 +41,6 @@ public class LazyInstantiatedFlowMetric<N extends Number, D extends Number> impl
getInner().ifPresentOrElse(FlowMetric::capture, this::warnNotInitialized);
}
@Override
public String getName() {
return this.name;
}
@Override
public MetricType getType() {
return MetricType.FLOW_RATE;
@@ -68,10 +58,10 @@ public class LazyInstantiatedFlowMetric<N extends Number, D extends Number> impl
private Optional<FlowMetric> attemptCreateInner() {
if (inner.isSet()) { return inner.asOptional(); }
final Metric<N> numeratorMetric = numeratorSupplier.getAcquire().get();
final Metric<? extends Number> numeratorMetric = numeratorSupplier.getAcquire().get();
if (Objects.isNull(numeratorMetric)) { return Optional.empty(); }
final Metric<D> denominatorMetric = denominatorSupplier.getAcquire().get();
final Metric<? extends Number> denominatorMetric = denominatorSupplier.getAcquire().get();
if (Objects.isNull(denominatorMetric)) { return Optional.empty(); }
final FlowMetric flowMetric = FlowMetric.create(this.name, numeratorMetric, denominatorMetric);
@@ -91,7 +81,7 @@ public class LazyInstantiatedFlowMetric<N extends Number, D extends Number> impl
LOGGER.warn("Underlying metrics for `{}` not yet instantiated, could not capture their rates", this.name);
}
private static <TT extends Number> Supplier<Metric<TT>> constantMetricSupplierFor(final Metric<TT> mm) {
private static Supplier<Metric<? extends Number>> constantMetricSupplierFor(final Metric<? extends Number> mm) {
return () -> mm;
}
}


@@ -65,6 +65,8 @@ public final class MetricKeys {
public static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue");
public static final RubySymbol CAPACITY_KEY = RubyUtil.RUBY.newSymbol("capacity");
public static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq");
public static final RubySymbol STORAGE_POLICY_KEY = RubyUtil.RUBY.newSymbol("storage_policy");
@@ -92,7 +94,9 @@ public final class MetricKeys {
public static final RubySymbol WORKER_CONCURRENCY_KEY = RubyUtil.RUBY.newSymbol("worker_concurrency");
public static final RubySymbol UPTIME_IN_SECONDS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_seconds");
public static final RubySymbol UPTIME_IN_MILLIS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_millis");
public static final RubySymbol QUEUE_PERSISTED_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_events");
public static final RubySymbol QUEUE_PERSISTED_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_bytes");
}


@@ -27,6 +27,8 @@ import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp;
import org.logstash.instrument.metrics.AbstractMetric;
import org.logstash.instrument.metrics.MetricType;
import java.util.Optional;
/**
* A lazy proxy to a more specific typed {@link GaugeMetric}. The metric will only be initialized if the initial value is set, or once the {@code set} operation is called.
* <p><strong>Intended only for use with Ruby's duck typing, Java consumers should use the specific typed {@link GaugeMetric}</strong></p>
@@ -71,6 +73,11 @@ public class LazyDelegatingGauge extends AbstractMetric<Object> implements Gauge
return lazyMetric == null ? null : lazyMetric.get();
}
@SuppressWarnings("rawtypes")
public Optional getMetric() {
return Optional.ofNullable(lazyMetric);
}
@Override
public MetricType getType() {
return lazyMetric == null ? null : lazyMetric.getType();


@@ -274,6 +274,15 @@ describe "Test Monitoring API" do
'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0)
)
if logstash_service.settings.feature_flag == "persistent_queues"
expect(flow_status).to include(
'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)),
'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric))
)
else
expect(flow_status).to_not include('queue_persisted_growth_bytes')
expect(flow_status).to_not include('queue_persisted_growth_events')
end
end
end


@@ -124,6 +124,16 @@ describe "Test Logstash service when config reload is enabled" do
'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0)
)
if logstash_service.settings.feature_flag == "persistent_queues"
expect(pipeline_flow_stats).to include(
'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)),
'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric))
)
else
expect(pipeline_flow_stats).to_not include('queue_persisted_growth_bytes')
expect(pipeline_flow_stats).to_not include('queue_persisted_growth_events')
end
end
# check reload stats