mirror of https://github.com/elastic/logstash.git
parent 5dfd0c60a2
commit 0b83d4fc86
3 changed files with 27 additions and 43 deletions

logstash-core/lib/logstash/java_pipeline.rb

@@ -164,7 +164,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
     @input_queue_client = @queue.write_client
     @filter_queue_client = @queue.read_client
-    @signal_queue = java.util.concurrent.LinkedBlockingQueue.new
     # Note that @inflight_batches as a central mechanism for tracking inflight
     # batches will fail if we have multiple read clients here.
     @filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
@@ -181,6 +180,8 @@ module LogStash; class JavaPipeline < JavaBasePipeline
     @ready = Concurrent::AtomicBoolean.new(false)
     @running = Concurrent::AtomicBoolean.new(false)
     @flushing = java.util.concurrent.atomic.AtomicBoolean.new(false)
+    @flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
+    @shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
     @outputs_registered = Concurrent::AtomicBoolean.new(false)
     @finished_execution = Concurrent::AtomicBoolean.new(false)
   end # def initialize
@@ -369,10 +370,9 @@ module LogStash; class JavaPipeline < JavaBasePipeline
 
     pipeline_workers.times do |t|
       thread = Thread.new do
-        org.logstash.execution.WorkerLoop.new(@lir_execution, @signal_queue,
-                                              @filter_queue_client, @events_filtered,
-                                              @events_consumed, @flushing,
-                                              @drain_queue).run
+        org.logstash.execution.WorkerLoop.new(
+          @lir_execution, @filter_queue_client, @events_filtered, @events_consumed,
+          @flushRequested, @flushing, @shutdownRequested, @drain_queue).run
       end
       thread.name="[#{pipeline_id}]>worker#{t}"
       @worker_threads << thread
@@ -491,11 +491,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
   # tell the worker threads to stop and then block until they've fully stopped
   # This also stops all filter and output plugins
   def shutdown_workers
-    # Each worker thread will receive this exactly once!
-    @worker_threads.each do |t|
-      @logger.debug("Pushing shutdown", default_logging_keys(:thread => t.inspect))
-      @signal_queue.put(SHUTDOWN)
-    end
+    @shutdownRequested.set(true)
 
     @worker_threads.each do |t|
       @logger.debug("Shutdown waiting for worker thread" , default_logging_keys(:thread => t.inspect))
@@ -519,7 +515,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
   def start_flusher
     # Invariant to help detect improper initialization
     raise "Attempted to start flusher on a stopped pipeline!" if stopped?
-    @flusher_thread = org.logstash.execution.PeriodicFlush.new(@signal_queue, FLUSH, @flushing)
+    @flusher_thread = org.logstash.execution.PeriodicFlush.new(@flushRequested, @flushing)
     @flusher_thread.start
   end
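Taken together, the Ruby-side changes replace the shared signal queue (and its FLUSH/SHUTDOWN marker objects) with two java.util.concurrent.atomic AtomicBoolean flags that the pipeline hands to PeriodicFlush and to every WorkerLoop. Below is a minimal, self-contained Java sketch of that coordination pattern; the class and thread names are made up for illustration and this is not the Logstash API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public final class FlagCoordinationSketch {
    public static void main(final String[] args) throws InterruptedException {
        final AtomicBoolean flushRequested = new AtomicBoolean(false);
        final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

        // Every worker polls the same flags, so no per-worker signal delivery is needed.
        final Runnable worker = () -> {
            while (!shutdownRequested.get()) {
                final boolean isFlush = flushRequested.get();
                // ... read one batch, run filters/outputs, flush if isFlush ...
                if (isFlush) {
                    flushRequested.set(false);
                }
            }
        };
        final Thread w0 = new Thread(worker, "worker-0");
        final Thread w1 = new Thread(worker, "worker-1");
        w0.start();
        w1.start();

        // Shutdown is now a single flag flip instead of one queue put per worker.
        shutdownRequested.set(true);
        w0.join();
        w1.join();
    }
}
```

A single flag flip is observed by every worker on its next iteration, which is why shutdown_workers above no longer needs to enqueue one SHUTDOWN marker per worker thread.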

logstash-core/src/main/java/org/logstash/execution/PeriodicFlush.java

@@ -1,13 +1,11 @@
 package org.logstash.execution;
 
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jruby.runtime.builtin.IRubyObject;
 
 public final class PeriodicFlush implements AutoCloseable {
 
@@ -17,16 +15,12 @@ public final class PeriodicFlush implements AutoCloseable {
         r -> new Thread(r, "logstash-pipeline-flush")
     );
 
-    private final BlockingQueue<IRubyObject> queue;
-
-    private final IRubyObject signal;
+    private final AtomicBoolean flushRequested;
 
     private final AtomicBoolean flushing;
 
-    public PeriodicFlush(final BlockingQueue<IRubyObject> queue, final IRubyObject signal,
-        final AtomicBoolean flushing) {
-        this.queue = queue;
-        this.signal = signal;
+    public PeriodicFlush(final AtomicBoolean flushRequested, final AtomicBoolean flushing) {
+        this.flushRequested = flushRequested;
         this.flushing = flushing;
     }
 
@@ -34,11 +28,7 @@ public final class PeriodicFlush implements AutoCloseable {
         executor.scheduleAtFixedRate(() -> {
             if (flushing.compareAndSet(false, true)) {
                 LOGGER.debug("Pushing flush onto pipeline.");
-                try {
-                    queue.put(signal);
-                } catch (final InterruptedException ex) {
-                    throw new IllegalStateException(ex);
-                }
+                flushRequested.set(true);
             }
         }, 0L, 5L, TimeUnit.SECONDS);
     }
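After this change the flush trigger reduces to a periodic flag-raiser. The sketch below is a simplified stand-in, not the actual PeriodicFlush class (the class name, start() method, and close() behavior are illustrative): every five seconds it tries to claim the single in-flight flush slot with compareAndSet and, if it succeeds, raises flushRequested instead of putting a FLUSH object onto a blocking queue.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class FlushTickerSketch implements AutoCloseable {

    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "flush-ticker"));

    private final AtomicBoolean flushRequested;
    private final AtomicBoolean flushing;

    public FlushTickerSketch(final AtomicBoolean flushRequested, final AtomicBoolean flushing) {
        this.flushRequested = flushRequested;
        this.flushing = flushing;
    }

    public void start() {
        executor.scheduleAtFixedRate(() -> {
            // Claim the "one flush in flight" slot; the worker clears both flags
            // once it has actually performed the flush.
            if (flushing.compareAndSet(false, true)) {
                flushRequested.set(true);
            }
        }, 0L, 5L, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Because the worker clears both flags after performing the flush (see the WorkerLoop hunks below), at most one flush request is pending at any time.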

logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java

@@ -1,13 +1,9 @@
 package org.logstash.execution;
 
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
-import org.logstash.RubyUtil;
 import org.logstash.config.ir.CompiledPipeline;
 import org.logstash.config.ir.compiler.Dataset;
 
@@ -17,42 +13,43 @@ public final class WorkerLoop implements Runnable {
 
     private final Dataset execution;
 
-    private final BlockingQueue<IRubyObject> signalQueue;
-
     private final QueueReadClient readClient;
 
+    private final AtomicBoolean flushRequested;
+
     private final AtomicBoolean flushing;
 
+    private final AtomicBoolean shutdownRequested;
+
     private final LongAdder consumedCounter;
 
     private final LongAdder filteredCounter;
 
     private final boolean drainQueue;
 
-    public WorkerLoop(final CompiledPipeline pipeline, final BlockingQueue<IRubyObject> signalQueue,
-        final QueueReadClient readClient, final LongAdder filteredCounter,
-        final LongAdder consumedCounter, final AtomicBoolean flushing, final boolean drainQueue) {
+    public WorkerLoop(final CompiledPipeline pipeline, final QueueReadClient readClient,
+        final LongAdder filteredCounter, final LongAdder consumedCounter,
+        final AtomicBoolean flushRequested, final AtomicBoolean flushing,
+        final AtomicBoolean shutdownRequested, final boolean drainQueue) {
         this.consumedCounter = consumedCounter;
         this.filteredCounter = filteredCounter;
         this.execution = pipeline.buildExecution();
-        this.signalQueue = signalQueue;
         this.drainQueue = drainQueue;
         this.readClient = readClient;
+        this.flushRequested = flushRequested;
         this.flushing = flushing;
+        this.shutdownRequested = shutdownRequested;
     }
 
     @Override
     public void run() {
         try {
-            boolean shutdownRequested = false;
-            final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
+            boolean isShutdown = false;
             do {
-                final IRubyObject signal = signalQueue.poll();
-                shutdownRequested = shutdownRequested
-                    || signal != null && signal.callMethod(context, "shutdown?").isTrue();
+                isShutdown = isShutdown || shutdownRequested.get();
                 final QueueBatch batch = readClient.readBatch();
                 consumedCounter.add(batch.filteredSize());
-                final boolean isFlush = signal != null && signal.callMethod(context, "flush?").isTrue();
+                final boolean isFlush = flushRequested.get();
                 readClient.startMetrics(batch);
                 execution.compute(batch.to_a(), isFlush, false);
                 int filteredCount = batch.filteredSize();
@@ -62,13 +59,14 @@ public final class WorkerLoop implements Runnable {
                 readClient.closeBatch(batch);
                 if (isFlush) {
                     flushing.set(false);
+                    flushRequested.set(false);
                 }
-            } while (!shutdownRequested || isDraining());
+            } while (!isShutdown || isDraining());
             //we are shutting down, queue is drained if it was required, now perform a final flush.
             //for this we need to create a new empty batch to contain the final flushed events
             final QueueBatch batch = readClient.newBatch();
             readClient.startMetrics(batch);
-            execution.compute(batch.to_a(), true, false);
+            execution.compute(batch.to_a(), true, true);
             readClient.closeBatch(batch);
         } catch (final Exception ex) {
             LOGGER.error(
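For orientation, here is a hedged sketch of the reworked worker loop's control flow with the queue and batch plumbing stubbed out. WorkerLoopSketch, processOneBatch, and hasRemainingEvents are hypothetical placeholders; the real loop uses QueueReadClient/QueueBatch and the compiled Dataset as shown in the hunks above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public final class WorkerLoopSketch implements Runnable {

    private final AtomicBoolean flushRequested;
    private final AtomicBoolean flushing;
    private final AtomicBoolean shutdownRequested;
    private final boolean drainQueue;

    public WorkerLoopSketch(final AtomicBoolean flushRequested, final AtomicBoolean flushing,
        final AtomicBoolean shutdownRequested, final boolean drainQueue) {
        this.flushRequested = flushRequested;
        this.flushing = flushing;
        this.shutdownRequested = shutdownRequested;
        this.drainQueue = drainQueue;
    }

    @Override
    public void run() {
        boolean isShutdown = false;
        do {
            // Latch the shutdown request so the batch in progress still completes.
            isShutdown = isShutdown || shutdownRequested.get();
            final boolean isFlush = flushRequested.get();
            processOneBatch(isFlush);
            if (isFlush) {
                // This cycle's flush is done; allow the flush ticker to schedule the next one.
                flushing.set(false);
                flushRequested.set(false);
            }
        } while (!isShutdown || mustKeepDraining());
        // Shutting down: one final flush on an empty batch, as in the diff above.
        processOneBatch(true);
    }

    private void processOneBatch(final boolean flush) {
        // Placeholder for readBatch / startMetrics / compute / closeBatch.
    }

    private boolean mustKeepDraining() {
        // Placeholder for "drainQueue is enabled and the queue still holds events".
        return drainQueue && hasRemainingEvents();
    }

    private boolean hasRemainingEvents() {
        return false; // stub
    }
}
```

Latching isShutdown locally lets a worker finish (and optionally drain) its current work before exiting; the trailing processOneBatch(true) roughly corresponds to the final compute(batch.to_a(), true, true) call in the hunk above.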