From 941446a6dc384c514b256e3413ae35af0a3fc29a Mon Sep 17 00:00:00 2001 From: Armin Date: Sat, 10 Jun 2017 11:46:02 +0200 Subject: [PATCH] PERFORMANCE: Queue RW benchmark for in memory queue Fixes #7410 --- .../logstash/benchmark/QueueRWBenchmark.java | 80 ++++++++++++++----- 1 file changed, 59 insertions(+), 21 deletions(-) diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java index 27d19ea0a..e5b3c3d7b 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueRWBenchmark.java @@ -10,14 +10,17 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.logstash.Event; -import org.logstash.Timestamp; import org.logstash.ackedqueue.Batch; import org.logstash.ackedqueue.Queue; import org.logstash.ackedqueue.Queueable; import org.logstash.ackedqueue.Settings; import org.logstash.ackedqueue.SettingsImpl; +import org.logstash.ackedqueue.io.ByteBufferPageIO; +import org.logstash.ackedqueue.io.CheckpointIOFactory; import org.logstash.ackedqueue.io.FileCheckpointIO; +import org.logstash.ackedqueue.io.MemoryCheckpointIO; import org.logstash.ackedqueue.io.MmapPageIO; +import org.logstash.ackedqueue.io.PageIOFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -52,7 +55,11 @@ public class QueueRWBenchmark { private static final Event EVENT = new Event(); - private Queue queue; + private ArrayBlockingQueue queueArrayBlocking; + + private Queue queuePersisted; + + private Queue queueMemory; private String path; @@ -60,21 +67,26 @@ public class QueueRWBenchmark { @Setup public void setUp() throws IOException, CloneNotSupportedException { - final Settings settings = settings(); + final Settings settingsPersisted = settings(true); EVENT.setField("Foo", "Bar"); EVENT.setField("Foo1", "Bar1"); EVENT.setField("Foo2", "Bar2"); EVENT.setField("Foo3", "Bar3"); EVENT.setField("Foo4", "Bar4"); - path = settings.getDirPath(); - queue = new Queue(settings); - queue.open(); + path = settingsPersisted.getDirPath(); + queuePersisted = new Queue(settingsPersisted); + queueArrayBlocking = new ArrayBlockingQueue<>(ACK_INTERVAL); + queueMemory = new Queue(settings(false)); + queuePersisted.open(); + queueMemory.open(); exec = Executors.newSingleThreadExecutor(); } @TearDown public void tearDown() throws IOException { - queue.close(); + queuePersisted.close(); + queueMemory.close(); + queueArrayBlocking.clear(); FileUtils.deleteDirectory(new File(path)); exec.shutdownNow(); } @@ -85,16 +97,36 @@ public class QueueRWBenchmark { final Future future = exec.submit(() -> { for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { try { - final Event evnt = EVENT.clone(); - evnt.setTimestamp(Timestamp.now()); - this.queue.write(evnt); - } catch (final IOException | CloneNotSupportedException ex) { + this.queuePersisted.write(EVENT); + } catch (final IOException ex) { throw new IllegalStateException(ex); } } }); for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) { - try (Batch batch = queue.readBatch(BATCH_SIZE)) { + try (Batch batch = queuePersisted.readBatch(BATCH_SIZE)) { + for (final Queueable elem : batch.getElements()) { + blackhole.consume(elem); + } + } + } + future.get(); + } + + @Benchmark + @OperationsPerInvocation(EVENTS_PER_INVOCATION) + public final void readFromMemoryQueue(final Blackhole blackhole) throws Exception { + final Future future = exec.submit(() -> { + for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { + try { + this.queueMemory.write(EVENT); + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + } + }); + for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) { + try (Batch batch = queueMemory.readBatch(BATCH_SIZE)) { for (final Queueable elem : batch.getElements()) { blackhole.consume(elem); } @@ -106,20 +138,17 @@ public class QueueRWBenchmark { @Benchmark @OperationsPerInvocation(EVENTS_PER_INVOCATION) public final void readFromArrayBlockingQueue(final Blackhole blackhole) throws Exception { - final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(ACK_INTERVAL); final Future future = exec.submit(() -> { for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { try { - final Event evnt = EVENT.clone(); - evnt.setTimestamp(Timestamp.now()); - arrayBlockingQueue.put(evnt); - } catch (final CloneNotSupportedException | InterruptedException ex) { + queueArrayBlocking.put(EVENT); + } catch (final InterruptedException ex) { throw new IllegalStateException(ex); } } }); for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) { - blackhole.consume(arrayBlockingQueue.take()); + blackhole.consume(queueArrayBlocking.take()); } future.get(); } @@ -132,14 +161,23 @@ public class QueueRWBenchmark { new Runner(opt).run(); } - private static Settings settings() { + private static Settings settings(final boolean persisted) { + final PageIOFactory pageIOFactory; + final CheckpointIOFactory checkpointIOFactory; + if (persisted) { + pageIOFactory = MmapPageIO::new; + checkpointIOFactory = FileCheckpointIO::new; + } else { + pageIOFactory = ByteBufferPageIO::new; + checkpointIOFactory = MemoryCheckpointIO::new; + } return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath()) .capacity(256 * 1024 * 1024) .queueMaxBytes(Long.MAX_VALUE) - .elementIOFactory(MmapPageIO::new) + .elementIOFactory(pageIOFactory) .checkpointMaxWrites(ACK_INTERVAL) .checkpointMaxAcks(ACK_INTERVAL) - .checkpointIOFactory(FileCheckpointIO::new) + .checkpointIOFactory(checkpointIOFactory) .elementClass(Event.class).build(); } }