MINOR: Fixed Iteration Order per Batch Worker Thread

Fixes #8928
This commit is contained in:
Armin 2018-01-10 06:42:56 +01:00 committed by Armin Braun
parent 058c9a6c47
commit 644f536101
2 changed files with 24 additions and 26 deletions

View file

@ -491,15 +491,14 @@ describe "conditionals in filter" do
sample_one({"type" => "original"}) do
expect(subject).to be_an(Array)
expect(subject.length).to eq(2)
subject.sort! {|a, b| a.get("type") <=> b.get("type")}
expect(subject[1].get("type")).to eq("original")
expect(subject[1].get("cond1")).to eq("true")
expect(subject[1].get("cond2")).to eq(nil)
expect(subject[0].get("type")).to eq("clone")
# expect(subject[1].get("cond1")).to eq(nil)
# expect(subject[1].get("cond2")).to eq("true")
original_event = subject[0]
expect(original_event.get("type")).to eq("original")
expect(original_event.get("cond1")).to eq("true")
expect(original_event.get("cond2")).to eq(nil)
cloned_event = subject[1]
expect(cloned_event.get("cond1")).to eq(nil)
expect(cloned_event.get("cond2")).to eq("true")
expect(cloned_event.get("type")).to eq("clone")
end
end
@ -520,20 +519,18 @@ describe "conditionals in filter" do
CONFIG
sample_one({"type" => "original"}) do
expect(subject.length).to eq(3)
subject.sort! {|a, b| a.get("type") <=> b.get("type")}
expect(subject[0].get("type")).to eq("clone1")
expect(subject[0].get("cond1")).to eq("true")
expect(subject[0].get("cond2")).to eq(nil)
expect(subject[1].get("type")).to eq("clone2")
expect(subject[1].get("cond1")).to eq(nil)
expect(subject[1].get("cond2")).to eq("true")
expect(subject[2].get("type")).to eq("original")
expect(subject[2].get("cond1")).to eq(nil)
expect(subject[2].get("cond2")).to eq(nil)
clone_event_1 = subject[0]
expect(clone_event_1.get("type")).to eq("clone1")
expect(clone_event_1.get("cond1")).to eq("true")
expect(clone_event_1.get("cond2")).to eq(nil)
clone_event_2 = subject[1]
expect(clone_event_2.get("type")).to eq("clone2")
expect(clone_event_2.get("cond1")).to eq(nil)
expect(clone_event_2.get("cond2")).to eq("true")
original_event = subject[2]
expect(original_event.get("type")).to eq("original")
expect(original_event.get("cond1")).to eq(nil)
expect(original_event.get("cond2")).to eq(nil)
end
end
@ -586,7 +583,7 @@ describe "conditionals in filter" do
expect(tags[6]).to eq("prev")
expect(tags[7]).to eq("final")
end
sample_one("type" => "original") do
tags = subject.get("tags")
expect(tags[0]).to eq("prev")

View file

@ -1,7 +1,7 @@
package org.logstash.common;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.logstash.ext.JrubyEventExtLibrary;
@ -49,8 +49,9 @@ public final class LsQueueUtils {
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue, final int count, final long nanos
) throws InterruptedException {
int left = count;
//todo: make this an ArrayList once we remove the Ruby pipeline/execution
final Collection<JrubyEventExtLibrary.RubyEvent> collection =
new HashSet<>(4 * count / 3 + 1);
new LinkedHashSet<>(4 * count / 3 + 1);
do {
final int drained = drain(queue, collection, left, nanos);
if (drained == 0) {