diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 3bb3aa980..17f69a985 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -23,6 +23,7 @@ import org.logstash.ackedqueue.io.ByteBufferPageIO; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -41,7 +42,7 @@ public class QueueTest { try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(10))) { q.open(); - assertThat(q.nonBlockReadBatch(1), is(equalTo(null))); + assertThat(q.nonBlockReadBatch(1), nullValue()); } } @@ -55,9 +56,9 @@ public class QueueTest { Batch b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); - assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); - assertThat(q.nonBlockReadBatch(1), is(equalTo(null))); + assertThat(b.getElements().size(), is(1)); + assertThat(b.getElements().get(0).toString(), is(element.toString())); + assertThat(q.nonBlockReadBatch(1), nullValue()); } } @@ -71,9 +72,9 @@ public class QueueTest { Batch b = q.nonBlockReadBatch(2); - assertThat(b.getElements().size(), is(equalTo(1))); - assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); - assertThat(q.nonBlockReadBatch(2), is(equalTo(null))); + assertThat(b.getElements().size(), is(1)); + assertThat(b.getElements().get(0).toString(), is(element.toString())); + assertThat(q.nonBlockReadBatch(2), nullValue()); } } @@ -91,14 +92,14 @@ public class QueueTest { Batch b = q.nonBlockReadBatch(2); - assertThat(b.getElements().size(), is(equalTo(2))); - assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(0).toString()))); - assertThat(b.getElements().get(1).toString(), is(equalTo(elements.get(1).toString()))); + assertThat(b.getElements().size(), is(2)); + assertThat(b.getElements().get(0).toString(), is(elements.get(0).toString())); + assertThat(b.getElements().get(1).toString(), is(elements.get(1).toString())); b = q.nonBlockReadBatch(2); - assertThat(b.getElements().size(), is(equalTo(1))); - assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString()))); + assertThat(b.getElements().size(), is(1)); + assertThat(b.getElements().get(0).toString(), is(elements.get(2).toString())); } } @@ -115,33 +116,33 @@ public class QueueTest { } // total of 2 pages: 1 head and 1 tail - assertThat(q.getTailPages().size(), is(equalTo(1))); + assertThat(q.getTailPages().size(), is(1)); - assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(false))); - assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false))); - assertThat(q.getHeadPage().isFullyRead(), is(equalTo(false))); - assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false))); + assertThat(q.getTailPages().get(0).isFullyRead(), is(false)); + assertThat(q.getTailPages().get(0).isFullyAcked(), is(false)); + assertThat(q.getHeadPage().isFullyRead(), is(false)); + assertThat(q.getHeadPage().isFullyAcked(), is(false)); Batch b = q.nonBlockReadBatch(10); - assertThat(b.getElements().size(), is(equalTo(2))); + assertThat(b.getElements().size(), is(2)); - assertThat(q.getTailPages().size(), is(equalTo(1))); + assertThat(q.getTailPages().size(), is(1)); - assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(true))); - assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false))); - assertThat(q.getHeadPage().isFullyRead(), is(equalTo(false))); - assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false))); + assertThat(q.getTailPages().get(0).isFullyRead(), is(true)); + assertThat(q.getTailPages().get(0).isFullyAcked(), is(false)); + assertThat(q.getHeadPage().isFullyRead(), is(false)); + assertThat(q.getHeadPage().isFullyAcked(), is(false)); b = q.nonBlockReadBatch(10); - assertThat(b.getElements().size(), is(equalTo(2))); + assertThat(b.getElements().size(), is(2)); - assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(true))); - assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false))); - assertThat(q.getHeadPage().isFullyRead(), is(equalTo(true))); - assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false))); + assertThat(q.getTailPages().get(0).isFullyRead(), is(true)); + assertThat(q.getTailPages().get(0).isFullyAcked(), is(false)); + assertThat(q.getHeadPage().isFullyRead(), is(true)); + assertThat(q.getHeadPage().isFullyAcked(), is(false)); b = q.nonBlockReadBatch(10); - assertThat(b, is(equalTo(null))); + assertThat(b, nullValue()); } } @@ -160,30 +161,30 @@ public class QueueTest { Batch b = q.nonBlockReadBatch(10); - assertThat(b.getElements().size(), is(equalTo(2))); - assertThat(q.getTailPages().size(), is(equalTo(1))); + assertThat(b.getElements().size(), is(2)); + assertThat(q.getTailPages().size(), is(1)); // lets keep a ref to that tail page before acking TailPage tailPage = q.getTailPages().get(0); - assertThat(tailPage.isFullyRead(), is(equalTo(true))); + assertThat(tailPage.isFullyRead(), is(true)); // ack first batch which includes all elements from tailPages b.close(); - assertThat(q.getTailPages().size(), is(equalTo(0))); - assertThat(tailPage.isFullyRead(), is(equalTo(true))); - assertThat(tailPage.isFullyAcked(), is(equalTo(true))); + assertThat(q.getTailPages().size(), is(0)); + assertThat(tailPage.isFullyRead(), is(true)); + assertThat(tailPage.isFullyAcked(), is(true)); b = q.nonBlockReadBatch(10); - assertThat(b.getElements().size(), is(equalTo(2))); - assertThat(q.getHeadPage().isFullyRead(), is(equalTo(true))); - assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false))); + assertThat(b.getElements().size(), is(2)); + assertThat(q.getHeadPage().isFullyRead(), is(true)); + assertThat(q.getHeadPage().isFullyAcked(), is(false)); b.close(); - assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true))); + assertThat(q.getHeadPage().isFullyAcked(), is(true)); } } @@ -200,51 +201,51 @@ public class QueueTest { try (TestQueue q = new TestQueue(settings)) { q.open(); - assertThat(q.getHeadPage().getPageNum(), is(equalTo(0))); + assertThat(q.getHeadPage().getPageNum(), is(0)); Checkpoint c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(0))); - assertThat(c.getElementCount(), is(equalTo(0))); - assertThat(c.getMinSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(0))); + assertThat(c.getPageNum(), is(0)); + assertThat(c.getElementCount(), is(0)); + assertThat(c.getMinSeqNum(), is(0L)); + assertThat(c.getFirstUnackedSeqNum(), is(0L)); + assertThat(c.getFirstUnackedPageNum(), is(0)); for (Queueable e : elements1) { q.write(e); } c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(0))); - assertThat(c.getElementCount(), is(equalTo(0))); - assertThat(c.getMinSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(0))); + assertThat(c.getPageNum(), is(0)); + assertThat(c.getElementCount(), is(0)); + assertThat(c.getMinSeqNum(), is(0L)); + assertThat(c.getFirstUnackedSeqNum(), is(0L)); + assertThat(c.getFirstUnackedPageNum(), is(0)); - // assertThat(elements1.get(1).getSeqNum(), is(equalTo(2L))); + // assertThat(elements1.get(1).getSeqNum(), is(2L)); q.ensurePersistedUpto(2); c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(0))); - assertThat(c.getElementCount(), is(equalTo(2))); - assertThat(c.getMinSeqNum(), is(equalTo(1L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(1L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(0))); + assertThat(c.getPageNum(), is(0)); + assertThat(c.getElementCount(), is(2)); + assertThat(c.getMinSeqNum(), is(1L)); + assertThat(c.getFirstUnackedSeqNum(), is(1L)); + assertThat(c.getFirstUnackedPageNum(), is(0)); for (Queueable e : elements2) { q.write(e); } c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(1))); - assertThat(c.getElementCount(), is(equalTo(0))); - assertThat(c.getMinSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(0))); + assertThat(c.getPageNum(), is(1)); + assertThat(c.getElementCount(), is(0)); + assertThat(c.getMinSeqNum(), is(0L)); + assertThat(c.getFirstUnackedSeqNum(), is(0L)); + assertThat(c.getFirstUnackedPageNum(), is(0)); c = q.getCheckpointIO().read("checkpoint.0"); - assertThat(c.getPageNum(), is(equalTo(0))); - assertThat(c.getElementCount(), is(equalTo(2))); - assertThat(c.getMinSeqNum(), is(equalTo(1L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(1L))); + assertThat(c.getPageNum(), is(0)); + assertThat(c.getElementCount(), is(2)); + assertThat(c.getMinSeqNum(), is(1L)); + assertThat(c.getFirstUnackedSeqNum(), is(1L)); Batch b = q.nonBlockReadBatch(10); b.close(); @@ -257,21 +258,21 @@ public class QueueTest { } c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(1))); - assertThat(c.getElementCount(), is(equalTo(2))); - assertThat(c.getMinSeqNum(), is(equalTo(3L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(3L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(1))); + assertThat(c.getPageNum(), is(1)); + assertThat(c.getElementCount(), is(2)); + assertThat(c.getMinSeqNum(), is(3L)); + assertThat(c.getFirstUnackedSeqNum(), is(3L)); + assertThat(c.getFirstUnackedPageNum(), is(1)); b = q.nonBlockReadBatch(10); b.close(); c = q.getCheckpointIO().read("checkpoint.head"); - assertThat(c.getPageNum(), is(equalTo(1))); - assertThat(c.getElementCount(), is(equalTo(2))); - assertThat(c.getMinSeqNum(), is(equalTo(3L))); - assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L))); - assertThat(c.getFirstUnackedPageNum(), is(equalTo(1))); + assertThat(c.getPageNum(), is(1)); + assertThat(c.getElementCount(), is(2)); + assertThat(c.getMinSeqNum(), is(3L)); + assertThat(c.getFirstUnackedSeqNum(), is(5L)); + assertThat(c.getFirstUnackedPageNum(), is(1)); } } @@ -298,14 +299,14 @@ public class QueueTest { q.write(e); } - assertThat(q.getTailPages().size(), is(equalTo(page_count - 1))); + assertThat(q.getTailPages().size(), is(page_count - 1)); // first read all elements List batches = new ArrayList<>(); for (Batch b = q.nonBlockReadBatch(1); b != null; b = q.nonBlockReadBatch(1)) { batches.add(b); } - assertThat(batches.size(), is(equalTo(page_count))); + assertThat(batches.size(), is(page_count)); // then ack randomly Collections.shuffle(batches); @@ -313,7 +314,7 @@ public class QueueTest { b.close(); } - assertThat(q.getTailPages().size(), is(equalTo(0))); + assertThat(q.getTailPages().size(), is(0)); } } } @@ -331,7 +332,7 @@ public class QueueTest { q.open(); long seqNum = q.write(element); - assertThat(seqNum, is(equalTo(1L))); + assertThat(seqNum, is(1L)); assertThat(q.isFull(), is(false)); int ELEMENT_COUNT = 1000; @@ -349,22 +350,22 @@ public class QueueTest { // spin wait until data is written and write blocks Thread.sleep(1); } - assertThat(q.unreadCount, is(equalTo(2L))); + assertThat(q.unreadCount, is(2L)); assertThat(future.isDone(), is(false)); // read one element, which will unblock the last write Batch b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); + assertThat(b.getElements().size(), is(1)); // future result is the blocked write seqNum for the second element - assertThat(future.get(), is(equalTo(2L + i))); + assertThat(future.get(), is(2L + i)); assertThat(q.isFull(), is(false)); executor.shutdown(); } // since we did not ack and pages hold a single item - assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT))); + assertThat(q.getTailPages().size(), is(ELEMENT_COUNT)); } } @@ -383,16 +384,14 @@ public class QueueTest { // perform first non-blocking write long seqNum = q.write(element); - assertThat(seqNum, is(equalTo(1L))); + assertThat(seqNum, is(1L)); assertThat(q.isFull(), is(false)); int ELEMENT_COUNT = 1000; for (int i = 0; i < ELEMENT_COUNT; i++) { // we expect this next write call to block so let's wrap it in a Future - Callable write = () -> { - return q.write(element); - }; + Callable write = () -> q.write(element); ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(write); @@ -403,24 +402,24 @@ public class QueueTest { } // read one element, which will unblock the last write Batch b = q.nonBlockReadBatch(1); - assertThat(b, is(notNullValue())); - assertThat(b.getElements().size(), is(equalTo(1))); + assertThat(b, notNullValue()); + assertThat(b.getElements().size(), is(1)); b.close(); // future result is the blocked write seqNum for the second element - assertThat(future.get(), is(equalTo(2L + i))); + assertThat(future.get(), is(2L + i)); assertThat(q.isFull(), is(false)); executor.shutdown(); } // all batches are acked, no tail pages should exist - assertThat(q.getTailPages().size(), is(equalTo(0))); + assertThat(q.getTailPages().size(), is(0)); // the last read unblocked the last write so some elements (1 unread and maybe some acked) should be in the head page assertThat(q.getHeadPage().getElementCount() > 0L, is(true)); - assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L))); - assertThat(q.unreadCount, is(equalTo(1L))); + assertThat(q.getHeadPage().unreadCount(), is(1L)); + assertThat(q.unreadCount, is(1L)); } } @@ -443,9 +442,7 @@ public class QueueTest { assertThat(q.isFull(), is(false)); // we expect this next write call to block so let's wrap it in a Future - Callable write = () -> { - return q.write(element); - }; + Callable write = () -> q.write(element); ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(write); @@ -523,9 +520,7 @@ public class QueueTest { Batch b = q.readBatch(10); // we expect this next write call to block so let's wrap it in a Future - Callable write = () -> { - return q.write(element); - }; + Callable write = () -> q.write(element); ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(write); assertThat(future.isDone(), is(false)); @@ -537,7 +532,7 @@ public class QueueTest { b.close(); // purge 1 page - assertThat(future.get(), is(equalTo(ELEMENT_COUNT + 1L))); + assertThat(future.get(), is(ELEMENT_COUNT + 1L)); executor.shutdown(); } @@ -563,9 +558,7 @@ public class QueueTest { assertThat(q.isFull(), is(false)); // we expect this next write call to block so let's wrap it in a Future - Callable write = () -> { - return q.write(element); - }; + Callable write = () -> q.write(element); ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(write); @@ -599,7 +592,7 @@ public class QueueTest { element3 = new StringElement("third"); firstSeqNum = q.write(element1); b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); + assertThat(b.getElements().size(), is(1)); } long secondSeqNum; @@ -611,13 +604,13 @@ public class QueueTest { thirdSeqNum = q.write(element3); b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); - assertThat(b.getElements().get(0), is(equalTo(element1))); + assertThat(b.getElements().size(), is(1)); + assertThat(b.getElements().get(0), is(element1)); b = q.nonBlockReadBatch(2); - assertThat(b.getElements().size(), is(equalTo(2))); - assertThat(b.getElements().get(0), is(equalTo(element2))); - assertThat(b.getElements().get(1), is(equalTo(element3))); + assertThat(b.getElements().size(), is(2)); + assertThat(b.getElements().get(0), is(element2)); + assertThat(b.getElements().get(1), is(element3)); q.ack(Collections.singletonList(firstSeqNum)); } @@ -626,7 +619,7 @@ public class QueueTest { q.open(); b = q.nonBlockReadBatch(2); - assertThat(b.getElements().size(), is(equalTo(2))); + assertThat(b.getElements().size(), is(2)); q.ack(Arrays.asList(secondSeqNum, thirdSeqNum)); @@ -673,7 +666,7 @@ public class QueueTest { for (Future future : futures) { int result = future.get(); - assertThat(result, is(equalTo(ELEMENT_COUNT))); + assertThat(result, is(ELEMENT_COUNT)); } assertThat(q.getTailPages().isEmpty(), is(true)); @@ -694,12 +687,12 @@ public class QueueTest { Batch b; q.write(element); b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); + assertThat(b.getElements().size(), is(1)); b.close(); q.write(element); b = q.nonBlockReadBatch(1); - assertThat(b.getElements().size(), is(equalTo(1))); + assertThat(b.getElements().size(), is(1)); b.close(); // head page should be full and fully acked @@ -713,8 +706,8 @@ public class QueueTest { // since head page was fully acked it should not have created a new tail page assertThat(q.getTailPages().isEmpty(), is(true)); - assertThat(q.getHeadPage().getPageNum(), is(equalTo(1))); - assertThat(q.firstUnackedPageNum(), is(equalTo(1))); + assertThat(q.getHeadPage().getPageNum(), is(1)); + assertThat(q.firstUnackedPageNum(), is(1)); assertThat(q.isFullyAcked(), is(false)); } } @@ -723,7 +716,7 @@ public class QueueTest { public void getsPersistedByteSizeCorrectlyForUnopened() throws Exception { Settings settings = TestSettings.persistedQueueSettings(100, dataPath); try (Queue q = new Queue(settings)) { - assertThat(q.getPersistedByteSize(), is(equalTo(0L))); + assertThat(q.getPersistedByteSize(), is(0L)); } } }