From 1a2c0c8597be1a0fee9b740750d87a4ac95b9ab1 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 18 May 2017 17:15:14 +0200 Subject: [PATCH] #7053 Refactor Queue Settings Class to Using a Builder Pattern Fixes #7062 --- .../logstash/benchmark/QueueBenchmark.java | 19 +- .../org/logstash/ackedqueue/FileSettings.java | 130 --------- .../logstash/ackedqueue/MemorySettings.java | 132 --------- .../java/org/logstash/ackedqueue/Queue.java | 7 +- .../org/logstash/ackedqueue/Settings.java | 39 +-- .../org/logstash/ackedqueue/SettingsImpl.java | 252 ++++++++++++++++++ .../ext/JrubyAckedQueueExtLibrary.java | 42 ++- .../ext/JrubyAckedQueueMemoryExtLibrary.java | 37 ++- .../org/logstash/ackedqueue/QueueTest.java | 33 +-- .../org/logstash/ackedqueue/TestSettings.java | 28 +- .../ackedqueue/io/MemoryCheckpointTest.java | 19 +- .../java/org/logstash/stress/Concurrent.java | 36 ++- 12 files changed, 365 insertions(+), 409 deletions(-) delete mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/FileSettings.java delete mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/MemorySettings.java create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java index ecee42843..112e9f47d 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java @@ -7,9 +7,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.logstash.Event; import org.logstash.Timestamp; -import org.logstash.ackedqueue.FileSettings; import org.logstash.ackedqueue.Queue; import org.logstash.ackedqueue.Settings; +import org.logstash.ackedqueue.SettingsImpl; import org.logstash.ackedqueue.io.FileCheckpointIO; import org.logstash.ackedqueue.io.MmapPageIO; import org.openjdk.jmh.annotations.Benchmark; @@ -83,14 +83,13 @@ public class QueueBenchmark { } private static Settings settings() { - Settings s = new FileSettings(Files.createTempDir().getPath()); - s.setCapacity(256 * 1024 * 1024); - s.setQueueMaxBytes(Long.MAX_VALUE); - s.setElementIOFactory(MmapPageIO::new); - s.setCheckpointMaxWrites(50_000); - s.setCheckpointMaxAcks(50_000); - s.setCheckpointIOFactory(FileCheckpointIO::new); - s.setElementClass(Event.class); - return s; + return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath()) + .capacity(256 * 1024 * 1024) + .queueMaxBytes(Long.MAX_VALUE) + .elementIOFactory(MmapPageIO::new) + .checkpointMaxWrites(50_000) + .checkpointMaxAcks(50_000) + .checkpointIOFactory(FileCheckpointIO::new) + .elementClass(Event.class).build(); } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/FileSettings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/FileSettings.java deleted file mode 100644 index 10ccd963a..000000000 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/FileSettings.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.logstash.ackedqueue; - -import org.logstash.ackedqueue.io.CheckpointIOFactory; -import org.logstash.ackedqueue.io.PageIOFactory; - -public class FileSettings implements Settings { - private String dirForFiles; - private CheckpointIOFactory checkpointIOFactory; - private PageIOFactory pageIOFactory; - private Class elementClass; - private int capacity; - private long queueMaxBytes; - private int maxUnread; - private int checkpointMaxAcks; - private int checkpointMaxWrites; - private int checkpointMaxInterval; - - private FileSettings() { this(""); } - - public FileSettings(String dirPath) { - this.dirForFiles = dirPath; - this.maxUnread = 0; - this.checkpointMaxAcks = 1024; - this.checkpointMaxWrites = 1024; - this.checkpointMaxInterval = 1000; // millisec - } - - @Override - public Settings setCheckpointIOFactory(CheckpointIOFactory factory) { - this.checkpointIOFactory = factory; - return this; - } - - @Override - public Settings setElementIOFactory(PageIOFactory factory) { - this.pageIOFactory = factory; - return this; - } - - @Override - public Settings setElementClass(Class elementClass) { - this.elementClass = elementClass; - return this; - } - - @Override - public Settings setQueueMaxBytes(long size) { - this.queueMaxBytes = size; - return this; - } - - @Override - public Settings setCapacity(int capacity) { - this.capacity = capacity; - return this; - } - - @Override - public Settings setMaxUnread(int maxUnread) { - this.maxUnread = maxUnread; - return this; - } - - @Override - public Settings setCheckpointMaxAcks(int checkpointMaxAcks) { - this.checkpointMaxAcks = checkpointMaxAcks; - return this; - } - - @Override - public Settings setCheckpointMaxWrites(int checkpointMaxWrites) { - this.checkpointMaxWrites = checkpointMaxWrites; - return this; - } - - @Override - public Settings setCheckpointMaxInterval(int checkpointMaxInterval) { - this.checkpointMaxInterval = checkpointMaxInterval; - return this; - } - - @Override - public int getCheckpointMaxAcks() { - return checkpointMaxAcks; - } - - @Override - public int getCheckpointMaxWrites() { - return checkpointMaxWrites; - } - - @Override - public int getCheckpointMaxInterval() { - return checkpointMaxInterval; - } - - @Override - public CheckpointIOFactory getCheckpointIOFactory() { - return checkpointIOFactory; - } - - public PageIOFactory getPageIOFactory() { - return pageIOFactory; - } - - @Override - public Class getElementClass() { - return this.elementClass; - } - - @Override - public String getDirPath() { - return dirForFiles; - } - - @Override - public long getQueueMaxBytes() { - return queueMaxBytes; - } - - @Override - public int getCapacity() { - return capacity; - } - - @Override - public int getMaxUnread() { - return this.maxUnread; - } -} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/MemorySettings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/MemorySettings.java deleted file mode 100644 index d0b5503c6..000000000 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/MemorySettings.java +++ /dev/null @@ -1,132 +0,0 @@ -package org.logstash.ackedqueue; - -import org.logstash.ackedqueue.io.CheckpointIOFactory; -import org.logstash.ackedqueue.io.PageIOFactory; - -public class MemorySettings implements Settings { - private CheckpointIOFactory checkpointIOFactory; - private PageIOFactory pageIOFactory; - private Class elementClass; - private int capacity; - private long queueMaxBytes; - private final String dirPath; - private int maxUnread; - private int checkpointMaxAcks; - private int checkpointMaxWrites; - private int checkpointMaxInterval; - - public MemorySettings() { - this(""); - } - - public MemorySettings(String dirPath) { - this.dirPath = dirPath; - this.maxUnread = 0; - this.checkpointMaxAcks = 1; - this.checkpointMaxWrites = 1; - this.checkpointMaxInterval = 0; - } - - @Override - public Settings setCheckpointIOFactory(CheckpointIOFactory factory) { - this.checkpointIOFactory = factory; - return this; - } - - @Override - public Settings setElementIOFactory(PageIOFactory factory) { - this.pageIOFactory = factory; - return this; - } - - @Override - public Settings setElementClass(Class elementClass) { - this.elementClass = elementClass; - return this; - } - - @Override - public Settings setCapacity(int capacity) { - this.capacity = capacity; - return this; - } - - @Override - public Settings setQueueMaxBytes(long size) { - this.queueMaxBytes = size; - return this; - } - - @Override - public Settings setMaxUnread(int maxUnread) { - this.maxUnread = maxUnread; - return this; - } - - @Override - public Settings setCheckpointMaxAcks(int checkpointMaxAcks) { - this.checkpointMaxAcks = checkpointMaxAcks; - return this; - } - - @Override - public Settings setCheckpointMaxWrites(int checkpointMaxWrites) { - this.checkpointMaxWrites = checkpointMaxWrites; - return this; - } - - @Override - public Settings setCheckpointMaxInterval(int checkpointMaxInterval) { - this.checkpointMaxInterval = checkpointMaxInterval; - return this; - } - - @Override - public int getCheckpointMaxAcks() { - return checkpointMaxAcks; - } - - @Override - public int getCheckpointMaxWrites() { - return checkpointMaxWrites; - } - - @Override - public int getCheckpointMaxInterval() { - return checkpointMaxInterval; - } - - @Override - public CheckpointIOFactory getCheckpointIOFactory() { - return checkpointIOFactory; - } - - public PageIOFactory getPageIOFactory() { - return pageIOFactory; - } - - @Override - public Class getElementClass() { - return this.elementClass; - } - - @Override - public String getDirPath() { - return this.dirPath; - } - - @Override - public long getQueueMaxBytes() { - return this.queueMaxBytes; - } - - @Override - public int getCapacity() { - return this.capacity; - } - - @Override - public int getMaxUnread() { - return this.maxUnread; - } -} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index c4903a117..7225dfd22 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -61,7 +61,6 @@ public class Queue implements Closeable { private final int maxUnread; private final int checkpointMaxAcks; private final int checkpointMaxWrites; - private final int checkpointMaxInterval; private final AtomicBoolean closed; @@ -90,12 +89,11 @@ public class Queue implements Closeable { settings.getElementClass(), settings.getMaxUnread(), settings.getCheckpointMaxWrites(), - settings.getCheckpointMaxAcks(), - settings.getCheckpointMaxInterval() + settings.getCheckpointMaxAcks() ); } - public Queue(String dirPath, int pageCapacity, long maxBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) { + private Queue(String dirPath, int pageCapacity, long maxBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks) { this.dirPath = dirPath; this.pageCapacity = pageCapacity; this.maxBytes = maxBytes; @@ -109,7 +107,6 @@ public class Queue implements Closeable { this.maxUnread = maxUnread; this.checkpointMaxAcks = checkpointMaxAcks; this.checkpointMaxWrites = checkpointMaxWrites; - this.checkpointMaxInterval = checkpointMaxInterval; this.unreadCount = 0; this.currentByteSize = 0; diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java index 386358459..8a40aa521 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java @@ -4,23 +4,6 @@ import org.logstash.ackedqueue.io.CheckpointIOFactory; import org.logstash.ackedqueue.io.PageIOFactory; public interface Settings { - Settings setCheckpointIOFactory(CheckpointIOFactory factory); - - Settings setElementIOFactory(PageIOFactory factory); - - Settings setElementClass(Class elementClass); - - Settings setCapacity(int capacity); - - Settings setQueueMaxBytes(long size); - - Settings setMaxUnread(int maxUnread); - - Settings setCheckpointMaxAcks(int checkpointMaxAcks); - - Settings setCheckpointMaxWrites(int checkpointMaxWrites); - - Settings setCheckpointMaxInterval(int checkpointMaxInterval); CheckpointIOFactory getCheckpointIOFactory(); @@ -39,6 +22,26 @@ public interface Settings { int getCheckpointMaxAcks(); int getCheckpointMaxWrites(); + + interface Builder { - int getCheckpointMaxInterval(); + Builder checkpointIOFactory(CheckpointIOFactory factory); + + Builder elementIOFactory(PageIOFactory factory); + + Builder elementClass(Class elementClass); + + Builder capacity(int capacity); + + Builder queueMaxBytes(long size); + + Builder maxUnread(int maxUnread); + + Builder checkpointMaxAcks(int checkpointMaxAcks); + + Builder checkpointMaxWrites(int checkpointMaxWrites); + + Settings build(); + + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java new file mode 100644 index 000000000..abc7b1e98 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -0,0 +1,252 @@ +package org.logstash.ackedqueue; + +import org.logstash.ackedqueue.io.CheckpointIOFactory; +import org.logstash.ackedqueue.io.PageIOFactory; + +public class SettingsImpl implements Settings { + private String dirForFiles; + private CheckpointIOFactory checkpointIOFactory; + private PageIOFactory pageIOFactory; + private Class elementClass; + private int capacity; + private long queueMaxBytes; + private int maxUnread; + private int checkpointMaxAcks; + private int checkpointMaxWrites; + + public static Builder builder(final Settings settings) { + return new BuilderImpl(settings.getDirPath(), + settings.getCheckpointIOFactory(), + settings.getPageIOFactory(), settings.getElementClass(), settings.getCapacity(), + settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(), + settings.getCheckpointMaxWrites() + ); + } + + public static Builder fileSettingsBuilder(final String dirForFiles) { + return new BuilderImpl(dirForFiles); + } + + public static Builder memorySettingsBuilder() { + return memorySettingsBuilder(""); + } + + public static Builder memorySettingsBuilder(final String dirForFiles) { + return new BuilderImpl(dirForFiles).checkpointMaxAcks(1) + .checkpointMaxWrites(1); + } + + private SettingsImpl(final String dirForFiles, + final CheckpointIOFactory checkpointIOFactory, + final PageIOFactory pageIOFactory, final Class elementClass, final int capacity, + final long queueMaxBytes, final int maxUnread, final int checkpointMaxAcks, + final int checkpointMaxWrites) { + this.dirForFiles = dirForFiles; + this.checkpointIOFactory = checkpointIOFactory; + this.pageIOFactory = pageIOFactory; + this.elementClass = elementClass; + this.capacity = capacity; + this.queueMaxBytes = queueMaxBytes; + this.maxUnread = maxUnread; + this.checkpointMaxAcks = checkpointMaxAcks; + this.checkpointMaxWrites = checkpointMaxWrites; + } + + @Override + public int getCheckpointMaxAcks() { + return checkpointMaxAcks; + } + + @Override + public int getCheckpointMaxWrites() { + return checkpointMaxWrites; + } + + @Override + public CheckpointIOFactory getCheckpointIOFactory() { + return checkpointIOFactory; + } + + public PageIOFactory getPageIOFactory() { + return pageIOFactory; + } + + @Override + public Class getElementClass() { + return this.elementClass; + } + + @Override + public String getDirPath() { + return dirForFiles; + } + + @Override + public long getQueueMaxBytes() { + return queueMaxBytes; + } + + @Override + public int getCapacity() { + return capacity; + } + + @Override + public int getMaxUnread() { + return this.maxUnread; + } + + private static final class BuilderImpl implements Builder { + + /** + * The default Queue has a capacity of 0 events, meaning infinite capacity. + * todo: Remove the ability to set infinite capacity. + */ + private static final int DEFAULT_CAPACITY = 0; + + /** + * The default Queue has a capacity of 0 bytes, meaning infinite capacity. + * todo: Remove the ability to set infinite capacity. + */ + private static final long DEFAULT_MAX_QUEUE_BYTES = 0L; + + /** + * The default max unread count 0, meaning infinite. + * todo: Remove the ability to set infinite capacity. + */ + private static final int DEFAULT_MAX_UNREAD = 0; + + /** + * Default max number of writes after which we checkpoint. + */ + private static final int DEFAULT_CHECKPOINT_MAX_ACKS = 1024; + + /** + * Default number of acknowledgements after which we checkpoint. + */ + private static final int DEFAULT_CHECKPOINT_MAX_WRITES = 1024; + + private final String dirForFiles; + + private final CheckpointIOFactory checkpointIOFactory; + + private final PageIOFactory pageIOFactory; + + private final Class elementClass; + + private final int capacity; + + private final long queueMaxBytes; + + private final int maxUnread; + + private final int checkpointMaxAcks; + + private final int checkpointMaxWrites; + + private BuilderImpl(final String dirForFiles) { + this(dirForFiles, null, null, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES, + DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES + ); + } + + private BuilderImpl(final String dirForFiles, + final CheckpointIOFactory checkpointIOFactory, + final PageIOFactory pageIOFactory, final Class elementClass, final int capacity, + final long queueMaxBytes, final int maxUnread, final int checkpointMaxAcks, + final int checkpointMaxWrites) { + this.dirForFiles = dirForFiles; + this.checkpointIOFactory = checkpointIOFactory; + this.pageIOFactory = pageIOFactory; + this.elementClass = elementClass; + this.capacity = capacity; + this.queueMaxBytes = queueMaxBytes; + this.maxUnread = maxUnread; + this.checkpointMaxAcks = checkpointMaxAcks; + this.checkpointMaxWrites = checkpointMaxWrites; + } + + @Override + public Builder checkpointIOFactory(final CheckpointIOFactory factory) { + return new BuilderImpl( + this.dirForFiles, factory, this.pageIOFactory, this.elementClass, this.capacity, + this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder elementIOFactory(final PageIOFactory factory) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, factory, this.elementClass, + this.capacity, + this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder elementClass(final Class elementClass) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, elementClass, + this.capacity, + this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder capacity(final int capacity) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder queueMaxBytes(final long size) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + this.capacity, size, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder maxUnread(final int maxUnread) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder checkpointMaxAcks(final int checkpointMaxAcks) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + + @Override + public Builder checkpointMaxWrites(final int checkpointMaxWrites) { + return new BuilderImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + checkpointMaxWrites + ); + } + + @Override + public Settings build() { + return new SettingsImpl( + this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass, + this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks, + this.checkpointMaxWrites + ); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java index 406dd1d33..6e17f2fdf 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java @@ -1,13 +1,12 @@ package org.logstash.ackedqueue.ext; -import org.logstash.Event; -import org.logstash.ext.JrubyEventExtLibrary; +import java.io.IOException; import org.jruby.Ruby; +import org.jruby.RubyBoolean; import org.jruby.RubyClass; import org.jruby.RubyFixnum; import org.jruby.RubyModule; import org.jruby.RubyObject; -import org.jruby.RubyBoolean; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; @@ -15,16 +14,13 @@ import org.jruby.runtime.ObjectAllocator; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.load.Library; +import org.logstash.Event; import org.logstash.ackedqueue.Batch; -import org.logstash.ackedqueue.FileSettings; import org.logstash.ackedqueue.Queue; -import org.logstash.ackedqueue.Settings; -import org.logstash.ackedqueue.io.CheckpointIOFactory; +import org.logstash.ackedqueue.SettingsImpl; import org.logstash.ackedqueue.io.FileCheckpointIO; import org.logstash.ackedqueue.io.MmapPageIO; -import org.logstash.ackedqueue.io.PageIOFactory; - -import java.io.IOException; +import org.logstash.ext.JrubyEventExtLibrary; public class JrubyAckedQueueExtLibrary implements Library { @@ -66,24 +62,20 @@ public class JrubyAckedQueueExtLibrary implements Library { int maxUnread = RubyFixnum.num2int(args[2]); int checkpointMaxAcks = RubyFixnum.num2int(args[3]); int checkpointMaxWrites = RubyFixnum.num2int(args[4]); - int checkpointMaxInterval = RubyFixnum.num2int(args[5]); long queueMaxBytes = RubyFixnum.num2long(args[6]); - Settings s = new FileSettings(args[0].asJavaString()); - PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path); - CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source); - s.setCapacity(capacity); - s.setMaxUnread(maxUnread); - s.setQueueMaxBytes(queueMaxBytes); - s.setCheckpointMaxAcks(checkpointMaxAcks); - s.setCheckpointMaxWrites(checkpointMaxWrites); - s.setCheckpointMaxInterval(checkpointMaxInterval); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(Event.class); - - this.queue = new Queue(s); - + this.queue = new Queue( + SettingsImpl.fileSettingsBuilder(args[0].asJavaString()) + .capacity(capacity) + .maxUnread(maxUnread) + .queueMaxBytes(queueMaxBytes) + .checkpointMaxAcks(checkpointMaxAcks) + .checkpointMaxWrites(checkpointMaxWrites) + .elementIOFactory(MmapPageIO::new) + .checkpointIOFactory(FileCheckpointIO::new) + .elementClass(Event.class) + .build() + ); return context.nil; } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java index 342b5551c..830681333 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java @@ -1,13 +1,12 @@ package org.logstash.ackedqueue.ext; -import org.logstash.Event; -import org.logstash.ext.JrubyEventExtLibrary; +import java.io.IOException; import org.jruby.Ruby; +import org.jruby.RubyBoolean; import org.jruby.RubyClass; import org.jruby.RubyFixnum; import org.jruby.RubyModule; import org.jruby.RubyObject; -import org.jruby.RubyBoolean; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; @@ -15,16 +14,13 @@ import org.jruby.runtime.ObjectAllocator; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.load.Library; +import org.logstash.Event; import org.logstash.ackedqueue.Batch; -import org.logstash.ackedqueue.MemorySettings; import org.logstash.ackedqueue.Queue; -import org.logstash.ackedqueue.Settings; +import org.logstash.ackedqueue.SettingsImpl; import org.logstash.ackedqueue.io.ByteBufferPageIO; -import org.logstash.ackedqueue.io.CheckpointIOFactory; import org.logstash.ackedqueue.io.MemoryCheckpointIO; -import org.logstash.ackedqueue.io.PageIOFactory; - -import java.io.IOException; +import org.logstash.ext.JrubyEventExtLibrary; public class JrubyAckedQueueMemoryExtLibrary implements Library { @@ -66,19 +62,16 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library { int capacity = RubyFixnum.num2int(args[1]); int maxUnread = RubyFixnum.num2int(args[2]); long queueMaxBytes = RubyFixnum.num2long(args[3]); - - Settings s = new MemorySettings(args[0].asJavaString()); - PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path); - CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source); - s.setCapacity(capacity); - s.setMaxUnread(maxUnread); - s.setQueueMaxBytes(queueMaxBytes); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(Event.class); - - this.queue = new Queue(s); - + this.queue = new Queue( + SettingsImpl.memorySettingsBuilder(args[0].asJavaString()) + .capacity(capacity) + .maxUnread(maxUnread) + .queueMaxBytes(queueMaxBytes) + .elementIOFactory(ByteBufferPageIO::new) + .checkpointIOFactory(MemoryCheckpointIO::new) + .elementClass(Event.class) + .build() + ); return context.nil; } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index b58aad0c4..f7bd516ff 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -1,12 +1,5 @@ package org.logstash.ackedqueue; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import static org.junit.Assert.fail; -import org.junit.rules.TemporaryFolder; -import org.logstash.ackedqueue.io.ByteBufferPageIO; - import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; @@ -21,12 +14,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.logstash.ackedqueue.io.ByteBufferPageIO; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; public class QueueTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -200,8 +198,10 @@ public class QueueTest { List elements2 = Arrays.asList(new StringElement("foobarbaz3"), new StringElement("foobarbaz4")); int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements1.get(0).serialize().length); - Settings settings = TestSettings.volatileQueueSettings(2 * singleElementCapacity); - settings.setCheckpointMaxWrites(1024); // arbitrary high enough threshold so that it's not reached (default for TestSettings is 1) + Settings settings = SettingsImpl.builder( + TestSettings.volatileQueueSettings(2 * singleElementCapacity) + ).checkpointMaxWrites(1024) // arbitrary high enough threshold so that it's not reached (default for TestSettings is 1) + .build(); TestQueue q = new TestQueue(settings); q.open(); @@ -330,8 +330,10 @@ public class QueueTest { Queueable element = new StringElement("foobarbaz"); int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length); - Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity); - settings.setMaxUnread(2); // 2 so we know the first write should not block and the second should + Settings settings = SettingsImpl.builder( + TestSettings.volatileQueueSettings(singleElementCapacity) + ).maxUnread(2) // 2 so we know the first write should not block and the second should + .build(); TestQueue q = new TestQueue(settings); q.open(); @@ -380,9 +382,10 @@ public class QueueTest { Queueable element = new StringElement("foobarbaz"); // TODO: add randomized testing on the page size (but must be > single element size) - Settings settings = TestSettings.volatileQueueSettings(256); // 256 is arbitrary, large enough to hold a few elements - - settings.setMaxUnread(2); // 2 so we know the first write should not block and the second should + Settings settings = SettingsImpl.builder( + TestSettings.volatileQueueSettings(256) // 256 is arbitrary, large enough to hold a few elements + ).maxUnread(2) + .build(); // 2 so we know the first write should not block and the second should TestQueue q = new TestQueue(settings); q.open(); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/TestSettings.java b/logstash-core/src/test/java/org/logstash/ackedqueue/TestSettings.java index ada4c80c6..dac914b43 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/TestSettings.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/TestSettings.java @@ -11,38 +11,26 @@ public class TestSettings { public static Settings volatileQueueSettings(int capacity) { MemoryCheckpointIO.clearSources(); - Settings s = new MemorySettings(); PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path); CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source); - s.setCapacity(capacity); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(StringElement.class); - return s; + return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory) + .checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build(); } public static Settings volatileQueueSettings(int capacity, long size) { MemoryCheckpointIO.clearSources(); - Settings s = new MemorySettings(); PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path); CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source); - s.setCapacity(capacity); - s.setQueueMaxBytes(size); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(StringElement.class); - return s; + return SettingsImpl.memorySettingsBuilder().capacity(capacity).queueMaxBytes(size) + .elementIOFactory(pageIOFactory).checkpointIOFactory(checkpointIOFactory) + .elementClass(StringElement.class).build(); } public static Settings persistedQueueSettings(int capacity, String folder) { - Settings s = new FileSettings(folder); PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path); CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source); - s.setCapacity(capacity); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointMaxWrites(1); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(StringElement.class); - return s; + return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity).elementIOFactory(pageIOFactory) + .checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory) + .elementClass(StringElement.class).build(); } } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/io/MemoryCheckpointTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/io/MemoryCheckpointTest.java index a2bcd9b41..4acf329d9 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/io/MemoryCheckpointTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/io/MemoryCheckpointTest.java @@ -1,17 +1,12 @@ package org.logstash.ackedqueue.io; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.fail; -import org.logstash.ackedqueue.Checkpoint; -import org.logstash.ackedqueue.MemorySettings; -import org.logstash.ackedqueue.Settings; -import org.logstash.ackedqueue.io.CheckpointIO; -import org.logstash.ackedqueue.io.CheckpointIOFactory; -import org.logstash.ackedqueue.io.MemoryCheckpointIO; - import java.io.IOException; import java.nio.file.NoSuchFileException; +import org.junit.Before; +import org.junit.Test; +import org.logstash.ackedqueue.Checkpoint; +import org.logstash.ackedqueue.SettingsImpl; +import org.logstash.ackedqueue.Settings; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -23,9 +18,9 @@ public class MemoryCheckpointTest { @Before public void setUp() { - Settings settings = new MemorySettings(); CheckpointIOFactory factory = (dirPath) -> new MemoryCheckpointIO(dirPath); - settings.setCheckpointIOFactory(factory); + Settings settings = + SettingsImpl.memorySettingsBuilder().checkpointIOFactory(factory).build(); this.io = settings.getCheckpointIOFactory().build(settings.getDirPath()); } diff --git a/logstash-core/src/test/java/org/logstash/stress/Concurrent.java b/logstash-core/src/test/java/org/logstash/stress/Concurrent.java index 68c0d5168..3fc97fbae 100644 --- a/logstash-core/src/test/java/org/logstash/stress/Concurrent.java +++ b/logstash-core/src/test/java/org/logstash/stress/Concurrent.java @@ -1,13 +1,5 @@ package org.logstash.stress; -import org.logstash.ackedqueue.*; -import org.logstash.ackedqueue.io.ByteBufferPageIO; -import org.logstash.ackedqueue.io.CheckpointIOFactory; -import org.logstash.ackedqueue.io.FileCheckpointIO; -import org.logstash.ackedqueue.io.MemoryCheckpointIO; -import org.logstash.ackedqueue.io.MmapPageIO; -import org.logstash.ackedqueue.io.PageIOFactory; - import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -17,6 +9,17 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; +import org.logstash.ackedqueue.Batch; +import org.logstash.ackedqueue.SettingsImpl; +import org.logstash.ackedqueue.Queue; +import org.logstash.ackedqueue.Settings; +import org.logstash.ackedqueue.StringElement; +import org.logstash.ackedqueue.io.ByteBufferPageIO; +import org.logstash.ackedqueue.io.CheckpointIOFactory; +import org.logstash.ackedqueue.io.FileCheckpointIO; +import org.logstash.ackedqueue.io.MemoryCheckpointIO; +import org.logstash.ackedqueue.io.MmapPageIO; +import org.logstash.ackedqueue.io.PageIOFactory; public class Concurrent { final static int ELEMENT_COUNT = 2000000; @@ -24,25 +27,18 @@ public class Concurrent { static Settings settings; public static Settings memorySettings(int capacity) { - Settings s = new MemorySettings(); PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path); CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source); - s.setCapacity(capacity); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(StringElement.class); - return s; + return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory) + .checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build(); } public static Settings fileSettings(int capacity) { - Settings s = new MemorySettings("/tmp/queue"); PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path); CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source); - s.setCapacity(capacity); - s.setElementIOFactory(pageIOFactory); - s.setCheckpointIOFactory(checkpointIOFactory); - s.setElementClass(StringElement.class); - return s; + return SettingsImpl.memorySettingsBuilder("/tmp/queue").capacity(capacity) + .elementIOFactory(pageIOFactory) + .checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build(); } public static Thread producer(Queue q, List input) {