mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
64397c66e3
commit
b7f4fd7ebf
1 changed files with 450 additions and 459 deletions
|
@ -38,17 +38,16 @@ public class QueueTest {
|
|||
|
||||
@Test
|
||||
public void newQueue() throws IOException {
|
||||
Queue q = new TestQueue(TestSettings.volatileQueueSettings(10));
|
||||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(10))) {
|
||||
q.open();
|
||||
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void singleWriteRead() throws IOException {
|
||||
Queue q = new TestQueue(TestSettings.volatileQueueSettings(100));
|
||||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
|
||||
q.open();
|
||||
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
|
@ -59,13 +58,12 @@ public class QueueTest {
|
|||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void singleWriteMultiRead() throws IOException {
|
||||
Queue q = new TestQueue(TestSettings.volatileQueueSettings(100));
|
||||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
|
||||
q.open();
|
||||
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
|
@ -76,17 +74,17 @@ public class QueueTest {
|
|||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(2), is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multiWriteSamePage() throws IOException {
|
||||
Queue q = new TestQueue(TestSettings.volatileQueueSettings(100));
|
||||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
|
||||
q.open();
|
||||
|
||||
List<Queueable> elements = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"), new StringElement("foobarbaz3"));
|
||||
|
||||
List<Queueable> elements = Arrays
|
||||
.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"),
|
||||
new StringElement("foobarbaz3")
|
||||
);
|
||||
for (Queueable e : elements) {
|
||||
q.write(e);
|
||||
}
|
||||
|
@ -101,16 +99,15 @@ public class QueueTest {
|
|||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString())));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void writeMultiPage() throws IOException {
|
||||
List<Queueable> elements = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"), new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
|
||||
TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
||||
for (Queueable e : elements) {
|
||||
|
@ -145,8 +142,7 @@ public class QueueTest {
|
|||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
assertThat(b, is(equalTo(null)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -154,8 +150,8 @@ public class QueueTest {
|
|||
public void writeMultiPageWithInOrderAcking() throws IOException {
|
||||
List<Queueable> elements = Arrays.asList(new StringElement("foobarbaz1"), new StringElement("foobarbaz2"), new StringElement("foobarbaz3"), new StringElement("foobarbaz4"));
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
|
||||
TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
||||
for (Queueable e : elements) {
|
||||
|
@ -188,8 +184,7 @@ public class QueueTest {
|
|||
b.close();
|
||||
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -202,7 +197,7 @@ public class QueueTest {
|
|||
TestSettings.volatileQueueSettings(2 * singleElementCapacity)
|
||||
).checkpointMaxWrites(1024) // arbitrary high enough threshold so that it's not reached (default for TestSettings is 1)
|
||||
.build();
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
assertThat(q.getHeadPage().getPageNum(), is(equalTo(0)));
|
||||
|
@ -224,7 +219,7 @@ public class QueueTest {
|
|||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(0)));
|
||||
|
||||
// assertThat(elements1.get(1).getSeqNum(), is(equalTo(2L)));
|
||||
// assertThat(elements1.get(1).getSeqNum(), is(equalTo(2L)));
|
||||
q.ensurePersistedUpto(2);
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
|
@ -277,8 +272,7 @@ public class QueueTest {
|
|||
assertThat(c.getMinSeqNum(), is(equalTo(3L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(1)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -296,8 +290,8 @@ public class QueueTest {
|
|||
elements.add(new StringElement(String.format("%0" + digits + "d", i)));
|
||||
}
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements.get(0).serialize().length);
|
||||
|
||||
TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(singleElementCapacity));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(singleElementCapacity))) {
|
||||
q.open();
|
||||
|
||||
for (Queueable e : elements) {
|
||||
|
@ -320,8 +314,7 @@ public class QueueTest {
|
|||
}
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -334,10 +327,9 @@ public class QueueTest {
|
|||
TestSettings.volatileQueueSettings(singleElementCapacity)
|
||||
).maxUnread(2) // 2 so we know the first write should not block and the second should
|
||||
.build();
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
|
||||
long seqNum = q.write(element);
|
||||
assertThat(seqNum, is(equalTo(1L)));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
@ -373,8 +365,7 @@ public class QueueTest {
|
|||
|
||||
// since we did not ack and pages hold a single item
|
||||
assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -386,7 +377,7 @@ public class QueueTest {
|
|||
TestSettings.volatileQueueSettings(256) // 256 is arbitrary, large enough to hold a few elements
|
||||
).maxUnread(2)
|
||||
.build(); // 2 so we know the first write should not block and the second should
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
// perform first non-blocking write
|
||||
|
@ -407,8 +398,9 @@ public class QueueTest {
|
|||
Future<Long> future = executor.submit(write);
|
||||
|
||||
// spin wait until data is written and write blocks
|
||||
while (!q.isFull()) { Thread.sleep(1); }
|
||||
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
// read one element, which will unblock the last write
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
assertThat(b, is(notNullValue()));
|
||||
|
@ -429,8 +421,7 @@ public class QueueTest {
|
|||
assertThat(q.getHeadPage().getElementCount() > 0L, is(true));
|
||||
assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L)));
|
||||
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -441,8 +432,7 @@ public class QueueTest {
|
|||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||
|
@ -459,13 +449,13 @@ public class QueueTest {
|
|||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
||||
while (!q.isFull()) { Thread.sleep(10); }
|
||||
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertThat(q.isFull(), is(true));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -518,11 +508,10 @@ public class QueueTest {
|
|||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full
|
||||
int ELEMENT_COUNT =
|
||||
90; // should be able to write 90 events (9 pages) before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
}
|
||||
|
@ -540,8 +529,9 @@ public class QueueTest {
|
|||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
while (!q.isFull()) { Thread.sleep(10); }
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertThat(q.isFull(), is(true));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
|
@ -550,7 +540,7 @@ public class QueueTest {
|
|||
assertThat(future.get(), is(equalTo(ELEMENT_COUNT + 1L)));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -562,8 +552,7 @@ public class QueueTest {
|
|||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||
|
@ -580,9 +569,9 @@ public class QueueTest {
|
|||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
||||
while (!q.isFull()) { Thread.sleep(10); }
|
||||
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertThat(q.isFull(), is(true));
|
||||
|
||||
Batch b = q.readBatch(9); // read 90% of page (9 events)
|
||||
|
@ -591,30 +580,35 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(true)); // queue should still be full
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAckedCount() throws IOException {
|
||||
Settings settings = TestSettings.persistedQueueSettings(100, dataPath);
|
||||
Queue q = new Queue(settings);
|
||||
Batch b;
|
||||
Queueable element1;
|
||||
Queueable element2;
|
||||
Queueable element3;
|
||||
long firstSeqNum;
|
||||
try(Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
|
||||
Queueable element1 = new StringElement("foobarbaz");
|
||||
Queueable element2 = new StringElement("wowza");
|
||||
Queueable element3 = new StringElement("third");
|
||||
long firstSeqNum = q.write(element1);
|
||||
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
element1 = new StringElement("foobarbaz");
|
||||
element2 = new StringElement("wowza");
|
||||
element3 = new StringElement("third");
|
||||
firstSeqNum = q.write(element1);
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
}
|
||||
|
||||
q.close();
|
||||
|
||||
q = new Queue(settings);
|
||||
long secondSeqNum;
|
||||
long thirdSeqNum;
|
||||
try(Queue q = new Queue(settings)){
|
||||
q.open();
|
||||
|
||||
long secondSeqNum = q.write(element2);
|
||||
long thirdSeqNum = q.write(element3);
|
||||
secondSeqNum = q.write(element2);
|
||||
thirdSeqNum = q.write(element3);
|
||||
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
|
@ -626,9 +620,9 @@ public class QueueTest {
|
|||
assertThat(b.getElements().get(1), is(equalTo(element3)));
|
||||
|
||||
q.ack(Collections.singletonList(firstSeqNum));
|
||||
q.close();
|
||||
}
|
||||
|
||||
q = new Queue(settings);
|
||||
try(Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
|
||||
b = q.nonBlockReadBatch(2);
|
||||
|
@ -638,8 +632,7 @@ public class QueueTest {
|
|||
|
||||
assertThat(q.getAckedCount(), equalTo(0L));
|
||||
assertThat(q.getUnackedCount(), equalTo(0L));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
@ -647,8 +640,7 @@ public class QueueTest {
|
|||
|
||||
// very small pages to maximize page creation
|
||||
Settings settings = TestSettings.volatileQueueSettings(100);
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 10000;
|
||||
|
@ -688,15 +680,15 @@ public class QueueTest {
|
|||
assertThat(q.isFullyAcked(), is(true));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fullyAckedHeadPageBeheadingTest() throws IOException {
|
||||
Queueable element = new StringElement("foobarbaz1");
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
|
||||
TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity));
|
||||
try (TestQueue q = new TestQueue(
|
||||
TestSettings.volatileQueueSettings(2 * singleElementCapacity))) {
|
||||
q.open();
|
||||
|
||||
Batch b;
|
||||
|
@ -724,8 +716,7 @@ public class QueueTest {
|
|||
assertThat(q.getHeadPage().getPageNum(), is(equalTo(1)));
|
||||
assertThat(q.firstUnackedPageNum(), is(equalTo(1)));
|
||||
assertThat(q.isFullyAcked(), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue