diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java index 3c2b23901..27d19ea0a 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java @@ -3,6 +3,7 @@ package org.logstash.benchmark; import com.google.common.io.Files; import java.io.File; import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -47,6 +48,8 @@ public class QueueRWBenchmark { private static final int BATCH_SIZE = 100; + private static final int ACK_INTERVAL = 1024; + private static final Event EVENT = new Event(); private Queue queue; @@ -100,6 +103,27 @@ public class QueueRWBenchmark { future.get(); } + @Benchmark + @OperationsPerInvocation(EVENTS_PER_INVOCATION) + public final void readFromArrayBlockingQueue(final Blackhole blackhole) throws Exception { + final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(ACK_INTERVAL); + final Future future = exec.submit(() -> { + for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { + try { + final Event evnt = EVENT.clone(); + evnt.setTimestamp(Timestamp.now()); + arrayBlockingQueue.put(evnt); + } catch (final CloneNotSupportedException | InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + }); + for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { + blackhole.consume(arrayBlockingQueue.take()); + } + future.get(); + } + public static void main(final String... args) throws RunnerException { Options opt = new OptionsBuilder() .include(QueueRWBenchmark.class.getSimpleName()) @@ -113,8 +137,8 @@ public class QueueRWBenchmark { .capacity(256 * 1024 * 1024) .queueMaxBytes(Long.MAX_VALUE) .elementIOFactory(MmapPageIO::new) - .checkpointMaxWrites(1024) - .checkpointMaxAcks(1024) + .checkpointMaxWrites(ACK_INTERVAL) + .checkpointMaxAcks(ACK_INTERVAL) .checkpointIOFactory(FileCheckpointIO::new) .elementClass(Event.class).build(); }