mirror of https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00

parent d7cd3b29cc
commit ba0a4ee075

2 changed files with 91 additions and 45 deletions
@@ -374,7 +374,10 @@ module LogStash; class JavaPipeline < JavaBasePipeline
       pipeline_workers.times do |t|
         batched_execution = @lir_execution.buildExecution
         thread = Thread.new(self, batched_execution) do |_pipeline, _batched_execution|
-          _pipeline.worker_loop(_batched_execution)
+          org.logstash.execution.WorkerLoop.new(_batched_execution, @signal_queue,
+                                                @filter_queue_client, @events_filtered,
+                                                @events_consumed, @flushing,
+                                                @drain_queue).run
         end
         thread.name="[#{pipeline_id}]>worker#{t}"
         @worker_threads << thread
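The hunk above keeps thread creation on the Ruby side and swaps only the loop body for the new Java class. As a minimal sketch of the same fan-out pattern expressed in plain Java (the class and parameter names here are illustrative, not from this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    public final class WorkerPoolSketch {

        // Starts one thread per worker; each worker runs its own freshly built
        // loop, mirroring
        // `pipeline_workers.times { Thread.new { WorkerLoop.new(...).run } }`.
        public static List<Thread> startWorkers(final String pipelineId,
                                                final int pipelineWorkers,
                                                final Supplier<Runnable> loopFactory) {
            final List<Thread> workers = new ArrayList<>(pipelineWorkers);
            for (int t = 0; t < pipelineWorkers; t++) {
                final Thread thread = new Thread(loopFactory.get());
                thread.setName("[" + pipelineId + "]>worker" + t);
                thread.start();
                workers.add(thread);
            }
            return workers;
        }
    }

Each worker gets its own Runnable from the factory, mirroring the per-thread buildExecution call in the hunk above.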
@@ -400,30 +403,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
     @settings.get("dead_letter_queue.enable")
   end
 
-  # Main body of what a worker thread does
-  # Repeatedly takes batches off the queue, filters, then outputs them
-  def worker_loop(batched_execution)
-    shutdown_requested = false
-    while true
-      signal = @signal_queue.poll || NO_SIGNAL
-      shutdown_requested |= signal.shutdown? # latch on shutdown signal
-
-      batch = @filter_queue_client.read_batch # metrics are started in read_batch
-      @events_consumed.increment(batch.size)
-      execute_batch(batched_execution, batch, signal.flush?)
-      @filter_queue_client.close_batch(batch)
-      # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
-      break if (shutdown_requested && !draining_queue?)
-    end
-
-    # we are shutting down, queue is drained if it was required, now perform a final flush.
-    # for this we need to create a new empty batch to contain the final flushed events
-    batch = @filter_queue_client.new_batch
-    @filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
-    batched_execution.compute(batch.to_a, true, true)
-    @filter_queue_client.close_batch(batch)
-  end
-
   def wait_inputs
     @input_threads.each(&:join)
   end
@@ -647,26 +626,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
 
   private
 
-  def execute_batch(batched_execution, batch, flush)
-    batched_execution.compute(batch.to_a, flush, false)
-    @events_filtered.increment(batch.size)
-    filtered_size = batch.filtered_size
-    @filter_queue_client.add_output_metrics(filtered_size)
-    @filter_queue_client.add_filtered_metrics(filtered_size)
-    @flushing.set(false) if flush
-  rescue Exception => e
-    # Plugins authors should manage their own exceptions in the plugin code
-    # but if an exception is raised up to the worker thread they are considered
-    # fatal and logstash will not recover from this situation.
-    #
-    # Users need to check their configuration or see if there is a bug in the
-    # plugin.
-    @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
-                  default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
-
-    raise e
-  end
-
   def maybe_setup_out_plugins
     if @outputs_registered.make_true
       register_plugins(@outputs)

@@ -0,0 +1,87 @@
+package org.logstash.execution;
+
+import java.util.concurrent.BlockingQueue;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jruby.RubyArray;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.RubyUtil;
+import org.logstash.config.ir.compiler.Dataset;
+
+public final class WorkerLoop implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
+
+    private final Dataset execution;
+
+    private final BlockingQueue<IRubyObject> signalQueue;
+
+    private final IRubyObject readClient;
+
+    private final IRubyObject flushing;
+
+    private final IRubyObject consumedCounter;
+
+    private final IRubyObject filteredCounter;
+
+    private final boolean drainQueue;
+
+    public WorkerLoop(final Dataset execution, final BlockingQueue<IRubyObject> signalQueue,
+        final IRubyObject readClient, final IRubyObject filteredCounter,
+        final IRubyObject consumedCounter, final IRubyObject flushing, final boolean drainQueue) {
+        this.consumedCounter = consumedCounter;
+        this.filteredCounter = filteredCounter;
+        this.execution = execution;
+        this.signalQueue = signalQueue;
+        this.drainQueue = drainQueue;
+        this.readClient = readClient;
+        this.flushing = flushing;
+    }
+
+    @Override
+    public void run() {
+        try {
+            boolean shutdownRequested = false;
+            final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
+            do {
+                final IRubyObject signal = signalQueue.poll();
+                shutdownRequested = shutdownRequested
+                    || signal != null && signal.callMethod(context, "shutdown?").isTrue();
+                final IRubyObject batch = readClient.callMethod(context, "read_batch");
+                consumedCounter.callMethod(
+                    context, "increment", batch.callMethod(context, "size")
+                );
+                final boolean isFlush = signal != null && signal.callMethod(context, "flush?").isTrue();
+                readClient.callMethod(context, "start_metrics", batch);
+                execution.compute((RubyArray) batch.callMethod(context, "to_a"), isFlush, false);
+                filteredCounter.callMethod(
+                    context, "increment", batch.callMethod(context, "size")
+                );
+                final IRubyObject filteredSize = batch.callMethod(context, "filtered_size");
+                readClient.callMethod(context, "add_output_metrics", filteredSize);
+                readClient.callMethod(context, "add_filtered_metrics", filteredSize);
+                readClient.callMethod(context, "close_batch", batch);
+                if (isFlush) {
+                    flushing.callMethod(context, "set", context.fals);
+                }
+            } while (!shutdownRequested || isDraining(context));
+            //we are shutting down, queue is drained if it was required, now perform a final flush.
+            //for this we need to create a new empty batch to contain the final flushed events
+            final IRubyObject batch = readClient.callMethod(context, "new_batch");
+            readClient.callMethod(context, "start_metrics", batch);
+            execution.compute((RubyArray) batch.callMethod(context, "to_a"), true, true);
+            readClient.callMethod(context, "close_batch", batch);
+        } catch (final Exception ex) {
+            LOGGER.error(
+                "Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
+                ex
+            );
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    private boolean isDraining(final ThreadContext context) {
+        return drainQueue && !readClient.callMethod(context, "empty?").isTrue();
+    }
+}
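The new class drives all of its Ruby collaborators (the read client, both counters, the flushing flag) through JRuby's dynamic-dispatch bridge: each one is held as an IRubyObject and invoked via callMethod. A minimal sketch of that bridge outside Logstash, assuming only jruby-core on the classpath; the Ruby snippet here is illustrative, not from Logstash:

    import org.jruby.Ruby;
    import org.jruby.runtime.ThreadContext;
    import org.jruby.runtime.builtin.IRubyObject;

    public final class CallMethodSketch {
        public static void main(final String[] args) {
            final Ruby ruby = Ruby.newInstance(); // a fresh runtime for the demo
            final ThreadContext context = ruby.getCurrentContext();
            final IRubyObject batch = ruby.evalScriptlet("[1, 2, 3]");
            // Dynamic dispatch, the same way WorkerLoop invokes
            // read_batch/size/close_batch on its Ruby-side objects.
            System.out.println(batch.callMethod(context, "size"));            // => 3
            System.out.println(batch.callMethod(context, "empty?").isTrue()); // => false
        }
    }

In the committed class, RubyUtil.RUBY supplies Logstash's shared JRuby runtime instead of a fresh one, and context.fals is JRuby's cached false singleton, which is why resetting the flushing flag needs no boxing.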