CLEANUP: Remove redundant PageIOFactory

Fixes #9012
This commit is contained in:
Armin 2018-01-23 11:25:25 +01:00 committed by Armin Braun
parent 54da9f9024
commit 0a1a761442
10 changed files with 36 additions and 90 deletions

View file

@ -17,8 +17,6 @@ import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -121,14 +119,10 @@ public class QueueRWBenchmark {
}
private static Settings settings() {
final PageIOFactory pageIOFactory;
final CheckpointIOFactory checkpointIOFactory;
pageIOFactory = MmapPageIO::new;
checkpointIOFactory = FileCheckpointIO::new;
final CheckpointIOFactory checkpointIOFactory = FileCheckpointIO::new;
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
.capacity(256 * 1024 * 1024)
.queueMaxBytes(Long.MAX_VALUE)
.elementIOFactory(pageIOFactory)
.checkpointMaxWrites(ACK_INTERVAL)
.checkpointMaxAcks(ACK_INTERVAL)
.checkpointIOFactory(checkpointIOFactory)

View file

@ -11,7 +11,6 @@ import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -74,7 +73,6 @@ public class QueueWriteBenchmark {
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
.capacity(256 * 1024 * 1024)
.queueMaxBytes(Long.MAX_VALUE)
.elementIOFactory(MmapPageIO::new)
.checkpointMaxWrites(1024)
.checkpointMaxAcks(1024)
.checkpointIOFactory(FileCheckpointIO::new)

View file

@ -1,14 +1,5 @@
package org.logstash.ackedqueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.FileLockFactory;
import org.logstash.LockException;
import org.logstash.ackedqueue.io.CheckpointIO;
import org.logstash.ackedqueue.io.LongVector;
import org.logstash.ackedqueue.io.PageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@ -24,6 +15,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.FileLockFactory;
import org.logstash.LockException;
import org.logstash.ackedqueue.io.CheckpointIO;
import org.logstash.ackedqueue.io.LongVector;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIO;
public final class Queue implements Closeable {
@ -47,7 +46,6 @@ public final class Queue implements Closeable {
private volatile long currentByteSize;
private final CheckpointIO checkpointIO;
private final PageIOFactory pageIOFactory;
private final int pageCapacity;
private final long maxBytes;
private final String dirPath;
@ -77,7 +75,6 @@ public final class Queue implements Closeable {
this.pageCapacity = settings.getCapacity();
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = settings.getCheckpointIOFactory().build(dirPath);
this.pageIOFactory = settings.getPageIOFactory();
this.elementClass = settings.getElementClass();
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();
@ -180,7 +177,7 @@ public final class Queue implements Closeable {
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
PageIO pageIO = new MmapPageIO(pageNum, this.pageCapacity, this.dirPath);
// important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
// we can purge it and we don't care about its integrity for example if it is of zero-byte file size.
if (cp.isFullyAcked()) {
@ -201,7 +198,7 @@ public final class Queue implements Closeable {
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString());
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
PageIO pageIO = new MmapPageIO(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
@ -308,7 +305,7 @@ public final class Queue implements Closeable {
* @throws IOException
*/
private void newCheckpointedHeadpage(int pageNum) throws IOException {
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
PageIO headPageIO = new MmapPageIO(pageNum, this.pageCapacity, this.dirPath);
headPageIO.create();
this.headPage = PageFactory.newHeadPage(pageNum, this, headPageIO);
this.headPage.forceCheckpoint();
@ -363,7 +360,7 @@ public final class Queue implements Closeable {
long seqNum = this.seqNum += 1;
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
this.unreadCount++;
notEmpty.signal();
// now check if we reached a queue full state and block here until it is not full
@ -420,14 +417,14 @@ public final class Queue implements Closeable {
* <p>Checks if the Queue is full, with "full" defined as either of:</p>
* <p>Assuming a maximum size of the queue larger than 0 is defined:</p>
* <ul>
* <li>The sum of the size of all allocated pages is more than the allowed maximum Queue
* <li>The sum of the size of all allocated pages is more than the allowed maximum Queue
* size</li>
* <li>The sum of the size of all allocated pages equal to the allowed maximum Queue size
* <li>The sum of the size of all allocated pages equal to the allowed maximum Queue size
* and the current head page has no remaining capacity.</li>
* </ul>
* <p>or assuming a max unread count larger than 0, is defined "full" is also defined as:</p>
* <ul>
* <li>The current number of unread events exceeds or is equal to the configured maximum
* <li>The current number of unread events exceeds or is equal to the configured maximum
* number of allowed unread events.</li>
* </ul>
* @return True iff the queue is full
@ -719,7 +716,7 @@ public final class Queue implements Closeable {
public CheckpointIO getCheckpointIO() {
return this.checkpointIO;
}
/**
* deserialize a byte array into the required element class.
*

View file

@ -1,14 +1,11 @@
package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public interface Settings {
CheckpointIOFactory getCheckpointIOFactory();
PageIOFactory getPageIOFactory();
Class<? extends Queueable> getElementClass();
String getDirPath();
@ -22,13 +19,11 @@ public interface Settings {
int getCheckpointMaxAcks();
int getCheckpointMaxWrites();
interface Builder {
Builder checkpointIOFactory(CheckpointIOFactory factory);
Builder elementIOFactory(PageIOFactory factory);
Builder elementClass(Class<? extends Queueable> elementClass);
Builder capacity(int capacity);
@ -40,7 +35,7 @@ public interface Settings {
Builder checkpointMaxAcks(int checkpointMaxAcks);
Builder checkpointMaxWrites(int checkpointMaxWrites);
Settings build();
}

View file

@ -1,12 +1,10 @@
package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.PageIOFactory;
public class SettingsImpl implements Settings {
private String dirForFiles;
private CheckpointIOFactory checkpointIOFactory;
private PageIOFactory pageIOFactory;
private Class<? extends Queueable> elementClass;
private int capacity;
private long queueMaxBytes;
@ -16,8 +14,7 @@ public class SettingsImpl implements Settings {
public static Builder builder(final Settings settings) {
return new BuilderImpl(settings.getDirPath(),
settings.getCheckpointIOFactory(),
settings.getPageIOFactory(), settings.getElementClass(), settings.getCapacity(),
settings.getCheckpointIOFactory(), settings.getElementClass(), settings.getCapacity(),
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites()
);
@ -28,12 +25,11 @@ public class SettingsImpl implements Settings {
}
private SettingsImpl(final String dirForFiles, final CheckpointIOFactory checkpointIOFactory,
final PageIOFactory pageIOFactory, final Class<? extends Queueable> elementClass,
final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites) {
this.dirForFiles = dirForFiles;
this.checkpointIOFactory = checkpointIOFactory;
this.pageIOFactory = pageIOFactory;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
@ -57,10 +53,6 @@ public class SettingsImpl implements Settings {
return checkpointIOFactory;
}
public PageIOFactory getPageIOFactory() {
return pageIOFactory;
}
@Override
public Class<? extends Queueable> getElementClass() {
return this.elementClass;
@ -120,8 +112,6 @@ public class SettingsImpl implements Settings {
private final CheckpointIOFactory checkpointIOFactory;
private final PageIOFactory pageIOFactory;
private final Class<? extends Queueable> elementClass;
private final int capacity;
@ -135,18 +125,17 @@ public class SettingsImpl implements Settings {
private final int checkpointMaxWrites;
private BuilderImpl(final String dirForFiles) {
this(dirForFiles, null, null, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
this(dirForFiles, null, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES
);
}
private BuilderImpl(final String dirForFiles, final CheckpointIOFactory checkpointIOFactory,
final PageIOFactory pageIOFactory, final Class<? extends Queueable> elementClass,
final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites) {
this.dirForFiles = dirForFiles;
this.checkpointIOFactory = checkpointIOFactory;
this.pageIOFactory = pageIOFactory;
this.elementClass = elementClass;
this.capacity = capacity;
this.queueMaxBytes = queueMaxBytes;
@ -158,17 +147,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder checkpointIOFactory(final CheckpointIOFactory factory) {
return new BuilderImpl(
this.dirForFiles, factory, this.pageIOFactory, this.elementClass, this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@Override
public Builder elementIOFactory(final PageIOFactory factory) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, factory, this.elementClass,
this.capacity,
this.dirForFiles, factory, this.elementClass, this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
@ -177,9 +156,8 @@ public class SettingsImpl implements Settings {
@Override
public Builder elementClass(final Class<? extends Queueable> elementClass) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, elementClass,
this.capacity,
this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.dirForFiles, this.checkpointIOFactory, elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
}
@ -187,7 +165,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder capacity(final int capacity) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
@ -196,7 +174,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder queueMaxBytes(final long size) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
this.capacity, size, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
@ -205,7 +183,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder maxUnread(final int maxUnread) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);
@ -214,7 +192,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
this.checkpointMaxWrites
);
@ -223,7 +201,7 @@ public class SettingsImpl implements Settings {
@Override
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
return new BuilderImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
checkpointMaxWrites
);
@ -232,7 +210,7 @@ public class SettingsImpl implements Settings {
@Override
public Settings build() {
return new SettingsImpl(
this.dirForFiles, this.checkpointIOFactory, this.pageIOFactory, this.elementClass,
this.dirForFiles, this.checkpointIOFactory, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
);

View file

@ -17,7 +17,6 @@ import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "AckedQueue")
@ -50,7 +49,6 @@ public final class JRubyAckedQueueExt extends RubyObject {
.queueMaxBytes(queueMaxBytes)
.checkpointMaxAcks(checkpointMaxAcks)
.checkpointMaxWrites(checkpointMaxWrites)
.elementIOFactory(MmapPageIO::new)
.checkpointIOFactory(FileCheckpointIO::new)
.elementClass(Event.class)
.build()

View file

@ -1,6 +0,0 @@
package org.logstash.ackedqueue.io;
@FunctionalInterface
public interface PageIOFactory {
PageIO build(int pageNum, int capacity, String dirPath);
}

View file

@ -6,6 +6,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIO;
import static org.hamcrest.CoreMatchers.equalTo;
@ -31,8 +32,7 @@ public class HeadPageTest {
// Close method on Page requires an instance of Queue that has already been opened.
try (Queue q = new Queue(s)) {
q.open();
PageIO pageIO = s.getPageIOFactory()
.build(0, 100, dataPath);
PageIO pageIO = new MmapPageIO(0, 100, dataPath);
pageIO.create();
try (final Page p = PageFactory.newHeadPage(0, q, pageIO)) {
assertThat(p.getPageNum(), is(equalTo(0)));

View file

@ -2,23 +2,19 @@ package org.logstash.ackedqueue;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
public class TestSettings {
public static Settings persistedQueueSettings(int capacity, String folder) {
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity).elementIOFactory(pageIOFactory)
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity)
.checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory)
.elementClass(StringElement.class).build();
}
public static Settings persistedQueueSettings(int capacity, long size, String folder) {
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new MmapPageIO(pageNum, pageSize, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity).elementIOFactory(pageIOFactory)
return SettingsImpl.fileSettingsBuilder(folder).capacity(capacity)
.queueMaxBytes(size).checkpointMaxWrites(1).checkpointIOFactory(checkpointIOFactory)
.elementClass(StringElement.class).build();
}

View file

@ -17,8 +17,6 @@ import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.StringElement;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
public class Concurrent {
final static int ELEMENT_COUNT = 2000000;
@ -26,10 +24,8 @@ public class Concurrent {
static Settings settings;
public static Settings fileSettings(int capacity) {
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
return SettingsImpl.fileSettingsBuilder("/tmp/queue").capacity(capacity)
.elementIOFactory(pageIOFactory)
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
}