#7053 Refactor Queue Settings Class to Using a Builder Pattern

Fixes #7062
This commit is contained in:
Armin 2017-05-18 17:15:14 +02:00 committed by Armin Braun
parent 78f70634ed
commit 1a2c0c8597
12 changed files with 365 additions and 409 deletions

View file

@ -7,9 +7,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.logstash.Event;
import org.logstash.Timestamp;
import org.logstash.ackedqueue.FileSettings;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.openjdk.jmh.annotations.Benchmark;
@ -83,14 +83,13 @@ public class QueueBenchmark {
}
private static Settings settings() {
Settings s = new FileSettings(Files.createTempDir().getPath());
s.setCapacity(256 * 1024 * 1024);
s.setQueueMaxBytes(Long.MAX_VALUE);
s.setElementIOFactory(MmapPageIO::new);
s.setCheckpointMaxWrites(50_000);
s.setCheckpointMaxAcks(50_000);
s.setCheckpointIOFactory(FileCheckpointIO::new);
s.setElementClass(Event.class);
return s;
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
.capacity(256 * 1024 * 1024)
.queueMaxBytes(Long.MAX_VALUE)
.elementIOFactory(MmapPageIO::new)
.checkpointMaxWrites(50_000)
.checkpointMaxAcks(50_000)
.checkpointIOFactory(FileCheckpointIO::new)
.elementClass(Event.class).build();
}
}

View file

@ -1,130 +0,0 @@
package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public class FileSettings implements Settings {
private String dirForFiles;
private CheckpointIOFactory checkpointIOFactory;
private PageIOFactory pageIOFactory;
private Class elementClass;
private int capacity;
private long queueMaxBytes;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private int checkpointMaxInterval;
private FileSettings() { this(""); }
public FileSettings(String dirPath) {
this.dirForFiles = dirPath;
this.maxUnread = 0;
this.checkpointMaxAcks = 1024;
this.checkpointMaxWrites = 1024;
this.checkpointMaxInterval = 1000; // millisec
}
@Override
public Settings setCheckpointIOFactory(CheckpointIOFactory factory) {
this.checkpointIOFactory = factory;
return this;
}
@Override
public Settings setElementIOFactory(PageIOFactory factory) {
this.pageIOFactory = factory;
return this;
}
@Override
public Settings setElementClass(Class elementClass) {
this.elementClass = elementClass;
return this;
}
@Override
public Settings setQueueMaxBytes(long size) {
this.queueMaxBytes = size;
return this;
}
@Override
public Settings setCapacity(int capacity) {
this.capacity = capacity;
return this;
}
@Override
public Settings setMaxUnread(int maxUnread) {
this.maxUnread = maxUnread;
return this;
}
@Override
public Settings setCheckpointMaxAcks(int checkpointMaxAcks) {
this.checkpointMaxAcks = checkpointMaxAcks;
return this;
}
@Override
public Settings setCheckpointMaxWrites(int checkpointMaxWrites) {
this.checkpointMaxWrites = checkpointMaxWrites;
return this;
}
@Override
public Settings setCheckpointMaxInterval(int checkpointMaxInterval) {
this.checkpointMaxInterval = checkpointMaxInterval;
return this;
}
@Override
public int getCheckpointMaxAcks() {
return checkpointMaxAcks;
}
@Override
public int getCheckpointMaxWrites() {
return checkpointMaxWrites;
}
@Override
public int getCheckpointMaxInterval() {
return checkpointMaxInterval;
}
@Override
public CheckpointIOFactory getCheckpointIOFactory() {
return checkpointIOFactory;
}
public PageIOFactory getPageIOFactory() {
return pageIOFactory;
}
@Override
public Class getElementClass() {
return this.elementClass;
}
@Override
public String getDirPath() {
return dirForFiles;
}
@Override
public long getQueueMaxBytes() {
return queueMaxBytes;
}
@Override
public int getCapacity() {
return capacity;
}
@Override
public int getMaxUnread() {
return this.maxUnread;
}
}

View file

@ -1,132 +0,0 @@
package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public class MemorySettings implements Settings {
private CheckpointIOFactory checkpointIOFactory;
private PageIOFactory pageIOFactory;
private Class elementClass;
private int capacity;
private long queueMaxBytes;
private final String dirPath;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private int checkpointMaxInterval;
public MemorySettings() {
this("");
}
public MemorySettings(String dirPath) {
this.dirPath = dirPath;
this.maxUnread = 0;
this.checkpointMaxAcks = 1;
this.checkpointMaxWrites = 1;
this.checkpointMaxInterval = 0;
}
@Override
public Settings setCheckpointIOFactory(CheckpointIOFactory factory) {
this.checkpointIOFactory = factory;
return this;
}
@Override
public Settings setElementIOFactory(PageIOFactory factory) {
this.pageIOFactory = factory;
return this;
}
@Override
public Settings setElementClass(Class elementClass) {
this.elementClass = elementClass;
return this;
}
@Override
public Settings setCapacity(int capacity) {
this.capacity = capacity;
return this;
}
@Override
public Settings setQueueMaxBytes(long size) {
this.queueMaxBytes = size;
return this;
}
@Override
public Settings setMaxUnread(int maxUnread) {
this.maxUnread = maxUnread;
return this;
}
@Override
public Settings setCheckpointMaxAcks(int checkpointMaxAcks) {
this.checkpointMaxAcks = checkpointMaxAcks;
return this;
}
@Override
public Settings setCheckpointMaxWrites(int checkpointMaxWrites) {
this.checkpointMaxWrites = checkpointMaxWrites;
return this;
}
@Override
public Settings setCheckpointMaxInterval(int checkpointMaxInterval) {
this.checkpointMaxInterval = checkpointMaxInterval;
return this;
}
@Override
public int getCheckpointMaxAcks() {
return checkpointMaxAcks;
}
@Override
public int getCheckpointMaxWrites() {
return checkpointMaxWrites;
}
@Override
public int getCheckpointMaxInterval() {
return checkpointMaxInterval;
}
@Override
public CheckpointIOFactory getCheckpointIOFactory() {
return checkpointIOFactory;
}
public PageIOFactory getPageIOFactory() {
return pageIOFactory;
}
@Override
public Class getElementClass() {
return this.elementClass;
}
@Override
public String getDirPath() {
return this.dirPath;
}
@Override
public long getQueueMaxBytes() {
return this.queueMaxBytes;
}
@Override
public int getCapacity() {
return this.capacity;
}
@Override
public int getMaxUnread() {
return this.maxUnread;
}
}

View file

@ -61,7 +61,6 @@ public class Queue implements Closeable {
private final int maxUnread;
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private final int checkpointMaxInterval;
private final AtomicBoolean closed;
@ -90,12 +89,11 @@ public class Queue implements Closeable {
settings.getElementClass(),
settings.getMaxUnread(),
settings.getCheckpointMaxWrites(),
settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxInterval()
settings.getCheckpointMaxAcks()
);
}
public Queue(String dirPath, int pageCapacity, long maxBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
private Queue(String dirPath, int pageCapacity, long maxBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks) {
this.dirPath = dirPath;
this.pageCapacity = pageCapacity;
this.maxBytes = maxBytes;
@ -109,7 +107,6 @@ public class Queue implements Closeable {
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointMaxInterval = checkpointMaxInterval;
this.unreadCount = 0;
this.currentByteSize = 0;

View file

@ -4,23 +4,6 @@ import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public interface Settings {
Settings setCheckpointIOFactory(CheckpointIOFactory factory);
Settings setElementIOFactory(PageIOFactory factory);
Settings setElementClass(Class elementClass);
Settings setCapacity(int capacity);
Settings setQueueMaxBytes(long size);
Settings setMaxUnread(int maxUnread);
Settings setCheckpointMaxAcks(int checkpointMaxAcks);
Settings setCheckpointMaxWrites(int checkpointMaxWrites);
Settings setCheckpointMaxInterval(int checkpointMaxInterval);
CheckpointIOFactory getCheckpointIOFactory();
@ -40,5 +23,25 @@ public interface Settings {
int getCheckpointMaxWrites();
int getCheckpointMaxInterval();
interface Builder {
Builder checkpointIOFactory(CheckpointIOFactory factory);
Builder elementIOFactory(PageIOFactory factory);
Builder elementClass(Class elementClass);
Builder capacity(int capacity);
Builder queueMaxBytes(long size);
Builder maxUnread(int maxUnread);
Builder checkpointMaxAcks(int checkpointMaxAcks);
Builder checkpointMaxWrites(int checkpointMaxWrites);
Settings build();
}
}

View file

@ -0,0 +1,252 @@
package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public class SettingsImpl implements Settings {
private String dirForFiles;
private CheckpointIOFactory checkpointIOFactory;
private PageIOFactory pageIOFactory;
private Class elementClass;
private int capacity;
private long queueMaxBytes;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
public static Builder builder(final Settings settings) {
return new BuilderImpl(settings.getDirPath(),
settings.getCheckpointIOFactory(),
settings.getPageIOFactory(), settings.getElementClass(), settings.getCapacity(),
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites()
);
}
public static Builder fileSettingsBuilder(final String dirForFiles) {
return new BuilderImpl(dirForFiles);
}
public static Builder memorySettingsBuilder() {
return memorySettingsBuilder("");
}
public static Builder memorySettingsBuilder(final String dirForFiles) {
return new BuilderImpl(dirForFiles).checkpointMaxAcks(1)
.checkpointMaxWrites(1);
}
private SettingsImpl(final String dirForFiles,
final CheckpointIOFactory checkpointIOFactory,
final PageIOFactory pageIOFactory, final Class elementClass, final int capacity,
final long queueMaxBytes, final int maxUnread, final int checkpointMaxAcks,
final int checkpointMaxWrites) {
this.dirForFiles = dirForFiles;
this.checkpointIOFactory = checkpointIOFactory;
this.pageIOFactory = pageIOFactory;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
}
@Override
public int getCheckpointMaxAcks() {
return checkpointMaxAcks;
}
@Override
public int getCheckpointMaxWrites() {
return checkpointMaxWrites;
}
@Override
public CheckpointIOFactory getCheckpointIOFactory() {
return checkpointIOFactory;
}
public PageIOFactory getPageIOFactory() {
return pageIOFactory;
}
@Override
public Class getElementClass() {
return this.elementClass;
}
@Override
public String getDirPath() {
return dirForFiles;
}
@Override
public long getQueueMaxBytes() {
return queueMaxBytes;
}
@Override
public int getCapacity() {
return capacity;
}
@Override
public int getMaxUnread() {
return this.maxUnread;
}
private static final class BuilderImpl implements Builder {
/**
* The default Queue has a capacity of 0 events, meaning infinite capacity.
* todo: Remove the ability to set infinite capacity.
*/
private static final int DEFAULT_CAPACITY = 0;
/**
* The default Queue has a capacity of 0 bytes, meaning infinite capacity.
* todo: Remove the ability to set infinite capacity.
*/
private static final long DEFAULT_MAX_QUEUE_BYTES = 0L;
/**
* The default max unread count 0, meaning infinite.
* todo: Remove the ability to set infinite capacity.
*/
private static final int DEFAULT_MAX_UNREAD = 0;
/**
* Default max number of writes after which we checkpoint.
*/
private static final int DEFAULT_CHECKPOINT_MAX_ACKS = 1024;
/**
* Default number of acknowledgements after which we checkpoint.
*/
private static final int DEFAULT_CHECKPOINT_MAX_WRITES = 1024;
private final String dirForFiles;
private final CheckpointIOFactory checkpointIOFactory;
private final PageIOFactory pageIOFactory;
private final Class elementClass;
private final int capacity;
private final long queueMaxBytes;
private final int maxUnread;
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private BuilderImpl(final String dirForFiles) {
this(dirForFiles, null, null, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES
);
}
private BuilderImpl(final String dirForFiles,
final CheckpointIOFactory checkpointIOFactory,
final PageIOFactory pageIOFactory, final Class elementClass, final int capacity,
final long queueMaxBytes, final int maxUnread, final int checkpointMaxAcks,
final int checkpointMaxWrites) {
this.dirForFiles = dirForFiles;
this.checkpointIOFactory = checkpointIOFactory;
this.pageIOFactory = pageIOFactory;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
}
@Override
public Builder checkpointIOFactory(final CheckpointIOFactory factory) {
return new BuilderImpl(
this.dirForFiles, factory, this.pageIOFactory, this.elementClass, this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder elementIOFactory(final PageIOFactory factory) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, factory, this.elementClass,
this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder elementClass(final Class elementClass) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, elementClass,
this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder capacity(final int capacity) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder queueMaxBytes(final long size) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.capacity, size, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder maxUnread(final int maxUnread) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
checkpointMaxWrites
);
}
@Override
public Settings build() {
return new SettingsImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
}
}

View file

@ -1,13 +1,12 @@
package org.logstash.ackedqueue.ext;
import org.logstash.Event;
import org.logstash.ext.JrubyEventExtLibrary;
import java.io.IOException;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@ -15,16 +14,13 @@ import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;
import org.logstash.Event;
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.FileSettings;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import java.io.IOException;
import org.logstash.ext.JrubyEventExtLibrary;
public class JrubyAckedQueueExtLibrary implements Library {
@ -66,24 +62,20 @@ public class JrubyAckedQueueExtLibrary implements Library {
int maxUnread = RubyFixnum.num2int(args[2]);
int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
int checkpointMaxInterval = RubyFixnum.num2int(args[5]);
long queueMaxBytes = RubyFixnum.num2long(args[6]);
Settings s = new FileSettings(args[0].asJavaString());
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
s.setCapacity(capacity);
s.setMaxUnread(maxUnread);
s.setQueueMaxBytes(queueMaxBytes);
s.setCheckpointMaxAcks(checkpointMaxAcks);
s.setCheckpointMaxWrites(checkpointMaxWrites);
s.setCheckpointMaxInterval(checkpointMaxInterval);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(Event.class);
this.queue = new Queue(s);
this.queue = new Queue(
SettingsImpl.fileSettingsBuilder(args[0].asJavaString())
.capacity(capacity)
.maxUnread(maxUnread)
.queueMaxBytes(queueMaxBytes)
.checkpointMaxAcks(checkpointMaxAcks)
.checkpointMaxWrites(checkpointMaxWrites)
.elementIOFactory(MmapPageIO::new)
.checkpointIOFactory(FileCheckpointIO::new)
.elementClass(Event.class)
.build()
);
return context.nil;
}

View file

@ -1,13 +1,12 @@
package org.logstash.ackedqueue.ext;
import org.logstash.Event;
import org.logstash.ext.JrubyEventExtLibrary;
import java.io.IOException;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@ -15,16 +14,13 @@ import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;
import org.logstash.Event;
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.MemorySettings;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import java.io.IOException;
import org.logstash.ext.JrubyEventExtLibrary;
public class JrubyAckedQueueMemoryExtLibrary implements Library {
@ -66,19 +62,16 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library {
int capacity = RubyFixnum.num2int(args[1]);
int maxUnread = RubyFixnum.num2int(args[2]);
long queueMaxBytes = RubyFixnum.num2long(args[3]);
Settings s = new MemorySettings(args[0].asJavaString());
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
s.setCapacity(capacity);
s.setMaxUnread(maxUnread);
s.setQueueMaxBytes(queueMaxBytes);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(Event.class);
this.queue = new Queue(s);
this.queue = new Queue(
SettingsImpl.memorySettingsBuilder(args[0].asJavaString())
.capacity(capacity)
.maxUnread(maxUnread)
.queueMaxBytes(queueMaxBytes)
.elementIOFactory(ByteBufferPageIO::new)
.checkpointIOFactory(MemoryCheckpointIO::new)
.elementClass(Event.class)
.build()
);
return context.nil;
}

View file

@ -1,12 +1,5 @@
package org.logstash.ackedqueue;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.fail;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
@ -21,12 +14,17 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
public class QueueTest {
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -200,8 +198,10 @@ public class QueueTest {
List<Queueable> elements2 = Arrays.asList(new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements1.get(0).serialize().length);
Settings settings = TestSettings.volatileQueueSettings(2 * singleElementCapacity);
settings.setCheckpointMaxWrites(1024); // arbitrary high enough threshold so that it's not reached (default for TestSettings is 1)
Settings settings = SettingsImpl.builder(
TestSettings.volatileQueueSettings(2 * singleElementCapacity)
).checkpointMaxWrites(1024) // arbitrary high enough threshold so that it's not reached (default for TestSettings is 1)
.build();
TestQueue q = new TestQueue(settings);
q.open();
@ -330,8 +330,10 @@ public class QueueTest {
Queueable element = new StringElement("foobarbaz");
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity);
settings.setMaxUnread(2); // 2 so we know the first write should not block and the second should
Settings settings = SettingsImpl.builder(
TestSettings.volatileQueueSettings(singleElementCapacity)
).maxUnread(2) // 2 so we know the first write should not block and the second should
.build();
TestQueue q = new TestQueue(settings);
q.open();
@ -380,9 +382,10 @@ public class QueueTest {
Queueable element = new StringElement("foobarbaz");
// TODO: add randomized testing on the page size (but must be > single element size)
Settings settings = TestSettings.volatileQueueSettings(256); // 256 is arbitrary, large enough to hold a few elements
settings.setMaxUnread(2); // 2 so we know the first write should not block and the second should
Settings settings = SettingsImpl.builder(
TestSettings.volatileQueueSettings(256) // 256 is arbitrary, large enough to hold a few elements
).maxUnread(2)
.build(); // 2 so we know the first write should not block and the second should
TestQueue q = new TestQueue(settings);
q.open();

View file

@ -11,38 +11,26 @@ public class TestSettings {
public static Settings volatileQueueSettings(int capacity) {
MemoryCheckpointIO.clearSources();
Settings s = new MemorySettings();
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
s.setCapacity(capacity);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;
return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory)
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
}
public static Settings volatileQueueSettings(int capacity, long size) {
MemoryCheckpointIO.clearSources();
Settings s = new MemorySettings();
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
s.setCapacity(capacity);
s.setQueueMaxBytes(size);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;
return SettingsImpl.memorySettingsBuilder().capacity(capacity).queueMaxBytes(size)
.elementIOFactory(pageIOFactory).checkpointIOFactory(checkpointIOFactory)
.elementClass(StringElement.class).build();
}
public static Settings persistedQueueSettings(int capacity, String folder) {
Settings s = new FileSettings(folder);
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
s.setCapacity(capacity);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointMaxWrites(1);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity).elementIOFactory(pageIOFactory)
.checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory)
.elementClass(StringElement.class).build();
}
}

View file

@ -1,17 +1,12 @@
package org.logstash.ackedqueue.io;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.fail;
import org.logstash.ackedqueue.Checkpoint;
import org.logstash.ackedqueue.MemorySettings;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.io.CheckpointIO;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import org.junit.Before;
import org.junit.Test;
import org.logstash.ackedqueue.Checkpoint;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.Settings;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -23,9 +18,9 @@ public class MemoryCheckpointTest {
@Before
public void setUp() {
Settings settings = new MemorySettings();
CheckpointIOFactory factory = (dirPath) -> new MemoryCheckpointIO(dirPath);
settings.setCheckpointIOFactory(factory);
Settings settings =
SettingsImpl.memorySettingsBuilder().checkpointIOFactory(factory).build();
this.io = settings.getCheckpointIOFactory().build(settings.getDirPath());
}

View file

@ -1,13 +1,5 @@
package org.logstash.stress;
import org.logstash.ackedqueue.*;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@ -17,6 +9,17 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.StringElement;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
public class Concurrent {
final static int ELEMENT_COUNT = 2000000;
@ -24,25 +27,18 @@ public class Concurrent {
static Settings settings;
public static Settings memorySettings(int capacity) {
Settings s = new MemorySettings();
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
s.setCapacity(capacity);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;
return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory)
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
}
public static Settings fileSettings(int capacity) {
Settings s = new MemorySettings("/tmp/queue");
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
s.setCapacity(capacity);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(StringElement.class);
return s;
return SettingsImpl.memorySettingsBuilder("/tmp/queue").capacity(capacity)
.elementIOFactory(pageIOFactory)
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
}
public static Thread producer(Queue q, List<StringElement> input) {