JAVAFICATION: Use LongAdder instead of Ruby Concurrent::AtomicFixnum for pipeline counters

Fixes #9067
This commit is contained in:
Armin 2018-01-30 10:22:45 +01:00 committed by Armin Braun
parent 0fc9c1f342
commit 89a9616705
4 changed files with 22 additions and 21 deletions

View file

@@ -173,8 +173,8 @@ module LogStash; class JavaPipeline < JavaBasePipeline
)
@drain_queue = @settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)
@events_filtered = java.util.concurrent.atomic.LongAdder.new
@events_consumed = java.util.concurrent.atomic.LongAdder.new
@input_threads = []
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@@ -368,9 +368,8 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
pipeline_workers.times do |t|
batched_execution = @lir_execution.buildExecution
thread = Thread.new(self, batched_execution) do |_pipeline, _batched_execution|
org.logstash.execution.WorkerLoop.new(_batched_execution, @signal_queue,
thread = Thread.new do
org.logstash.execution.WorkerLoop.new(@lir_execution, @signal_queue,
@filter_queue_client, @events_filtered,
@events_consumed, @flushing,
@drain_queue).run

View file

@@ -186,8 +186,8 @@ module LogStash; class Pipeline < BasePipeline
@drain_queue = @settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)
@events_filtered = java.util.concurrent.atomic.LongAdder.new
@events_consumed = java.util.concurrent.atomic.LongAdder.new
@input_threads = []
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@@ -421,7 +421,7 @@ module LogStash; class Pipeline < BasePipeline
batch = @filter_queue_client.read_batch # metrics are started in read_batch
batch_size = batch.size
if batch_size > 0
@events_consumed.increment(batch_size)
@events_consumed.add(batch_size)
filter_batch(batch)
end
flush_filters_to_batch(batch, :final => false) if signal.flush?
@@ -448,7 +448,7 @@ module LogStash; class Pipeline < BasePipeline
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch.filtered_size)
@events_filtered.increment(batch.size)
@events_filtered.add(batch.size)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
# but if an exception is raised up to the worker thread they are considered

View file

@@ -70,11 +70,11 @@ module LogStash; class PipelineReporter
private
def events_filtered
pipeline.events_filtered.value
pipeline.events_filtered.sum
end
def events_consumed
pipeline.events_consumed.value
pipeline.events_consumed.sum
end
def plugin_threads

View file

@@ -2,12 +2,14 @@ package org.logstash.execution;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyArray;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.config.ir.compiler.Dataset;
public final class WorkerLoop implements Runnable {
@@ -22,18 +24,18 @@ public final class WorkerLoop implements Runnable {
private final AtomicBoolean flushing;
private final IRubyObject consumedCounter;
private final LongAdder consumedCounter;
private final IRubyObject filteredCounter;
private final LongAdder filteredCounter;
private final boolean drainQueue;
public WorkerLoop(final Dataset execution, final BlockingQueue<IRubyObject> signalQueue,
final IRubyObject readClient, final IRubyObject filteredCounter,
final IRubyObject consumedCounter, final AtomicBoolean flushing, final boolean drainQueue) {
public WorkerLoop(final CompiledPipeline pipeline, final BlockingQueue<IRubyObject> signalQueue,
final IRubyObject readClient, final LongAdder filteredCounter,
final LongAdder consumedCounter, final AtomicBoolean flushing, final boolean drainQueue) {
this.consumedCounter = consumedCounter;
this.filteredCounter = filteredCounter;
this.execution = execution;
this.execution = pipeline.buildExecution();
this.signalQueue = signalQueue;
this.drainQueue = drainQueue;
this.readClient = readClient;
@@ -50,14 +52,14 @@ public final class WorkerLoop implements Runnable {
shutdownRequested = shutdownRequested
|| signal != null && signal.callMethod(context, "shutdown?").isTrue();
final IRubyObject batch = readClient.callMethod(context, "read_batch");
consumedCounter.callMethod(
context, "increment", batch.callMethod(context, "size")
consumedCounter.add(
(long) batch.callMethod(context, "size").convertToInteger().getIntValue()
);
final boolean isFlush = signal != null && signal.callMethod(context, "flush?").isTrue();
readClient.callMethod(context, "start_metrics", batch);
execution.compute((RubyArray) batch.callMethod(context, "to_a"), isFlush, false);
filteredCounter.callMethod(
context, "increment", batch.callMethod(context, "size")
filteredCounter.add(
(long) batch.callMethod(context, "size").convertToInteger().getIntValue()
);
final IRubyObject filteredSize = batch.callMethod(context, "filtered_size");
readClient.callMethod(context, "add_output_metrics", filteredSize);