mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Refactor ByteBufferPageIO._persistedByteCount() #6594
Remove static method in ByteBufferPageIO, replacing with static test helper method to move the required functionality to a single place. Cleans up HeadPageTest, using try... with resources in place of explicit, and potentially missed, close() calls. Adds elastic/securemock mocking framework dependency Resolves #6594 Fixes #7268
This commit is contained in:
parent
e2ab1f57b4
commit
58c0a39ff9
7 changed files with 96 additions and 75 deletions
|
@ -120,6 +120,7 @@ dependencies {
|
|||
testCompile 'org.apache.logging.log4j:log4j-api:2.6.2:tests'
|
||||
testCompile 'junit:junit:4.12'
|
||||
testCompile 'net.javacrumbs.json-unit:json-unit:1.9.0'
|
||||
testCompile 'org.elasticsearch:securemock:1.2'
|
||||
provided 'org.jruby:jruby-core:1.7.25'
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,9 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
|
|||
public static final int HEADER_SIZE = 1; // version byte
|
||||
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
|
||||
|
||||
// Size of: Header + Sequence Number + Length + Checksum
|
||||
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
|
||||
|
||||
public static final boolean VERIFY_CHECKSUM = true;
|
||||
public static final boolean STRICT_CAPACITY = true;
|
||||
|
||||
|
|
|
@ -47,9 +47,6 @@ public class ByteBufferPageIO extends AbstractByteBufferPageIO {
|
|||
|
||||
// below public methods only used by tests
|
||||
|
||||
// TODO: static method for tests - should refactor
|
||||
public static int _persistedByteCount(int byteCount) { return SEQNUM_SIZE + LENGTH_SIZE + byteCount + CHECKSUM_SIZE; }
|
||||
|
||||
public int getWritePosition() { return this.head; }
|
||||
|
||||
public byte[] dump() { return this.buffer.array(); }
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -9,96 +8,92 @@ import java.io.IOException;
|
|||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.singleElementCapacityForByteBufferPageIO;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class HeadPageTest {
|
||||
|
||||
@Test
|
||||
public void newHeadPage() throws IOException {
|
||||
Settings s = TestSettings.volatileQueueSettings(100);
|
||||
Queue q = new Queue(s);
|
||||
// Close method on HeadPage requires an instance of Queue that has already been opened.
|
||||
Queue q = mock(Queue.class);
|
||||
PageIO pageIO = s.getPageIOFactory().build(0, 100, "dummy");
|
||||
pageIO.create();
|
||||
HeadPage p = new HeadPage(0, q, pageIO);
|
||||
|
||||
assertThat(p.getPageNum(), is(equalTo(0)));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
assertThat(p.hasSpace(10), is(true));
|
||||
assertThat(p.hasSpace(100), is(false));
|
||||
|
||||
q.close();
|
||||
try(final HeadPage p = new HeadPage(0, q, pageIO)) {
|
||||
assertThat(p.getPageNum(), is(equalTo(0)));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
assertThat(p.hasSpace(10), is(true));
|
||||
assertThat(p.hasSpace(100), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pageWrite() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
|
||||
Settings s = TestSettings.volatileQueueSettings(singleElementCapacity);
|
||||
Queue q = new Queue(s);
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
Settings s = TestSettings.volatileQueueSettings(singleElementCapacityForByteBufferPageIO(element));
|
||||
try(Queue q = new Queue(s)) {
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), 0, 1);
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), 0, 1);
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(false));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(false));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pageWriteAndReadSingle() throws IOException {
|
||||
long seqNum = 1L;
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
Settings s = TestSettings.volatileQueueSettings(singleElementCapacity);
|
||||
Queue q = new Queue(s);
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
try(Queue q = new Queue(s)) {
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), seqNum, 1);
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), seqNum, 1);
|
||||
|
||||
Batch b = p.readBatch(1);
|
||||
Batch b = p.readBatch(1);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pageWriteAndReadMulti() throws IOException {
|
||||
long seqNum = 1L;
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
|
||||
Settings s = TestSettings.volatileQueueSettings(singleElementCapacity);
|
||||
Queue q = new Queue(s);
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
Settings s = TestSettings.volatileQueueSettings(singleElementCapacityForByteBufferPageIO(element));
|
||||
try(Queue q = new Queue(s)) {
|
||||
q.open();
|
||||
HeadPage p = q.headPage;
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), seqNum, 1);
|
||||
assertThat(p.hasSpace(element.serialize().length), is(true));
|
||||
p.write(element.serialize(), seqNum, 1);
|
||||
|
||||
Batch b = p.readBatch(10);
|
||||
Batch b = p.readBatch(10);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
assertThat(p.hasSpace(element.serialize().length), is(false));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
// disabled test until we figure what to do in this condition
|
||||
|
@ -107,7 +102,7 @@ public class HeadPageTest {
|
|||
// URL url = FileCheckpointIOTest.class.getResource("checkpoint.head");
|
||||
// String dirPath = Paths.get(url.toURI()).getParent().toString();
|
||||
// Queueable element = new StringElement("foobarbaz");
|
||||
// int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
// int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
// Settings s = TestSettings.persistedQueueSettings(singleElementCapacity, dirPath);
|
||||
// TestQueue q = new TestQueue(s);
|
||||
// try {
|
||||
|
|
|
@ -19,7 +19,6 @@ import org.junit.Before;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
@ -27,6 +26,7 @@ import static org.hamcrest.CoreMatchers.notNullValue;
|
|||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.singleElementCapacityForByteBufferPageIO;
|
||||
|
||||
public class QueueTest {
|
||||
|
||||
|
@ -120,7 +120,7 @@ public class QueueTest {
|
|||
@Test
|
||||
public void writeMultiPage() throws IOException {
|
||||
List<Queueable> elements = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"), new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(elements.get(0));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
@ -164,7 +164,7 @@ public class QueueTest {
|
|||
@Test
|
||||
public void writeMultiPageWithInOrderAcking() throws IOException {
|
||||
List<Queueable> elements = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"), new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(elements.get(0));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
@ -206,7 +206,7 @@ public class QueueTest {
|
|||
public void writeMultiPageWithInOrderAckingCheckpoints() throws IOException {
|
||||
List<Queueable> elements1 = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"));
|
||||
List<Queueable> elements2 = Arrays.asList(new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements1.get(0).serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(elements1.get(0));
|
||||
|
||||
Settings settings = SettingsImpl.builder(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity)
|
||||
|
@ -304,7 +304,7 @@ public class QueueTest {
|
|||
for (int i = 0; i < page_count; i++) {
|
||||
elements.add(new StringElement(String.format("%0" + digits + "d", i)));
|
||||
}
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(elements.get(0));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(singleElementCapacity))) {
|
||||
q.open();
|
||||
|
@ -336,7 +336,7 @@ public class QueueTest {
|
|||
@Test(timeout = 5000)
|
||||
public void reachMaxUnread() throws IOException, InterruptedException, ExecutionException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
Settings settings = SettingsImpl.builder(
|
||||
TestSettings.volatileQueueSettings(singleElementCapacity)
|
||||
|
@ -429,7 +429,7 @@ public class QueueTest {
|
|||
public void reachMaxSizeTest() throws IOException, InterruptedException, ExecutionException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
@ -457,7 +457,7 @@ public class QueueTest {
|
|||
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
@ -493,7 +493,7 @@ public class QueueTest {
|
|||
public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
@ -531,7 +531,7 @@ public class QueueTest {
|
|||
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
@ -664,7 +664,7 @@ public class QueueTest {
|
|||
@Test
|
||||
public void fullyAckedHeadPageBeheadingTest() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz1");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Class containing common methods to help DRY up acked queue tests.
|
||||
*/
|
||||
public class QueueTestHelpers {
|
||||
|
||||
/**
|
||||
* Returns the minimum capacity required for {@link ByteBufferPageIO}
|
||||
* @return int - minimum capacity required
|
||||
*/
|
||||
public static final int BYTE_BUF_PAGEIO_MIN_CAPACITY = ByteBufferPageIO.WRAPPER_SIZE;
|
||||
|
||||
/**
|
||||
* Returns the {@link ByteBufferPageIO} capacity required for the supplied element
|
||||
* @param element
|
||||
* @return int - capacity required for the supplied element
|
||||
* @throws IOException Throws if a serialization error occurs
|
||||
*/
|
||||
public static int singleElementCapacityForByteBufferPageIO(final Queueable element) throws IOException {
|
||||
return ByteBufferPageIO.WRAPPER_SIZE + element.serialize().length;
|
||||
}
|
||||
}
|
|
@ -8,8 +8,6 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.SequencedList;
|
||||
import org.logstash.ackedqueue.StringElement;
|
||||
import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -21,6 +19,8 @@ import java.util.stream.Collectors;
|
|||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.BYTE_BUF_PAGEIO_MIN_CAPACITY;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.singleElementCapacityForByteBufferPageIO;
|
||||
|
||||
public class ByteBufferPageIOTest {
|
||||
|
||||
|
@ -40,7 +40,6 @@ public class ByteBufferPageIOTest {
|
|||
}
|
||||
|
||||
private static int CAPACITY = 1024;
|
||||
private static int MIN_CAPACITY = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(0);
|
||||
|
||||
private static ByteBufferPageIO newEmptyPageIO() throws IOException {
|
||||
return newEmptyPageIO(CAPACITY);
|
||||
|
@ -77,17 +76,16 @@ public class ByteBufferPageIOTest {
|
|||
|
||||
@Test
|
||||
public void hasSpace() throws IOException {
|
||||
assertThat(newEmptyPageIO(MIN_CAPACITY).hasSpace(0), is(true));
|
||||
assertThat(newEmptyPageIO(MIN_CAPACITY).hasSpace(1), is(false));
|
||||
assertThat(newEmptyPageIO(BYTE_BUF_PAGEIO_MIN_CAPACITY).hasSpace(0), is(true));
|
||||
assertThat(newEmptyPageIO(BYTE_BUF_PAGEIO_MIN_CAPACITY).hasSpace(1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hasSpaceAfterWrite() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
long seqNum = 1L;
|
||||
|
||||
ByteBufferPageIO io = newEmptyPageIO(singleElementCapacity);
|
||||
ByteBufferPageIO io = newEmptyPageIO(singleElementCapacityForByteBufferPageIO(element));
|
||||
|
||||
assertThat(io.hasSpace(element.serialize().length), is(true));
|
||||
io.write(element.serialize(), seqNum);
|
||||
|
@ -101,7 +99,7 @@ public class ByteBufferPageIOTest {
|
|||
long seqNum = 42L;
|
||||
ByteBufferPageIO io = newEmptyPageIO();
|
||||
io.write(element.serialize(), seqNum);
|
||||
assertThat(io.getWritePosition(), is(equalTo(ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length))));
|
||||
assertThat(io.getWritePosition(), is(equalTo(singleElementCapacityForByteBufferPageIO(element))));
|
||||
assertThat(io.getElementCount(), is(equalTo(1)));
|
||||
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue