diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java new file mode 100644 index 000000000..3c2b23901 --- /dev/null +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java @@ -0,0 +1,121 @@ +package org.logstash.benchmark; + +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.logstash.Event; +import org.logstash.Timestamp; +import org.logstash.ackedqueue.Batch; +import org.logstash.ackedqueue.Queue; +import org.logstash.ackedqueue.Queueable; +import org.logstash.ackedqueue.Settings; +import org.logstash.ackedqueue.SettingsImpl; +import org.logstash.ackedqueue.io.FileCheckpointIO; +import org.logstash.ackedqueue.io.MmapPageIO; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class QueueRWBenchmark { + + private static final int EVENTS_PER_INVOCATION = 500_000; + + private static final int BATCH_SIZE = 100; + + private static final Event EVENT = new Event(); + + private Queue queue; + + private String path; + + private ExecutorService exec; + + @Setup + public void setUp() throws IOException, CloneNotSupportedException { + final Settings settings = settings(); + EVENT.setField("Foo", "Bar"); + EVENT.setField("Foo1", "Bar1"); + EVENT.setField("Foo2", "Bar2"); + EVENT.setField("Foo3", "Bar3"); + EVENT.setField("Foo4", "Bar4"); + path = settings.getDirPath(); + queue = new Queue(settings); + queue.open(); + exec = Executors.newSingleThreadExecutor(); + } + + @TearDown + public void tearDown() throws IOException { + queue.close(); + FileUtils.deleteDirectory(new File(path)); + exec.shutdownNow(); + } + + @Benchmark + @OperationsPerInvocation(EVENTS_PER_INVOCATION) + public final void readFromPersistedQueue(final Blackhole blackhole) throws Exception { + final Future future = exec.submit(() -> { + for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { + try { + final Event evnt = EVENT.clone(); + evnt.setTimestamp(Timestamp.now()); + this.queue.write(evnt); + } catch (final IOException | CloneNotSupportedException ex) { + throw new IllegalStateException(ex); + } + } + }); + for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) { + try (Batch batch = queue.readBatch(BATCH_SIZE)) { + for (final Queueable elem : batch.getElements()) { + blackhole.consume(elem); + } + } + } + future.get(); + } + + public static void main(final String... args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(QueueRWBenchmark.class.getSimpleName()) + .forks(2) + .build(); + new Runner(opt).run(); + } + + private static Settings settings() { + return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath()) + .capacity(256 * 1024 * 1024) + .queueMaxBytes(Long.MAX_VALUE) + .elementIOFactory(MmapPageIO::new) + .checkpointMaxWrites(1024) + .checkpointMaxAcks(1024) + .checkpointIOFactory(FileCheckpointIO::new) + .elementClass(Event.class).build(); + } +} diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueWriteBenchmark.java similarity index 96% rename from logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java rename to logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueWriteBenchmark.java index 7489258bf..a81bf3429 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueWriteBenchmark.java @@ -35,7 +35,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Thread) -public class QueueBenchmark { +public class QueueWriteBenchmark { private static final int EVENTS_PER_INVOCATION = 500_000; @@ -76,7 +76,7 @@ public class QueueBenchmark { public static void main(final String... args) throws RunnerException { Options opt = new OptionsBuilder() - .include(QueueBenchmark.class.getSimpleName()) + .include(QueueWriteBenchmark.class.getSimpleName()) .forks(2) .build(); new Runner(opt).run();