mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
19a317b5c4
commit
26fe96c038
9 changed files with 21 additions and 78 deletions
|
@ -15,8 +15,6 @@ import org.logstash.ackedqueue.Queue;
|
|||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
|
@ -119,13 +117,11 @@ public class QueueRWBenchmark {
|
|||
}
|
||||
|
||||
private static Settings settings() {
|
||||
final CheckpointIOFactory checkpointIOFactory = FileCheckpointIO::new;
|
||||
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
|
||||
.capacity(256 * 1024 * 1024)
|
||||
.queueMaxBytes(Long.MAX_VALUE)
|
||||
.checkpointMaxWrites(ACK_INTERVAL)
|
||||
.checkpointMaxAcks(ACK_INTERVAL)
|
||||
.checkpointIOFactory(checkpointIOFactory)
|
||||
.elementClass(Event.class).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import org.logstash.Timestamp;
|
|||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
|
@ -75,7 +74,6 @@ public class QueueWriteBenchmark {
|
|||
.queueMaxBytes(Long.MAX_VALUE)
|
||||
.checkpointMaxWrites(1024)
|
||||
.checkpointMaxAcks(1024)
|
||||
.checkpointIOFactory(FileCheckpointIO::new)
|
||||
.elementClass(Event.class).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.logstash.FileLockFactory;
|
||||
import org.logstash.LockException;
|
||||
import org.logstash.ackedqueue.io.CheckpointIO;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
@ -74,7 +75,7 @@ public final class Queue implements Closeable {
|
|||
this.dirPath = settings.getDirPath();
|
||||
this.pageCapacity = settings.getCapacity();
|
||||
this.maxBytes = settings.getQueueMaxBytes();
|
||||
this.checkpointIO = settings.getCheckpointIOFactory().build(dirPath);
|
||||
this.checkpointIO = new FileCheckpointIO(dirPath);
|
||||
this.elementClass = settings.getElementClass();
|
||||
this.tailPages = new ArrayList<>();
|
||||
this.unreadTailPages = new ArrayList<>();
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
|
||||
public interface Settings {
|
||||
|
||||
CheckpointIOFactory getCheckpointIOFactory();
|
||||
|
||||
Class<? extends Queueable> getElementClass();
|
||||
|
||||
String getDirPath();
|
||||
|
@ -22,8 +18,6 @@ public interface Settings {
|
|||
|
||||
interface Builder {
|
||||
|
||||
Builder checkpointIOFactory(CheckpointIOFactory factory);
|
||||
|
||||
Builder elementClass(Class<? extends Queueable> elementClass);
|
||||
|
||||
Builder capacity(int capacity);
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
|
||||
public class SettingsImpl implements Settings {
|
||||
private String dirForFiles;
|
||||
private CheckpointIOFactory checkpointIOFactory;
|
||||
private Class<? extends Queueable> elementClass;
|
||||
private int capacity;
|
||||
private long queueMaxBytes;
|
||||
|
@ -13,8 +10,7 @@ public class SettingsImpl implements Settings {
|
|||
private int checkpointMaxWrites;
|
||||
|
||||
public static Builder builder(final Settings settings) {
|
||||
return new BuilderImpl(settings.getDirPath(),
|
||||
settings.getCheckpointIOFactory(), settings.getElementClass(), settings.getCapacity(),
|
||||
return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(),
|
||||
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
|
||||
settings.getCheckpointMaxWrites()
|
||||
);
|
||||
|
@ -24,12 +20,10 @@ public class SettingsImpl implements Settings {
|
|||
return new BuilderImpl(dirForFiles);
|
||||
}
|
||||
|
||||
private SettingsImpl(final String dirForFiles, final CheckpointIOFactory checkpointIOFactory,
|
||||
final Class<? extends Queueable> elementClass,
|
||||
private SettingsImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
|
||||
final int capacity, final long queueMaxBytes, final int maxUnread,
|
||||
final int checkpointMaxAcks, final int checkpointMaxWrites) {
|
||||
this.dirForFiles = dirForFiles;
|
||||
this.checkpointIOFactory = checkpointIOFactory;
|
||||
this.elementClass = elementClass;
|
||||
this.capacity = capacity;
|
||||
this.queueMaxBytes = queueMaxBytes;
|
||||
|
@ -48,11 +42,6 @@ public class SettingsImpl implements Settings {
|
|||
return checkpointMaxWrites;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckpointIOFactory getCheckpointIOFactory() {
|
||||
return checkpointIOFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Queueable> getElementClass() {
|
||||
return this.elementClass;
|
||||
|
@ -110,8 +99,6 @@ public class SettingsImpl implements Settings {
|
|||
|
||||
private final String dirForFiles;
|
||||
|
||||
private final CheckpointIOFactory checkpointIOFactory;
|
||||
|
||||
private final Class<? extends Queueable> elementClass;
|
||||
|
||||
private final int capacity;
|
||||
|
@ -125,17 +112,15 @@ public class SettingsImpl implements Settings {
|
|||
private final int checkpointMaxWrites;
|
||||
|
||||
private BuilderImpl(final String dirForFiles) {
|
||||
this(dirForFiles, null, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
|
||||
this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
|
||||
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES
|
||||
);
|
||||
}
|
||||
|
||||
private BuilderImpl(final String dirForFiles, final CheckpointIOFactory checkpointIOFactory,
|
||||
final Class<? extends Queueable> elementClass,
|
||||
private BuilderImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
|
||||
final int capacity, final long queueMaxBytes, final int maxUnread,
|
||||
final int checkpointMaxAcks, final int checkpointMaxWrites) {
|
||||
this.dirForFiles = dirForFiles;
|
||||
this.checkpointIOFactory = checkpointIOFactory;
|
||||
this.elementClass = elementClass;
|
||||
this.capacity = capacity;
|
||||
this.queueMaxBytes = queueMaxBytes;
|
||||
|
@ -144,20 +129,11 @@ public class SettingsImpl implements Settings {
|
|||
this.checkpointMaxWrites = checkpointMaxWrites;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder checkpointIOFactory(final CheckpointIOFactory factory) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, factory, this.elementClass, this.capacity,
|
||||
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder elementClass(final Class<? extends Queueable> elementClass) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, elementClass,
|
||||
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
|
||||
this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread,
|
||||
this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
|
@ -165,25 +141,23 @@ public class SettingsImpl implements Settings {
|
|||
@Override
|
||||
public Builder capacity(final int capacity) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread,
|
||||
this.checkpointMaxAcks, this.checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder queueMaxBytes(final long size) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
this.capacity, size, this.maxUnread, this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread,
|
||||
this.checkpointMaxAcks, this.checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder maxUnread(final int maxUnread) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
this.dirForFiles, this.elementClass,
|
||||
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
);
|
||||
|
@ -192,7 +166,7 @@ public class SettingsImpl implements Settings {
|
|||
@Override
|
||||
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
this.dirForFiles, this.elementClass,
|
||||
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
);
|
||||
|
@ -201,18 +175,16 @@ public class SettingsImpl implements Settings {
|
|||
@Override
|
||||
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
|
||||
return new BuilderImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
|
||||
checkpointMaxWrites
|
||||
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
|
||||
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings build() {
|
||||
return new SettingsImpl(
|
||||
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
|
||||
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
|
||||
this.checkpointMaxWrites
|
||||
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
|
||||
this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import org.logstash.RubyUtil;
|
|||
import org.logstash.ackedqueue.Batch;
|
||||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
@JRubyClass(name = "AckedQueue")
|
||||
|
@ -49,7 +48,6 @@ public final class JRubyAckedQueueExt extends RubyObject {
|
|||
.queueMaxBytes(queueMaxBytes)
|
||||
.checkpointMaxAcks(checkpointMaxAcks)
|
||||
.checkpointMaxWrites(checkpointMaxWrites)
|
||||
.checkpointIOFactory(FileCheckpointIO::new)
|
||||
.elementClass(Event.class)
|
||||
.build()
|
||||
);
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
package org.logstash.ackedqueue.io;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface CheckpointIOFactory {
|
||||
CheckpointIO build(String dirPath);
|
||||
}
|
|
@ -1,21 +1,14 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
|
||||
public class TestSettings {
|
||||
|
||||
public static Settings persistedQueueSettings(int capacity, String folder) {
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
|
||||
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity)
|
||||
.checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory)
|
||||
.elementClass(StringElement.class).build();
|
||||
.checkpointMaxWrites(1).elementClass(StringElement.class).build();
|
||||
}
|
||||
|
||||
public static Settings persistedQueueSettings(int capacity, long size, String folder) {
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
|
||||
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity)
|
||||
.queueMaxBytes(size).checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory)
|
||||
.elementClass(StringElement.class).build();
|
||||
.queueMaxBytes(size).elementClass(StringElement.class).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,6 @@ import org.logstash.ackedqueue.SettingsImpl;
|
|||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.StringElement;
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
|
||||
public class Concurrent {
|
||||
final static int ELEMENT_COUNT = 2000000;
|
||||
|
@ -24,9 +22,8 @@ public class Concurrent {
|
|||
static Settings settings;
|
||||
|
||||
public static Settings fileSettings(int capacity) {
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
|
||||
return SettingsImpl.fileSettingsBuilder("/tmp/queue").capacity(capacity)
|
||||
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
|
||||
.elementClass(StringElement.class).build();
|
||||
}
|
||||
|
||||
public static Thread producer(Queue q, List<StringElement> input) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue