mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
bd3adca7e2
commit
4091ddfdb4
9 changed files with 29 additions and 515 deletions
|
@ -15,10 +15,8 @@ import org.logstash.ackedqueue.Queue;
|
|||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
import org.logstash.ackedqueue.io.PageIOFactory;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
|
@ -44,7 +42,7 @@ import org.openjdk.jmh.infra.Blackhole;
|
|||
public class QueueRWBenchmark {
|
||||
|
||||
private static final int EVENTS_PER_INVOCATION = 500_000;
|
||||
|
||||
|
||||
private static final int BATCH_SIZE = 100;
|
||||
|
||||
private static final int ACK_INTERVAL = 1024;
|
||||
|
@ -54,16 +52,14 @@ public class QueueRWBenchmark {
|
|||
private ArrayBlockingQueue<Event> queueArrayBlocking;
|
||||
|
||||
private Queue queuePersisted;
|
||||
|
||||
private Queue queueMemory;
|
||||
|
||||
private String path;
|
||||
|
||||
|
||||
private ExecutorService exec;
|
||||
|
||||
@Setup
|
||||
public void setUp() throws IOException {
|
||||
final Settings settingsPersisted = settings(true);
|
||||
final Settings settingsPersisted = settings();
|
||||
EVENT.setField("Foo", "Bar");
|
||||
EVENT.setField("Foo1", "Bar1");
|
||||
EVENT.setField("Foo2", "Bar2");
|
||||
|
@ -72,16 +68,13 @@ public class QueueRWBenchmark {
|
|||
path = settingsPersisted.getDirPath();
|
||||
queuePersisted = new Queue(settingsPersisted);
|
||||
queueArrayBlocking = new ArrayBlockingQueue<>(ACK_INTERVAL);
|
||||
queueMemory = new Queue(settings(false));
|
||||
queuePersisted.open();
|
||||
queueMemory.open();
|
||||
exec = Executors.newSingleThreadExecutor();
|
||||
}
|
||||
|
||||
@TearDown
|
||||
public void tearDown() throws IOException {
|
||||
queuePersisted.close();
|
||||
queueMemory.close();
|
||||
queueArrayBlocking.clear();
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
exec.shutdownNow();
|
||||
|
@ -109,28 +102,6 @@ public class QueueRWBenchmark {
|
|||
future.get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(EVENTS_PER_INVOCATION)
|
||||
public final void readFromMemoryQueue(final Blackhole blackhole) throws Exception {
|
||||
final Future<?> future = exec.submit(() -> {
|
||||
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
|
||||
try {
|
||||
this.queueMemory.write(EVENT);
|
||||
} catch (final IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
|
||||
try (Batch batch = queueMemory.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1))) {
|
||||
for (final Queueable elem : batch.getElements()) {
|
||||
blackhole.consume(elem);
|
||||
}
|
||||
}
|
||||
}
|
||||
future.get();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(EVENTS_PER_INVOCATION)
|
||||
public final void readFromArrayBlockingQueue(final Blackhole blackhole) throws Exception {
|
||||
|
@ -149,16 +120,11 @@ public class QueueRWBenchmark {
|
|||
future.get();
|
||||
}
|
||||
|
||||
private static Settings settings(final boolean persisted) {
|
||||
private static Settings settings() {
|
||||
final PageIOFactory pageIOFactory;
|
||||
final CheckpointIOFactory checkpointIOFactory;
|
||||
if (persisted) {
|
||||
pageIOFactory = MmapPageIO::new;
|
||||
checkpointIOFactory = FileCheckpointIO::new;
|
||||
} else {
|
||||
pageIOFactory = ByteBufferPageIO::new;
|
||||
checkpointIOFactory = MemoryCheckpointIO::new;
|
||||
}
|
||||
pageIOFactory = MmapPageIO::new;
|
||||
checkpointIOFactory = FileCheckpointIO::new;
|
||||
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
|
||||
.capacity(256 * 1024 * 1024)
|
||||
.queueMaxBytes(Long.MAX_VALUE)
|
||||
|
|
|
@ -16,7 +16,6 @@ import org.logstash.RubyUtil;
|
|||
import org.logstash.ackedqueue.Batch;
|
||||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.SettingsImpl;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
|
@ -162,7 +161,7 @@ public abstract class AbstractJRubyQueue extends RubyObject {
|
|||
.capacity(capacity)
|
||||
.maxUnread(maxUnread)
|
||||
.queueMaxBytes(queueMaxBytes)
|
||||
.elementIOFactory(ByteBufferPageIO::new)
|
||||
.elementIOFactory(MmapPageIO::new)
|
||||
.checkpointIOFactory(MemoryCheckpointIO::new)
|
||||
.elementClass(Event.class)
|
||||
.build()
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
package org.logstash.ackedqueue.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class ByteBufferPageIO extends AbstractByteBufferPageIO {
|
||||
|
||||
private final ByteBuffer buffer;
|
||||
|
||||
public ByteBufferPageIO(int pageNum, int capacity, String path) {
|
||||
this(capacity, new byte[0]);
|
||||
}
|
||||
|
||||
public ByteBufferPageIO(int capacity) {
|
||||
this(capacity, new byte[0]);
|
||||
}
|
||||
|
||||
public ByteBufferPageIO(int capacity, byte[] initialBytes) {
|
||||
super(0, capacity);
|
||||
|
||||
if (initialBytes.length > capacity) {
|
||||
throw new IllegalArgumentException("initial bytes greater than capacity");
|
||||
}
|
||||
|
||||
this.buffer = ByteBuffer.allocate(capacity);
|
||||
this.buffer.put(initialBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deactivate() { /* nothing */ }
|
||||
|
||||
@Override
|
||||
public void activate() { /* nyet */ }
|
||||
|
||||
@Override
|
||||
public void ensurePersisted() { /* nada */ }
|
||||
|
||||
@Override
|
||||
public void purge() { /* zilch */ }
|
||||
|
||||
@Override
|
||||
public void close() { /* don't look here */ }
|
||||
|
||||
|
||||
@Override
|
||||
protected ByteBuffer getBuffer() { return this.buffer; }
|
||||
|
||||
// below public methods only used by tests
|
||||
|
||||
public int getWritePosition() { return this.head; }
|
||||
|
||||
public byte[] dump() { return this.buffer.array(); }
|
||||
}
|
|
@ -2,8 +2,9 @@ package org.logstash.ackedqueue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
@ -13,13 +14,17 @@ import static org.logstash.ackedqueue.QueueTestHelpers.singleElementCapacityForB
|
|||
|
||||
public class HeadPageTest {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void newHeadPage() throws IOException {
|
||||
Settings s = TestSettings.volatileQueueSettings(100);
|
||||
// Close method on Page requires an instance of Queue that has already been opened.
|
||||
try (Queue q = new Queue(s)) {
|
||||
q.open();
|
||||
PageIO pageIO = s.getPageIOFactory().build(0, 100, "dummy");
|
||||
PageIO pageIO = s.getPageIOFactory()
|
||||
.build(0, 100, temporaryFolder.newFolder().getAbsolutePath());
|
||||
pageIO.create();
|
||||
try (final Page p = PageFactory.newHeadPage(0, q, pageIO)) {
|
||||
assertThat(p.getPageNum(), is(equalTo(0)));
|
||||
|
|
|
@ -383,7 +383,7 @@ public class QueueTest {
|
|||
for (Batch b : batches) {
|
||||
b.close();
|
||||
}
|
||||
|
||||
|
||||
assertThat(q.tailPages.size(), is(0));
|
||||
}
|
||||
}
|
||||
|
@ -400,7 +400,7 @@ public class QueueTest {
|
|||
.build();
|
||||
try (Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
|
||||
|
||||
long seqNum = q.write(element);
|
||||
assertThat(seqNum, is(1L));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
@ -525,22 +525,22 @@ public class QueueTest {
|
|||
q.write(element);
|
||||
}
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Future<Long> future = executor.submit(() -> q.write(element));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertThat(q.isFull(), is(true));
|
||||
|
||||
|
||||
Batch b = q.readBatch(10, TimeUnit.SECONDS.toMillis(1)); // read 1 page (10 events)
|
||||
b.close(); // purge 1 page
|
||||
|
||||
|
||||
while (q.isFull()) { Thread.sleep(10); }
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
|
||||
assertThat(future.get(), is(elementCount + 1));
|
||||
}
|
||||
}
|
||||
|
@ -557,7 +557,7 @@ public class QueueTest {
|
|||
q.open();
|
||||
// should be able to write 90 + 9 events (9 pages + 1 head-page) before getting full
|
||||
int elementCount = 99;
|
||||
for (int i = 0; i < elementCount; i++) {
|
||||
for (int i = 0; i < elementCount; i++) {
|
||||
q.write(element);
|
||||
}
|
||||
|
||||
|
@ -619,7 +619,7 @@ public class QueueTest {
|
|||
public void queueStableUnderStressHugeCapacity() throws Exception {
|
||||
stableUnderStress(100_000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void queueStableUnderStressLowCapacity() throws Exception {
|
||||
stableUnderStress(50);
|
||||
|
@ -682,7 +682,7 @@ public class QueueTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@Test(timeout = 50_000)
|
||||
public void concurrentWritesTest() throws IOException, InterruptedException, ExecutionException {
|
||||
|
||||
final int WRITER_COUNT = 5;
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
|
||||
|
||||
/**
|
||||
* Class containing common methods to help DRY up acked queue tests.
|
||||
|
@ -10,18 +9,12 @@ import java.io.IOException;
|
|||
public class QueueTestHelpers {
|
||||
|
||||
/**
|
||||
* Returns the minimum capacity required for {@link ByteBufferPageIO}
|
||||
* @return int - minimum capacity required
|
||||
*/
|
||||
public static final int BYTE_BUF_PAGEIO_MIN_CAPACITY = ByteBufferPageIO.WRAPPER_SIZE;
|
||||
|
||||
/**
|
||||
* Returns the {@link ByteBufferPageIO} capacity required for the supplied element
|
||||
* Returns the {@link org.logstash.ackedqueue.io.MmapPageIO} capacity required for the supplied element
|
||||
* @param element
|
||||
* @return int - capacity required for the supplied element
|
||||
* @throws IOException Throws if a serialization error occurs
|
||||
*/
|
||||
public static int singleElementCapacityForByteBufferPageIO(final Queueable element) throws IOException {
|
||||
return ByteBufferPageIO.WRAPPER_SIZE + element.serialize().length;
|
||||
return AbstractByteBufferPageIO.WRAPPER_SIZE + element.serialize().length;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
|
||||
|
@ -11,7 +10,7 @@ public class TestSettings {
|
|||
|
||||
public static Settings volatileQueueSettings(int capacity) {
|
||||
MemoryCheckpointIO.clearSources();
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory)
|
||||
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
|
||||
|
@ -19,7 +18,7 @@ public class TestSettings {
|
|||
|
||||
public static Settings volatileQueueSettings(int capacity, long size) {
|
||||
MemoryCheckpointIO.clearSources();
|
||||
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path);
|
||||
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new MmapPageIO(pageNum, pageSize, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
return SettingsImpl.memorySettingsBuilder().capacity(capacity).queueMaxBytes(size)
|
||||
.elementIOFactory(pageIOFactory).checkpointIOFactory(checkpointIOFactory)
|
||||
|
|
|
@ -1,381 +0,0 @@
|
|||
package org.logstash.ackedqueue.io;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.SequencedList;
|
||||
import org.logstash.ackedqueue.StringElement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.BYTE_BUF_PAGEIO_MIN_CAPACITY;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.singleElementCapacityForByteBufferPageIO;
|
||||
|
||||
public class ByteBufferPageIOTest {
|
||||
|
||||
// convert any checked exceptions into uncheck RuntimeException
|
||||
public static <V> V uncheck(Callable<V> callable) {
|
||||
try {
|
||||
return callable.call();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private interface BufferGenerator {
|
||||
byte[] generate() throws IOException;
|
||||
}
|
||||
|
||||
private static int CAPACITY = 1024;
|
||||
|
||||
private static ByteBufferPageIO newEmptyPageIO() throws IOException {
|
||||
return newEmptyPageIO(CAPACITY);
|
||||
}
|
||||
|
||||
private static ByteBufferPageIO newEmptyPageIO(int capacity) throws IOException {
|
||||
ByteBufferPageIO io = new ByteBufferPageIO(capacity);
|
||||
io.create();
|
||||
return io;
|
||||
}
|
||||
|
||||
private static ByteBufferPageIO newPageIO(int capacity, byte[] bytes) {
|
||||
return new ByteBufferPageIO(capacity, bytes);
|
||||
}
|
||||
|
||||
private Queueable buildStringElement(String str) {
|
||||
return new StringElement(str);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getWritePosition() throws IOException {
|
||||
assertThat(newEmptyPageIO().getWritePosition(), is(equalTo(1)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getElementCount() throws IOException {
|
||||
assertThat(newEmptyPageIO().getElementCount(), is(equalTo(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getStartSeqNum() throws IOException {
|
||||
assertThat(newEmptyPageIO().getMinSeqNum(), is(equalTo(0L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hasSpace() throws IOException {
|
||||
assertThat(newEmptyPageIO(BYTE_BUF_PAGEIO_MIN_CAPACITY).hasSpace(0), is(true));
|
||||
assertThat(newEmptyPageIO(BYTE_BUF_PAGEIO_MIN_CAPACITY).hasSpace(1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hasSpaceAfterWrite() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 1L;
|
||||
|
||||
ByteBufferPageIO io = newEmptyPageIO(singleElementCapacityForByteBufferPageIO(element));
|
||||
|
||||
assertThat(io.hasSpace(element.serialize().length), is(true));
|
||||
io.write(element.serialize(), seqNum);
|
||||
assertThat(io.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(io.hasSpace(1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void write() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
assertThat(io.getWritePosition(), is(equalTo(singleElementCapacityForByteBufferPageIO(element))));
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void openValidState() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
|
||||
byte[] initialState = io.dump();
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.open(seqNum, 1);
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoversValidState() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
|
||||
byte[] initialState = io.dump();
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.recover();
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverEmptyWriteRecover() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] initialState = io.dump();
|
||||
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.recover();
|
||||
assertThat(io.getElementCount(), is(equalTo(0)));
|
||||
|
||||
io.write(element.serialize(), seqNum);
|
||||
initialState = io.dump();
|
||||
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.recover();
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverNonEmptyWriteRecover() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), 1L);
|
||||
byte[] initialState = io.dump();
|
||||
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.recover();
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
|
||||
io.write(element.serialize(), 2L);
|
||||
initialState = io.dump();
|
||||
|
||||
io = newPageIO(initialState.length, initialState);
|
||||
io.recover();
|
||||
assertThat(io.getElementCount(), is(equalTo(2)));
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void openUnexpectedSeqNum() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
|
||||
byte[] initialState = io.dump();
|
||||
newPageIO(initialState.length, initialState);
|
||||
io.open(1L, 1);
|
||||
}
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public static class SingleInvalidElementTest {
|
||||
|
||||
private static final List<BufferGenerator> singleGenerators = Arrays.asList(
|
||||
// invalid length
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes, 1L, 514, io.checksum(bytes));
|
||||
return io.dump();
|
||||
},
|
||||
|
||||
// invalid checksum
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes, 1L, bytes.length, 77);
|
||||
return io.dump();
|
||||
},
|
||||
|
||||
// invalid payload
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
int checksum = io.checksum(bytes);
|
||||
bytes[1] = 0x01;
|
||||
io.write(bytes, 1L, bytes.length, checksum);
|
||||
return io.dump();
|
||||
}
|
||||
);
|
||||
|
||||
@Parameters
|
||||
public static Collection<byte[]> singleElementParameters() {
|
||||
return singleGenerators.stream().map(g -> uncheck(g::generate)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Parameter
|
||||
public byte[] singleElementParameter;
|
||||
|
||||
@Test
|
||||
public void openInvalidSingleElement() throws IOException {
|
||||
// none of these should generate an exception with open()
|
||||
|
||||
ByteBufferPageIO io = newPageIO(singleElementParameter.length, singleElementParameter);
|
||||
io.open(1L, 1);
|
||||
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(1L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverInvalidSingleElement() throws IOException {
|
||||
for (BufferGenerator generator : singleGenerators) {
|
||||
byte[] bytes = generator.generate();
|
||||
ByteBufferPageIO io = newPageIO(bytes.length, bytes);
|
||||
io.recover();
|
||||
|
||||
assertThat(io.getElementCount(), is(equalTo(0)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public static class DoubleInvalidElementTest {
|
||||
|
||||
private static final List<BufferGenerator> doubleGenerators = Arrays.asList(
|
||||
// invalid length
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes.clone(), 1L, bytes.length, io.checksum(bytes));
|
||||
io.write(bytes, 2L, 514, io.checksum(bytes));
|
||||
return io.dump();
|
||||
},
|
||||
|
||||
// invalid checksum
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes.clone(), 1L, bytes.length, io.checksum(bytes));
|
||||
io.write(bytes, 2L, bytes.length, 77);
|
||||
return io.dump();
|
||||
},
|
||||
|
||||
// invalid payload
|
||||
() -> {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
int checksum = io.checksum(bytes);
|
||||
io.write(bytes.clone(), 1L, bytes.length, io.checksum(bytes));
|
||||
bytes[1] = 0x01;
|
||||
io.write(bytes, 2L, bytes.length, checksum);
|
||||
return io.dump();
|
||||
}
|
||||
);
|
||||
|
||||
@Parameters
|
||||
public static Collection<byte[]> doubleElementParameters() {
|
||||
return doubleGenerators.stream().map(g -> uncheck(g::generate)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Parameter
|
||||
public byte[] doubleElementParameter;
|
||||
|
||||
@Test
|
||||
public void openInvalidDoubleElement() throws IOException {
|
||||
// none of these will generate an exception with open()
|
||||
|
||||
ByteBufferPageIO io = newPageIO(doubleElementParameter.length, doubleElementParameter);
|
||||
io.open(1L, 2);
|
||||
|
||||
assertThat(io.getElementCount(), is(equalTo(2)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(1L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverInvalidDoubleElement() throws IOException {
|
||||
ByteBufferPageIO io = newPageIO(doubleElementParameter.length, doubleElementParameter);
|
||||
io.recover();
|
||||
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = AbstractByteBufferPageIO.PageIOInvalidElementException.class)
|
||||
public void openInvalidDeqNumDoubleElement() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes.clone(), 1L, bytes.length, io.checksum(bytes));
|
||||
io.write(bytes, 3L, bytes.length, io.checksum(bytes));
|
||||
|
||||
io = newPageIO(io.dump().length, io.dump());
|
||||
io.open(1L, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverInvalidDeqNumDoubleElement() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
byte[] bytes = element.serialize();
|
||||
io.write(bytes.clone(), 1L, bytes.length, io.checksum(bytes));
|
||||
io.write(bytes, 3L, bytes.length, io.checksum(bytes));
|
||||
|
||||
io = newPageIO(io.dump().length, io.dump());
|
||||
io.recover();
|
||||
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void writeRead() throws IOException {
|
||||
long seqNum = 42L;
|
||||
Queueable element = buildStringElement("foobarbaz");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
SequencedList<byte[]> result = io.read(seqNum, 1);
|
||||
assertThat(result.getElements().size(), is(equalTo(1)));
|
||||
Queueable readElement = StringElement.deserialize(result.getElements().get(0));
|
||||
assertThat(result.getSeqNums().get(0), is(equalTo(seqNum)));
|
||||
assertThat(readElement.toString(), is(equalTo(element.toString())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void writeReadMulti() throws IOException {
|
||||
Queueable element1 = buildStringElement("foo");
|
||||
Queueable element2 = buildStringElement("bar");
|
||||
Queueable element3 = buildStringElement("baz");
|
||||
Queueable element4 = buildStringElement("quux");
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element1.serialize(), 40L);
|
||||
io.write(element2.serialize(), 41L);
|
||||
io.write(element3.serialize(), 42L);
|
||||
io.write(element4.serialize(), 43L);
|
||||
int batchSize = 11;
|
||||
SequencedList<byte[]> result = io.read(40L, batchSize);
|
||||
assertThat(result.getElements().size(), is(equalTo(4)));
|
||||
|
||||
assertThat(result.getSeqNums().get(0), is(equalTo(40L)));
|
||||
assertThat(result.getSeqNums().get(1), is(equalTo(41L)));
|
||||
assertThat(result.getSeqNums().get(2), is(equalTo(42L)));
|
||||
assertThat(result.getSeqNums().get(3), is(equalTo(43L)));
|
||||
|
||||
assertThat(StringElement.deserialize(result.getElements().get(0)).toString(), is(equalTo(element1.toString())));
|
||||
assertThat(StringElement.deserialize(result.getElements().get(1)).toString(), is(equalTo(element2.toString())));
|
||||
assertThat(StringElement.deserialize(result.getElements().get(2)).toString(), is(equalTo(element3.toString())));
|
||||
assertThat(StringElement.deserialize(result.getElements().get(3)).toString(), is(equalTo(element4.toString())));
|
||||
}
|
||||
}
|
|
@ -15,10 +15,8 @@ import org.logstash.ackedqueue.SettingsImpl;
|
|||
import org.logstash.ackedqueue.Queue;
|
||||
import org.logstash.ackedqueue.Settings;
|
||||
import org.logstash.ackedqueue.StringElement;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.CheckpointIOFactory;
|
||||
import org.logstash.ackedqueue.io.FileCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
|
||||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
import org.logstash.ackedqueue.io.PageIOFactory;
|
||||
|
||||
|
@ -27,13 +25,6 @@ public class Concurrent {
|
|||
final static int BATCH_SIZE = 1000;
|
||||
static Settings settings;
|
||||
|
||||
public static Settings memorySettings(int capacity) {
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
return SettingsImpl.memorySettingsBuilder().capacity(capacity).elementIOFactory(pageIOFactory)
|
||||
.checkpointIOFactory(checkpointIOFactory).elementClass(StringElement.class).build();
|
||||
}
|
||||
|
||||
public static Settings fileSettings(int capacity) {
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
|
||||
|
@ -171,12 +162,6 @@ public class Concurrent {
|
|||
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException {
|
||||
System.out.println(">>> starting in-memory stress test");
|
||||
|
||||
settings = memorySettings(1024 * 1024); // 1MB
|
||||
oneProducersOneConsumer();
|
||||
oneProducersOneMultipleConsumer();
|
||||
|
||||
System.out.println("\n>>> starting file-based stress test in /tmp/queue");
|
||||
|
||||
settings = fileSettings(1024 * 1024); // 1MB
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue