mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
54fa8d06a0
commit
b6e051acbc
2 changed files with 123 additions and 2 deletions
|
@ -0,0 +1,121 @@
|
|||
package org.logstash.benchmark;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.Timestamp;
|
||||
import org.logstash.ackedqueue.Batch;
|
||||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OperationsPerInvocation;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.RunnerException;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
|
||||
@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
|
||||
@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS)
|
||||
@Fork(1)
|
||||
@BenchmarkMode(Mode.Throughput)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@State(Scope.Thread)
|
||||
public class QueueRWBenchmark {
|
||||
|
||||
private static final int EVENTS_PER_INVOCATION = 500_000;
|
||||
|
||||
private static final int BATCH_SIZE = 100;
|
||||
|
||||
private static final Event EVENT = new Event();
|
||||
|
||||
private Queue queue;
|
||||
|
||||
private String path;
|
||||
|
||||
private ExecutorService exec;
|
||||
|
||||
@Setup
|
||||
public void setUp() throws IOException, CloneNotSupportedException {
|
||||
final Settings settings = settings();
|
||||
EVENT.setField("Foo", "Bar");
|
||||
EVENT.setField("Foo1", "Bar1");
|
||||
EVENT.setField("Foo2", "Bar2");
|
||||
EVENT.setField("Foo3", "Bar3");
|
||||
EVENT.setField("Foo4", "Bar4");
|
||||
path = settings.getDirPath();
|
||||
queue = new Queue(settings);
|
||||
queue.open();
|
||||
exec = Executors.newSingleThreadExecutor();
|
||||
}
|
||||
|
||||
@TearDown
|
||||
public void tearDown() throws IOException {
|
||||
queue.close();
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
exec.shutdownNow();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(EVENTS_PER_INVOCATION)
|
||||
public final void readFromPersistedQueue(final Blackhole blackhole) throws Exception {
|
||||
final Future<?> future = exec.submit(() -> {
|
||||
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
|
||||
try {
|
||||
final Event evnt = EVENT.clone();
|
||||
evnt.setTimestamp(Timestamp.now());
|
||||
this.queue.write(evnt);
|
||||
} catch (final IOException | CloneNotSupportedException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
|
||||
try (Batch batch = queue.readBatch(BATCH_SIZE)) {
|
||||
for (final Queueable elem : batch.getElements()) {
|
||||
blackhole.consume(elem);
|
||||
}
|
||||
}
|
||||
}
|
||||
future.get();
|
||||
}
|
||||
|
||||
public static void main(final String... args) throws RunnerException {
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(QueueRWBenchmark.class.getSimpleName())
|
||||
.forks(2)
|
||||
.build();
|
||||
new Runner(opt).run();
|
||||
}
|
||||
|
||||
private static Settings settings() {
|
||||
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
|
||||
.capacity(256 * 1024 * 1024)
|
||||
.queueMaxBytes(Long.MAX_VALUE)
|
||||
.elementIOFactory(MmapPageIO::new)
|
||||
.checkpointMaxWrites(1024)
|
||||
.checkpointMaxAcks(1024)
|
||||
.checkpointIOFactory(FileCheckpointIO::new)
|
||||
.elementClass(Event.class).build();
|
||||
}
|
||||
}
|
|
@ -35,7 +35,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
|
|||
@BenchmarkMode(Mode.Throughput)
|
||||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
@State(Scope.Thread)
|
||||
public class QueueBenchmark {
|
||||
public class QueueWriteBenchmark {
|
||||
|
||||
private static final int EVENTS_PER_INVOCATION = 500_000;
|
||||
|
||||
|
@ -76,7 +76,7 @@ public class QueueBenchmark {
|
|||
|
||||
public static void main(final String... args) throws RunnerException {
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(QueueBenchmark.class.getSimpleName())
|
||||
.include(QueueWriteBenchmark.class.getSimpleName())
|
||||
.forks(2)
|
||||
.build();
|
||||
new Runner(opt).run();
|
Loading…
Add table
Add a link
Reference in a new issue