mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
b7f4fd7ebf
commit
9fe4e9b3b7
1 changed files with 109 additions and 116 deletions
|
@ -23,6 +23,7 @@ import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
|||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -41,7 +42,7 @@ public class QueueTest {
|
|||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(10))) {
|
||||
q.open();
|
||||
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
assertThat(q.nonBlockReadBatch(1), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -55,9 +56,9 @@ public class QueueTest {
|
|||
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(1), is(equalTo(null)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0).toString(), is(element.toString()));
|
||||
assertThat(q.nonBlockReadBatch(1), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,9 +72,9 @@ public class QueueTest {
|
|||
|
||||
Batch b = q.nonBlockReadBatch(2);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString())));
|
||||
assertThat(q.nonBlockReadBatch(2), is(equalTo(null)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0).toString(), is(element.toString()));
|
||||
assertThat(q.nonBlockReadBatch(2), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,14 +92,14 @@ public class QueueTest {
|
|||
|
||||
Batch b = q.nonBlockReadBatch(2);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(0).toString())));
|
||||
assertThat(b.getElements().get(1).toString(), is(equalTo(elements.get(1).toString())));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
assertThat(b.getElements().get(0).toString(), is(elements.get(0).toString()));
|
||||
assertThat(b.getElements().get(1).toString(), is(elements.get(1).toString()));
|
||||
|
||||
b = q.nonBlockReadBatch(2);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString())));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0).toString(), is(elements.get(2).toString()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,33 +116,33 @@ public class QueueTest {
|
|||
}
|
||||
|
||||
// total of 2 pages: 1 head and 1 tail
|
||||
assertThat(q.getTailPages().size(), is(equalTo(1)));
|
||||
assertThat(q.getTailPages().size(), is(1));
|
||||
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(false)));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(equalTo(false)));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(false));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(false));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(false));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(false));
|
||||
|
||||
Batch b = q.nonBlockReadBatch(10);
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(1)));
|
||||
assertThat(q.getTailPages().size(), is(1));
|
||||
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(true)));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(equalTo(false)));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(true));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(false));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(false));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(false));
|
||||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(equalTo(true)));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(equalTo(true)));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(q.getTailPages().get(0).isFullyRead(), is(true));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(false));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(true));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(false));
|
||||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
assertThat(b, is(equalTo(null)));
|
||||
assertThat(b, nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,30 +161,30 @@ public class QueueTest {
|
|||
|
||||
Batch b = q.nonBlockReadBatch(10);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(q.getTailPages().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
assertThat(q.getTailPages().size(), is(1));
|
||||
|
||||
// lets keep a ref to that tail page before acking
|
||||
TailPage tailPage = q.getTailPages().get(0);
|
||||
|
||||
assertThat(tailPage.isFullyRead(), is(equalTo(true)));
|
||||
assertThat(tailPage.isFullyRead(), is(true));
|
||||
|
||||
// ack first batch which includes all elements from tailPages
|
||||
b.close();
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
assertThat(tailPage.isFullyRead(), is(equalTo(true)));
|
||||
assertThat(tailPage.isFullyAcked(), is(equalTo(true)));
|
||||
assertThat(q.getTailPages().size(), is(0));
|
||||
assertThat(tailPage.isFullyRead(), is(true));
|
||||
assertThat(tailPage.isFullyAcked(), is(true));
|
||||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(equalTo(true)));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(false)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
assertThat(q.getHeadPage().isFullyRead(), is(true));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(false));
|
||||
|
||||
b.close();
|
||||
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true)));
|
||||
assertThat(q.getHeadPage().isFullyAcked(), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -200,51 +201,51 @@ public class QueueTest {
|
|||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
assertThat(q.getHeadPage().getPageNum(), is(equalTo(0)));
|
||||
assertThat(q.getHeadPage().getPageNum(), is(0));
|
||||
Checkpoint c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getElementCount(), is(equalTo(0)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getPageNum(), is(0));
|
||||
assertThat(c.getElementCount(), is(0));
|
||||
assertThat(c.getMinSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(0));
|
||||
|
||||
for (Queueable e : elements1) {
|
||||
q.write(e);
|
||||
}
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getElementCount(), is(equalTo(0)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getPageNum(), is(0));
|
||||
assertThat(c.getElementCount(), is(0));
|
||||
assertThat(c.getMinSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(0));
|
||||
|
||||
// assertThat(elements1.get(1).getSeqNum(), is(equalTo(2L)));
|
||||
// assertThat(elements1.get(1).getSeqNum(), is(2L));
|
||||
q.ensurePersistedUpto(2);
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getElementCount(), is(equalTo(2)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(1L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(1L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getPageNum(), is(0));
|
||||
assertThat(c.getElementCount(), is(2));
|
||||
assertThat(c.getMinSeqNum(), is(1L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(1L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(0));
|
||||
|
||||
for (Queueable e : elements2) {
|
||||
q.write(e);
|
||||
}
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(1)));
|
||||
assertThat(c.getElementCount(), is(equalTo(0)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(0L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getPageNum(), is(1));
|
||||
assertThat(c.getElementCount(), is(0));
|
||||
assertThat(c.getMinSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(0L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(0));
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.0");
|
||||
assertThat(c.getPageNum(), is(equalTo(0)));
|
||||
assertThat(c.getElementCount(), is(equalTo(2)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(1L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(1L)));
|
||||
assertThat(c.getPageNum(), is(0));
|
||||
assertThat(c.getElementCount(), is(2));
|
||||
assertThat(c.getMinSeqNum(), is(1L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(1L));
|
||||
|
||||
Batch b = q.nonBlockReadBatch(10);
|
||||
b.close();
|
||||
|
@ -257,21 +258,21 @@ public class QueueTest {
|
|||
}
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(1)));
|
||||
assertThat(c.getElementCount(), is(equalTo(2)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(3L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(3L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(1)));
|
||||
assertThat(c.getPageNum(), is(1));
|
||||
assertThat(c.getElementCount(), is(2));
|
||||
assertThat(c.getMinSeqNum(), is(3L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(3L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(1));
|
||||
|
||||
b = q.nonBlockReadBatch(10);
|
||||
b.close();
|
||||
|
||||
c = q.getCheckpointIO().read("checkpoint.head");
|
||||
assertThat(c.getPageNum(), is(equalTo(1)));
|
||||
assertThat(c.getElementCount(), is(equalTo(2)));
|
||||
assertThat(c.getMinSeqNum(), is(equalTo(3L)));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L)));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(equalTo(1)));
|
||||
assertThat(c.getPageNum(), is(1));
|
||||
assertThat(c.getElementCount(), is(2));
|
||||
assertThat(c.getMinSeqNum(), is(3L));
|
||||
assertThat(c.getFirstUnackedSeqNum(), is(5L));
|
||||
assertThat(c.getFirstUnackedPageNum(), is(1));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -298,14 +299,14 @@ public class QueueTest {
|
|||
q.write(e);
|
||||
}
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(page_count - 1)));
|
||||
assertThat(q.getTailPages().size(), is(page_count - 1));
|
||||
|
||||
// first read all elements
|
||||
List<Batch> batches = new ArrayList<>();
|
||||
for (Batch b = q.nonBlockReadBatch(1); b != null; b = q.nonBlockReadBatch(1)) {
|
||||
batches.add(b);
|
||||
}
|
||||
assertThat(batches.size(), is(equalTo(page_count)));
|
||||
assertThat(batches.size(), is(page_count));
|
||||
|
||||
// then ack randomly
|
||||
Collections.shuffle(batches);
|
||||
|
@ -313,7 +314,7 @@ public class QueueTest {
|
|||
b.close();
|
||||
}
|
||||
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
assertThat(q.getTailPages().size(), is(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -331,7 +332,7 @@ public class QueueTest {
|
|||
q.open();
|
||||
|
||||
long seqNum = q.write(element);
|
||||
assertThat(seqNum, is(equalTo(1L)));
|
||||
assertThat(seqNum, is(1L));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
int ELEMENT_COUNT = 1000;
|
||||
|
@ -349,22 +350,22 @@ public class QueueTest {
|
|||
// spin wait until data is written and write blocks
|
||||
Thread.sleep(1);
|
||||
}
|
||||
assertThat(q.unreadCount, is(equalTo(2L)));
|
||||
assertThat(q.unreadCount, is(2L));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
// read one element, which will unblock the last write
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
|
||||
// future result is the blocked write seqNum for the second element
|
||||
assertThat(future.get(), is(equalTo(2L + i)));
|
||||
assertThat(future.get(), is(2L + i));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
// since we did not ack and pages hold a single item
|
||||
assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT)));
|
||||
assertThat(q.getTailPages().size(), is(ELEMENT_COUNT));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -383,16 +384,14 @@ public class QueueTest {
|
|||
// perform first non-blocking write
|
||||
long seqNum = q.write(element);
|
||||
|
||||
assertThat(seqNum, is(equalTo(1L)));
|
||||
assertThat(seqNum, is(1L));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
int ELEMENT_COUNT = 1000;
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
@ -403,24 +402,24 @@ public class QueueTest {
|
|||
}
|
||||
// read one element, which will unblock the last write
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
assertThat(b, is(notNullValue()));
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b, notNullValue());
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
b.close();
|
||||
|
||||
// future result is the blocked write seqNum for the second element
|
||||
assertThat(future.get(), is(equalTo(2L + i)));
|
||||
assertThat(future.get(), is(2L + i));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
// all batches are acked, no tail pages should exist
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
assertThat(q.getTailPages().size(), is(0));
|
||||
|
||||
// the last read unblocked the last write so some elements (1 unread and maybe some acked) should be in the head page
|
||||
assertThat(q.getHeadPage().getElementCount() > 0L, is(true));
|
||||
assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L)));
|
||||
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||
assertThat(q.getHeadPage().unreadCount(), is(1L));
|
||||
assertThat(q.unreadCount, is(1L));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -443,9 +442,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
@ -523,9 +520,7 @@ public class QueueTest {
|
|||
Batch b = q.readBatch(10);
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
@ -537,7 +532,7 @@ public class QueueTest {
|
|||
|
||||
b.close(); // purge 1 page
|
||||
|
||||
assertThat(future.get(), is(equalTo(ELEMENT_COUNT + 1L)));
|
||||
assertThat(future.get(), is(ELEMENT_COUNT + 1L));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
@ -563,9 +558,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
@ -599,7 +592,7 @@ public class QueueTest {
|
|||
element3 = new StringElement("third");
|
||||
firstSeqNum = q.write(element1);
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
}
|
||||
|
||||
long secondSeqNum;
|
||||
|
@ -611,13 +604,13 @@ public class QueueTest {
|
|||
thirdSeqNum = q.write(element3);
|
||||
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().get(0), is(equalTo(element1)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0), is(element1));
|
||||
|
||||
b = q.nonBlockReadBatch(2);
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(b.getElements().get(0), is(equalTo(element2)));
|
||||
assertThat(b.getElements().get(1), is(equalTo(element3)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
assertThat(b.getElements().get(0), is(element2));
|
||||
assertThat(b.getElements().get(1), is(element3));
|
||||
|
||||
q.ack(Collections.singletonList(firstSeqNum));
|
||||
}
|
||||
|
@ -626,7 +619,7 @@ public class QueueTest {
|
|||
q.open();
|
||||
|
||||
b = q.nonBlockReadBatch(2);
|
||||
assertThat(b.getElements().size(), is(equalTo(2)));
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
|
||||
q.ack(Arrays.asList(secondSeqNum, thirdSeqNum));
|
||||
|
||||
|
@ -673,7 +666,7 @@ public class QueueTest {
|
|||
|
||||
for (Future<Integer> future : futures) {
|
||||
int result = future.get();
|
||||
assertThat(result, is(equalTo(ELEMENT_COUNT)));
|
||||
assertThat(result, is(ELEMENT_COUNT));
|
||||
}
|
||||
|
||||
assertThat(q.getTailPages().isEmpty(), is(true));
|
||||
|
@ -694,12 +687,12 @@ public class QueueTest {
|
|||
Batch b;
|
||||
q.write(element);
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
b.close();
|
||||
|
||||
q.write(element);
|
||||
b = q.nonBlockReadBatch(1);
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
b.close();
|
||||
|
||||
// head page should be full and fully acked
|
||||
|
@ -713,8 +706,8 @@ public class QueueTest {
|
|||
// since head page was fully acked it should not have created a new tail page
|
||||
|
||||
assertThat(q.getTailPages().isEmpty(), is(true));
|
||||
assertThat(q.getHeadPage().getPageNum(), is(equalTo(1)));
|
||||
assertThat(q.firstUnackedPageNum(), is(equalTo(1)));
|
||||
assertThat(q.getHeadPage().getPageNum(), is(1));
|
||||
assertThat(q.firstUnackedPageNum(), is(1));
|
||||
assertThat(q.isFullyAcked(), is(false));
|
||||
}
|
||||
}
|
||||
|
@ -723,7 +716,7 @@ public class QueueTest {
|
|||
public void getsPersistedByteSizeCorrectlyForUnopened() throws Exception {
|
||||
Settings settings = TestSettings.persistedQueueSettings(100, dataPath);
|
||||
try (Queue q = new Queue(settings)) {
|
||||
assertThat(q.getPersistedByteSize(), is(equalTo(0L)));
|
||||
assertThat(q.getPersistedByteSize(), is(0L));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue