Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 22:57:16 -04:00)
simplify batch classes, do not compute JE empty batches, refactor RE worker loop (#11746)

7.x clean backport of #11737:
- cleanup RubyArray "rawtypes"
- remove all LinkedHashSet from batch and queue classes
- avoid processing empty batches in Java worker loop
- cleanup AckedReadBatch and MemoryReadBatch
- refactor Ruby worker loop similar to Java Execution to not use batch merge
- remove QueueBatch merge and replace LinkedHashSet with ArrayList
Parent: bd7d5b9b0f
Commit: f4b9349145
15 changed files with 193 additions and 186 deletions
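The common thread in the hunks below is that a batch stops being a pair of mergeable hash-backed sets and becomes a plain ordered list that the worker loop reads once and hands to the filters and outputs. As a rough self-contained sketch of the resulting batch shape (names here are simplified stand-ins; the real interface is QueueBatch over RubyEvent and appears verbatim in a later hunk):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for org.logstash.execution.QueueBatch after this commit:
// no merge(), and collection() is renamed to events().
interface SimpleBatch {
    int filteredSize();
    Collection<String> events(); // the real interface holds RubyEvent, not String

    static SimpleBatch of(List<String> events) {
        return new SimpleBatch() {
            public int filteredSize() { return events.size(); }
            public Collection<String> events() { return events; }
        };
    }
}

public class BatchShapeDemo {
    public static void main(String[] args) {
        SimpleBatch batch = SimpleBatch.of(new ArrayList<>(List.of("e1", "e2")));
        // The worker loop now just iterates the list; nothing is merged back in.
        batch.events().forEach(System.out::println);
        System.out.println("filteredSize = " + batch.filteredSize());
    }
}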
@@ -363,13 +363,17 @@ module LogStash; class Pipeline < BasePipeline
       batch = filter_queue_client.read_batch.to_java # metrics are started in read_batch
       batch_size = batch.filteredSize
+      events = batch.to_a
       if batch_size > 0
         @events_consumed.add(batch_size)
-        filter_batch(batch)
+        events = filter_batch(events)
       end
-      flush_filters_to_batch(batch, :final => false) if signal.flush?
-      if batch.filteredSize > 0
-        output_batch(batch, output_events_map)
+      if signal.flush?
+        events = flush_filters_to_batch(events, :final => false)
+      end
+      if events.size > 0
+        output_batch(events, output_events_map)
       end
       filter_queue_client.close_batch(batch)
       # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
@@ -380,18 +384,17 @@ module LogStash; class Pipeline < BasePipeline
     # for this we need to create a new empty batch to contain the final flushed events
     batch = filter_queue_client.to_java.newBatch
     filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
-    flush_filters_to_batch(batch, :final => true)
-    output_batch(batch, output_events_map)
+    events = batch.to_a
+    events = flush_filters_to_batch(events, :final => true)
+    output_batch(events, output_events_map)
     filter_queue_client.close_batch(batch)
   end

-  def filter_batch(batch)
-    filter_func(batch.to_a).each do |e|
-      #these are both original and generated events
-      batch.merge(e) unless e.cancelled?
-    end
-    filter_queue_client.add_filtered_metrics(batch.filtered_size)
-    @events_filtered.add(batch.filteredSize)
+  def filter_batch(events)
+    result = filter_func(events)
+    filter_queue_client.add_filtered_metrics(result.size)
+    @events_filtered.add(result.size)
+    result
   rescue Exception => e
     # Plugins authors should manage their own exceptions in the plugin code
     # but if an exception is raised up to the worker thread they are considered
@@ -406,13 +409,15 @@ module LogStash; class Pipeline < BasePipeline
   end

   # Take an array of events and send them to the correct output
-  def output_batch(batch, output_events_map)
+  def output_batch(events, output_events_map)
     # Build a mapping of { output_plugin => [events...]}
-    batch.to_a.each do |event|
-      # We ask the AST to tell us which outputs to send each event to
-      # Then, we stick it in the correct bin
-      output_func(event).each do |output|
-        output_events_map[output].push(event)
+    events.each do |event|
+      unless event.cancelled?
+        # We ask the AST to tell us which outputs to send each event to
+        # Then, we stick it in the correct bin
+        output_func(event).each do |output|
+          output_events_map[output].push(event)
+        end
       end
     end
     # Now that we have our output to event mapping we can just invoke each output
@@ -422,7 +427,7 @@ module LogStash; class Pipeline < BasePipeline
       events.clear
     end

-    filter_queue_client.add_output_metrics(batch.filtered_size)
+    filter_queue_client.add_output_metrics(events.size)
   end

   def resolve_cluster_uuids
@@ -600,15 +605,16 @@ module LogStash; class Pipeline < BasePipeline
   #
   # @param batch [ReadClient::ReadBatch]
   # @param options [Hash]
-  def flush_filters_to_batch(batch, options = {})
+  def flush_filters_to_batch(events, options = {})
+    result = events
     flush_filters(options) do |event|
       unless event.cancelled?
         @logger.debug? and @logger.debug("Pushing flushed events", default_logging_keys(:event => event))
-        batch.merge(event)
+        result << event
       end
     end

     @flushing.set(false)
+    result
   end # flush_filters_to_batch

   def plugin_threads_info
@@ -80,10 +80,13 @@ describe LogStash::Pipeline do
     EOS
   end

-  let(:pipeline_settings) { { "queue.type" => queue_type, "pipeline.workers" => worker_thread_count, "pipeline.id" => pipeline_id} }
+  let(:pipeline_settings) {{
+    "queue.type" => queue_type,
+    "pipeline.workers" => worker_thread_count,
+    "pipeline.id" => pipeline_id
+  }}

   let(:pipeline_config) { mock_pipeline_config(pipeline_id, config, pipeline_settings_obj) }
-  subject { described_class.new(pipeline_config, metric) }

   let(:counting_output) { PipelinePqFileOutput.new({ "id" => output_id }) }
   let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
@@ -95,7 +98,6 @@ describe LogStash::Pipeline do
   let(:number_of_events) { 100_000 }
   let(:page_capacity) { 1 * 1024 * 512 } # 1 128
   let(:max_bytes) { 1024 * 1024 * 1024 } # 1 gb
-  let(:queue_type) { "persisted" } # "memory" "memory_acked"
   let(:times) { [] }

   let(:pipeline_thread) do
@@ -105,6 +107,8 @@ describe LogStash::Pipeline do
     Thread.new { s.run }
   end

+  let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
+
   before :each do
     FileUtils.mkdir_p(this_queue_folder)
@@ -139,19 +143,43 @@ describe LogStash::Pipeline do
     # Dir.rm_rf(this_queue_folder)
   end

-  let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
+  shared_examples "a well behaved pipeline" do
+    it "populates the core metrics" do
+      _metric = collected_metric[:stats][:pipelines][:main][:events]
+      expect(_metric[:duration_in_millis].value).not_to be_nil
+      expect(_metric[:in].value).to eq(number_of_events)
+      expect(_metric[:filtered].value).to eq(number_of_events)
+      expect(_metric[:out].value).to eq(number_of_events)
+      STDOUT.puts "  pipeline:            #{subject.class}"
+      STDOUT.puts "  queue.type:          #{pipeline_settings_obj.get("queue.type")}"
+      STDOUT.puts "  queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
+      STDOUT.puts "  queue.max_bytes:     #{pipeline_settings_obj.get("queue.max_bytes") / 1024}KB"
+      STDOUT.puts "  workers:             #{worker_thread_count}"
+      STDOUT.puts "  events:              #{number_of_events}"
+      STDOUT.puts "  took:                #{times.first}s"
+    end
+  end

-  it "populates the pipelines core metrics" do
-    _metric = collected_metric[:stats][:pipelines][:main][:events]
-    expect(_metric[:duration_in_millis].value).not_to be_nil
-    expect(_metric[:in].value).to eq(number_of_events)
-    expect(_metric[:filtered].value).to eq(number_of_events)
-    expect(_metric[:out].value).to eq(number_of_events)
-    STDOUT.puts "  queue.type:          #{pipeline_settings_obj.get("queue.type")}"
-    STDOUT.puts "  queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
-    STDOUT.puts "  queue.max_bytes:     #{pipeline_settings_obj.get("queue.max_bytes") / 1024}KB"
-    STDOUT.puts "  workers:             #{worker_thread_count}"
-    STDOUT.puts "  events:              #{number_of_events}"
-    STDOUT.puts "  took:                #{times.first}s"
-  end
+  context "using PQ" do
+    let(:queue_type) { "persisted" } # "memory", "persisted"
+
+    context "with Ruby execution" do
+      subject { LogStash::Pipeline.new(pipeline_config, metric) }
+      it_behaves_like "a well behaved pipeline"
+    end
+
+    context "with Java execution" do
+      subject { LogStash::JavaPipeline.new(pipeline_config, metric) }
+      it_behaves_like "a well behaved pipeline"
+    end
+  end
+
+  context "using MQ" do
+    let(:queue_type) { "memory" } # "memory", "persisted"
+
+    context "with Ruby execution" do
+      subject { LogStash::Pipeline.new(pipeline_config, metric) }
+      it_behaves_like "a well behaved pipeline"
+    end
+
+    context "with Java execution" do
+      subject { LogStash::JavaPipeline.new(pipeline_config, metric) }
+      it_behaves_like "a well behaved pipeline"
+    end
+  end
 end
@@ -119,19 +119,16 @@ describe LogStash::WrappedSynchronousQueue do
           message = data.get("message")
           expect(messages).to include(message)
           messages.delete(message)
-          # read_batch.cancel("value-#{i}") if i > 2 # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
           if message.match /value-[3-4]/
             data.cancel
-            read_batch.merge(LogStash::Event.new({ "message" => message.gsub(/value/, 'generated') }))
           end
         end
-        # expect(read_batch.cancelled_size).to eq(2) # disabled for https://github.com/elastic/logstash/issues/6055
         received = []
         read_batch.to_a.each do |data|
           received << data.get("message")
         end
         expect(received.size).to eq(3)
         (0..2).each {|i| expect(received).to include("value-#{i}")}
-        (3..4).each {|i| expect(received).to include("generated-#{i}")}
       end

       it "handles Java proxied read-batch object" do
@@ -21,11 +21,12 @@
 package org.logstash.ackedqueue;

 import java.io.IOException;
-import org.jruby.Ruby;
-import org.jruby.RubyBoolean;
-import org.jruby.RubyHash;
+import java.util.ArrayList;
+import java.util.Collection;
 import org.logstash.Event;
-import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
+
+import static org.logstash.RubyUtil.RUBY;

 public final class AckedBatch {
     private Batch batch;
@@ -36,14 +37,11 @@ public final class AckedBatch {
         return ackedBatch;
     }

-    public RubyHash toRubyHash(final Ruby runtime) {
-        final RubyBoolean trueValue = runtime.getTrue();
-        final RubyHash result = RubyHash.newHash(runtime);
-        this.batch.getElements().forEach(e -> result.fastASet(
-            JrubyEventExtLibrary.RubyEvent.newRubyEvent(runtime, (Event) e),
-            trueValue
-            )
-        );
+    public Collection<RubyEvent> events() {
+        final ArrayList<RubyEvent> result = new ArrayList<>(this.batch.size());
+        for (final Queueable e : batch.getElements()) {
+            result.add(RubyEvent.newRubyEvent(RUBY, (Event) e));
+        }
         return result;
     }
@@ -21,13 +21,12 @@
 package org.logstash.ackedqueue;

 import org.jruby.RubyArray;
-import org.jruby.RubyHash;
 import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
 import org.logstash.execution.MemoryReadBatch;
 import org.logstash.execution.QueueBatch;
 import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;

 import static org.logstash.RubyUtil.RUBY;
@@ -36,74 +35,55 @@ public final class AckedReadBatch implements QueueBatch {

     private AckedBatch ackedBatch;

-    private RubyHash originals;
-
-    private RubyHash generated;
+    private Collection<RubyEvent> events;

     public static AckedReadBatch create(
         final JRubyAckedQueueExt queue,
         final int size,
         final long timeout)
     {
-        return new AckedReadBatch(queue, size, timeout);
-    }
-
-    private AckedReadBatch(
-        final JRubyAckedQueueExt queue,
-        final int size,
-        final long timeout)
-    {
-        AckedBatch batch;
         try {
-            batch = queue.readBatch(size, timeout);
+            final AckedBatch batch = queue.readBatch(size, timeout);
+            return (batch == null) ? new AckedReadBatch() : new AckedReadBatch(batch);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
-        if (batch == null) {
-            originals = RubyHash.newHash(RUBY);
-            ackedBatch = null;
-        } else {
-            ackedBatch = batch;
-            originals = ackedBatch.toRubyHash(RUBY);
-        }
-        generated = RubyHash.newHash(RUBY);
     }

+    public static AckedReadBatch create() {
+        return new AckedReadBatch();
+    }
+
+    private AckedReadBatch() {
+        ackedBatch = null;
+        events = new ArrayList<>();
+    }
+
+    private AckedReadBatch(AckedBatch batch) {
+        ackedBatch = batch;
+        events = batch.events();
+    }
+
     @Override
-    public void merge(final RubyEvent event) {
-        if (!event.isNil() && !originals.containsKey(event)) {
-            generated.put(event, RUBY.getTrue());
-        }
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public RubyArray to_a() {
-        final RubyArray result = RUBY.newArray(filteredSize());
-        for (final RubyEvent event : (Collection<RubyEvent>) originals.keys()) {
-            if (!MemoryReadBatch.isCancelled(event)) {
-                result.append(event);
-            }
-        }
-        for (final RubyEvent event : (Collection<RubyEvent>) generated.keys()) {
-            if (!MemoryReadBatch.isCancelled(event)) {
-                result.append(event);
-            }
-        }
+    public RubyArray<RubyEvent> to_a() {
+        @SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> result = RUBY.newArray(events.size());
+        for (final RubyEvent e : events) {
+            if (!MemoryReadBatch.isCancelled(e)) {
+                result.append(e);
+            }
+        }
         return result;
     }

-    @SuppressWarnings({"unchecked"})
     @Override
-    public Collection<RubyEvent> collection() {
-        // This only returns the originals and does not filter cancelled one
-        // because it is only used in the WorkerLoop where only originals
-        // non-cancelled exists. We should revisit this AckedReadBatch
-        // implementation and get rid of this dual original/generated idea.
-        // The MemoryReadBatch does not use such a strategy.
-        return originals.directKeySet();
+    public Collection<RubyEvent> events() {
+        // This does not filter cancelled events because it is
+        // only used in the WorkerLoop where there are no cancelled
+        // events yet.
+        return events;
     }

     @Override
     public void close() throws IOException {
         if (ackedBatch != null) {
             ackedBatch.close();
@@ -112,6 +92,6 @@ public final class AckedReadBatch implements QueueBatch {

     @Override
     public int filteredSize() {
-        return originals.size() + generated.size();
+        return events.size();
     }
 }
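Worth noting in the two batch classes around this hunk: to_a() still filters cancelled events, while the renamed events() deliberately returns everything, on the stated assumption that the worker loop asks for the batch before any filter has had a chance to cancel anything. A small self-contained illustration of that split, with a hypothetical Ev record standing in for RubyEvent:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CancelFilterDemo {
    // Hypothetical stand-in for an event that can be cancelled mid-pipeline.
    record Ev(String id, boolean cancelled) {}

    // Mirrors to_a(): copies only the non-cancelled events.
    static List<Ev> toA(Collection<Ev> events) {
        final List<Ev> result = new ArrayList<>(events.size());
        for (Ev e : events) {
            if (!e.cancelled()) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Ev> batch = List.of(new Ev("a", false), new Ev("b", true));
        System.out.println(toA(batch)); // only "a": cancelled events are dropped
        System.out.println(batch);      // events()-style view: everything, unfiltered
    }
}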
@@ -20,11 +20,11 @@

 package org.logstash.common;

+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

 /**
  * Utilities around {@link BlockingQueue}.
@@ -42,9 +42,12 @@ public final class LsQueueUtils {
      * @param events Events to add to Queue
      * @throws InterruptedException On interrupt during blocking queue add
      */
-    public static void addAll(final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue,
-        final Collection<JrubyEventExtLibrary.RubyEvent> events) throws InterruptedException {
-        for (final JrubyEventExtLibrary.RubyEvent event : events) {
+    public static void addAll(
+        final BlockingQueue<RubyEvent> queue,
+        final Collection<RubyEvent> events)
+        throws InterruptedException
+    {
+        for (final RubyEvent event : events) {
             queue.put(event);
         }
     }
@@ -65,13 +68,14 @@ public final class LsQueueUtils {
      * @throws InterruptedException On Interrupt during {@link BlockingQueue#poll()} or
      * {@link BlockingQueue#drainTo(Collection)}
      */
-    public static LinkedHashSet<JrubyEventExtLibrary.RubyEvent> drain(
-        final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue, final int count, final long nanos
-    ) throws InterruptedException {
+    public static Collection<RubyEvent> drain(
+        final BlockingQueue<RubyEvent> queue,
+        final int count,
+        final long nanos)
+        throws InterruptedException
+    {
         int left = count;
-        //todo: make this an ArrayList once we remove the Ruby pipeline/execution
-        final LinkedHashSet<JrubyEventExtLibrary.RubyEvent> collection =
-            new LinkedHashSet<>(4 * count / 3 + 1);
+        final ArrayList<RubyEvent> collection = new ArrayList<>(4 * count / 3 + 1);
         do {
             final int drained = drain(queue, collection, left, nanos);
             if (drained == 0) {
@@ -95,15 +99,18 @@ public final class LsQueueUtils {
      * @throws InterruptedException On Interrupt during {@link BlockingQueue#poll()} or
      * {@link BlockingQueue#drainTo(Collection)}
      */
-    private static int drain(final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue,
-        final Collection<JrubyEventExtLibrary.RubyEvent> collection, final int count,
-        final long nanos) throws InterruptedException {
+    private static int drain(
+        final BlockingQueue<RubyEvent> queue,
+        final Collection<RubyEvent> collection,
+        final int count,
+        final long nanos)
+        throws InterruptedException
+    {
         int added = 0;
         do {
             added += queue.drainTo(collection, count - added);
             if (added < count) {
-                final JrubyEventExtLibrary.RubyEvent event =
-                    queue.poll(nanos, TimeUnit.NANOSECONDS);
+                final RubyEvent event = queue.poll(nanos, TimeUnit.NANOSECONDS);
                 if (event == null) {
                     break;
                 }
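The drain helpers above keep their two-phase shape: a non-blocking drainTo grabs whatever is already queued, then a timed poll waits for stragglers until the batch is full or the timeout expires. The new ArrayList also inherits the 4 * count / 3 + 1 capacity from the old LinkedHashSet (a hash-table load-factor heuristic; for a plain list anything >= count avoids resizing). A self-contained sketch of the same pattern against a plain LinkedBlockingQueue:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainDemo {
    // Collect up to `count` items: bulk-drain what is ready, then poll with a timeout.
    static List<String> drain(BlockingQueue<String> queue, int count, long nanos)
        throws InterruptedException {
        final List<String> collection = new ArrayList<>(4 * count / 3 + 1);
        int added = 0;
        do {
            added += queue.drainTo(collection, count - added);
            if (added < count) {
                final String item = queue.poll(nanos, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // timed out: return a partial (possibly empty) batch
                }
                collection.add(item);
                added++;
            }
        } while (added < count);
        return collection;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(List.of("e1", "e2"));
        System.out.println(drain(q, 4, TimeUnit.MILLISECONDS.toNanos(10))); // [e1, e2]
    }
}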
@@ -48,14 +48,7 @@ import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
 import org.logstash.plugins.ConfigVariableExpander;
 import org.logstash.secret.store.SecretStore;

-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -315,20 +308,14 @@ public final class CompiledPipeline {

         @Override
         public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
-            compute(batch.collection(), flush, shutdown);
+            compute(batch.events(), flush, shutdown);
         }

-        @SuppressWarnings({"rawtypes", "unchecked"})
         @Override
-        public void compute(final RubyArray batch, final boolean flush, final boolean shutdown) {
-            compute((Collection<RubyEvent>) batch, flush, shutdown);
-        }
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        private void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
-            final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
+        public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
+            @SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
             // send batch one-by-one as single-element batches down the filters
-            final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
+            @SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
             for (final RubyEvent e : batch) {
                 filterBatch.set(0, e);
                 final Collection<RubyEvent> result = compiledFilters.compute(filterBatch, flush, shutdown);
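Note how the filter path above feeds events one at a time through a reused single-element RubyArray (filterBatch.set(0, e)), avoiding a fresh wrapper allocation per event. A minimal self-contained sketch of that buffer-reuse idea, with a stand-in filter in place of compiledFilters.compute:

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class SingleElementBatchDemo {
    public static void main(String[] args) {
        final List<String> batch = List.of("e1", "e2", "e3");
        final List<String> out = new ArrayList<>(batch.size());
        // Reusable one-slot buffer: set(0, ...) overwrites instead of reallocating.
        final List<String> filterBatch = new ArrayList<>(List.of(""));
        for (final String e : batch) {
            filterBatch.set(0, e);
            // Stand-in for compiledFilters.compute(filterBatch, flush, shutdown).
            out.add(filterBatch.get(0).toUpperCase(Locale.ROOT));
        }
        System.out.println(out); // [E1, E2, E3]
    }
}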
@@ -343,14 +330,14 @@ public final class CompiledPipeline {

         @Override
         public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
-            compute(batch.to_a(), flush, shutdown);
+            compute(batch.events(), flush, shutdown);
         }

-        @SuppressWarnings({"rawtypes", "unchecked"})
         @Override
-        public void compute(final RubyArray batch, final boolean flush, final boolean shutdown) {
-            final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
-            final Collection<RubyEvent> result = compiledFilters.compute(batch, flush, shutdown);
+        public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
+            // we know for now this comes from batch.collection() which returns a LinkedHashSet
+            final Collection<RubyEvent> result = compiledFilters.compute(RubyArray.newArray(RubyUtil.RUBY, batch), flush, shutdown);
+            @SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray(result.size());
             copyNonCancelledEvents(result, outputBatch);
             compiledFilters.clear();
             compiledOutputs.compute(outputBatch, flush, shutdown);
@@ -385,8 +372,7 @@ public final class CompiledPipeline {

         public abstract void compute(final QueueBatch batch, final boolean flush, final boolean shutdown);

-        @SuppressWarnings({"rawtypes"})
-        public abstract void compute(final RubyArray batch, final boolean flush, final boolean shutdown);
+        public abstract void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown);

         /**
          * Instantiates the graph of compiled filter section {@link Dataset}.
@@ -21,55 +21,50 @@ package org.logstash.execution;

 import org.jruby.RubyArray;
 import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
+import java.util.ArrayList;
+import java.util.Collection;
-import java.util.LinkedHashSet;

 import static org.logstash.RubyUtil.RUBY;

 public final class MemoryReadBatch implements QueueBatch {

-    private final LinkedHashSet<RubyEvent> events;
-
-    public MemoryReadBatch(final LinkedHashSet<RubyEvent> events) {
-        this.events = events;
-    }
+    private final Collection<RubyEvent> events;

     public static boolean isCancelled(final RubyEvent event) {
         return event.getEvent().isCancelled();
     }

-    public static MemoryReadBatch create(LinkedHashSet<RubyEvent> events) {
+    public static MemoryReadBatch create(Collection<RubyEvent> events) {
         return new MemoryReadBatch(events);
     }

     public static MemoryReadBatch create() {
-        return create(new LinkedHashSet<>());
+        return new MemoryReadBatch(new ArrayList<>());
     }

+    private MemoryReadBatch(final Collection<RubyEvent> events) {
+        this.events = events;
+    }
+
     @Override
-    @SuppressWarnings({"rawtypes"})
-    public RubyArray to_a() {
-        final RubyArray result = RUBY.newArray(events.size());
-        for (final RubyEvent event : events) {
-            if (!isCancelled(event)) {
-                result.append(event);
+    public RubyArray<RubyEvent> to_a() {
+        @SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> result = RUBY.newArray(events.size());
+        for (final RubyEvent e : events) {
+            if (!isCancelled(e)) {
+                result.append(e);
             }
         }
         return result;
     }

     @Override
-    public Collection<RubyEvent> collection() {
+    public Collection<RubyEvent> events() {
+        // This does not filter cancelled events because it is
+        // only used in the WorkerLoop where there are no cancelled
+        // events yet.
         return events;
     }

-    @Override
-    public void merge(final RubyEvent event) {
-        events.add(event);
-    }
-
     @Override
     public int filteredSize() {
         return events.size();
@@ -26,8 +26,7 @@ import java.util.Collection;

 public interface QueueBatch {
     int filteredSize();
-    @SuppressWarnings({"rawtypes"}) RubyArray to_a();
-    Collection<RubyEvent> collection();
-    void merge(RubyEvent event);
+    RubyArray<RubyEvent> to_a();
+    Collection<RubyEvent> events();
     void close() throws IOException;
 }
@@ -76,17 +76,19 @@ public final class WorkerLoop implements Runnable {
             do {
                 isShutdown = isShutdown || shutdownRequested.get();
                 final QueueBatch batch = readClient.readBatch();
-                consumedCounter.add(batch.filteredSize());
                 final boolean isFlush = flushRequested.compareAndSet(true, false);
-                readClient.startMetrics(batch);
-                execution.compute(batch, isFlush, false);
-                int filteredCount = batch.filteredSize();
-                filteredCounter.add(filteredCount);
-                readClient.addOutputMetrics(filteredCount);
-                readClient.addFilteredMetrics(filteredCount);
-                readClient.closeBatch(batch);
-                if (isFlush) {
-                    flushing.set(false);
+                if (batch.filteredSize() > 0 || isFlush) {
+                    consumedCounter.add(batch.filteredSize());
+                    readClient.startMetrics(batch);
+                    execution.compute(batch, isFlush, false);
+                    int filteredCount = batch.filteredSize();
+                    filteredCounter.add(filteredCount);
+                    readClient.addOutputMetrics(filteredCount);
+                    readClient.addFilteredMetrics(filteredCount);
+                    readClient.closeBatch(batch);
+                    if (isFlush) {
+                        flushing.set(false);
+                    }
                 }
             } while (!isShutdown || isDraining());
             //we are shutting down, queue is drained if it was required, now perform a final flush.
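This WorkerLoop hunk is the "do not compute JE empty batches" part of the commit title: the consumed counter, metrics, compute, and closeBatch now run only when the batch has events or a flush was requested, so an idle pipeline stops paying the full per-iteration cost. A self-contained sketch of just that control flow, with stubbed reads in place of readClient.readBatch():

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class EmptyBatchSkipDemo {
    public static void main(String[] args) {
        final AtomicBoolean flushRequested = new AtomicBoolean(false);
        // Stubbed read results: two empty batches around one real batch.
        final List<List<String>> reads = List.of(List.of(), List.of("e1", "e2"), List.of());

        for (List<String> batch : reads) {
            final boolean isFlush = flushRequested.compareAndSet(true, false);
            if (batch.size() > 0 || isFlush) {
                // Only here do we pay for filters, outputs and metrics.
                System.out.println("computing batch of " + batch.size());
            } else {
                System.out.println("skipping empty batch");
            }
        }
    }
}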
@@ -76,13 +76,12 @@ public final class JrubyAckedReadClientExt extends QueueReadClientBase implement

     @Override
     public QueueBatch newBatch() {
-        return AckedReadBatch.create(queue, 0, 0);
+        return AckedReadBatch.create();
     }

     @Override
     public QueueBatch readBatch() {
-        AckedReadBatch batch =
-            AckedReadBatch.create(queue, batchSize, waitForMillis);
+        final AckedReadBatch batch = AckedReadBatch.create(queue, batchSize, waitForMillis);
         startMetrics(batch);
         return batch;
     }
@@ -77,8 +77,7 @@ public final class JrubyMemoryReadClientExt extends QueueReadClientBase {
     @Override
     @SuppressWarnings("unchecked")
     public QueueBatch readBatch() throws InterruptedException {
-        MemoryReadBatch batch = MemoryReadBatch.create(
-            LsQueueUtils.drain(queue, batchSize, waitForNanos));
+        final MemoryReadBatch batch = MemoryReadBatch.create(LsQueueUtils.drain(queue, batchSize, waitForNanos));
         startMetrics(batch);
         return batch;
     }
@@ -34,6 +34,7 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.jruby.RubyArray;
 import org.jruby.RubyObject;
 import org.jruby.RubyString;
 import org.jruby.runtime.builtin.IRubyObject;
@@ -115,6 +116,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         EVENT_SINKS.remove(runId);
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void buildsTrivialPipeline() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -134,6 +136,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void buildsStraightPipeline() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -155,6 +158,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void buildsForkedPipeline() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(IRHelpers.toSourceWithMetadata(
@@ -280,6 +284,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         verifyRegex("!~", 0);
     }

+    @SuppressWarnings({"unchecked"})
     private void verifyRegex(String operator, int expectedEvents)
         throws IncompleteSourceWithMetadataException {
         final Event event = new Event();
@@ -307,6 +312,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         outputEvents.clear();
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void equalityCheckOnCompositeField() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -338,6 +344,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(testEvent.getEvent().getField("foo"), CoreMatchers.nullValue());
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void conditionalWithNullField() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -362,6 +369,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(testEvent.getEvent().getField("foo"), CoreMatchers.is("bar"));
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void conditionalNestedMetaFieldPipeline() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -387,6 +395,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(testEvent.getEvent().getField("foo"), CoreMatchers.nullValue());
     }

+    @SuppressWarnings({"unchecked"})
     @Test
     public void moreThan255Parents() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
@@ -440,6 +449,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         verifyComparison(expected, String.format("[brr] %s [baz]", op), event);
     }

+    @SuppressWarnings({"unchecked"})
     private void verifyComparison(final boolean expected, final String conditional,
         final Event event) throws IncompleteSourceWithMetadataException {
         final JrubyEventExtLibrary.RubyEvent testEvent =
@@ -72,7 +72,7 @@ public final class EventConditionTest extends RubyEnvTestCase {
     }

     @Test
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public void testInclusionWithFieldInField() throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
             IRHelpers.toSourceWithMetadata("input {mockinput{}} filter { " +
@@ -154,6 +154,7 @@ public final class EventConditionTest extends RubyEnvTestCase {
         testConditionWithConstantValue("\"\"", 0);
     }

+    @SuppressWarnings({"unchecked"})
     private void testConditionWithConstantValue(String condition, int expectedMatches) throws Exception {
         final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
             IRHelpers.toSourceWithMetadata("input {mockinput{}} filter { " +
@@ -2,11 +2,11 @@
 # or more contributor license agreements. Licensed under the Elastic License;
 # you may not use this file except in compliance with the Elastic License.

-require 'spec_helper'
 require "logstash-core"
 require "logstash/agent"
 require "monitoring/inputs/metrics"
 require "rspec/wait"
+require 'spec_helper'
 require "json"
 require "json-schema"
 require 'monitoring/monitoring'