Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 22:57:16 -04:00)
maximize PQ read batch size by using same read timeout semantic as MQ #8702
Squashed commits:
- add from another LongVector
- use timed-out read batch and add test for maximization
- WIP maximized batch read size
- fixes and cleanups
- cleanups
- use the boolean returned by await to assess timeout
- remove useless constant
- use Callable<Void> and cosmetic fix
Parent: d3c33e7926
Commit: e75eca4483
9 changed files with 288 additions and 151 deletions
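For context, a minimal consumer sketch of the timed read API this commit introduces (the method names match the diff below; the queue setup and the sink callback are illustrative assumptions, not part of the commit):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    import org.logstash.ackedqueue.Batch;
    import org.logstash.ackedqueue.Queue;
    import org.logstash.ackedqueue.Queueable;

    // Sketch: drain a queue with the new readBatch(limit, timeout). The timed
    // variant waits up to the timeout for writes to fill the head page
    // (maximizing the batch) and returns null when nothing became readable.
    final class TimedReadSketch {
        static void drain(Queue q, Consumer<Queueable> sink) throws IOException {
            while (!Thread.currentThread().isInterrupted()) {
                Batch b = q.readBatch(128, TimeUnit.SECONDS.toMillis(1));
                if (b == null) {
                    continue; // timed out with no elements
                }
                try {
                    for (Queueable e : b.getElements()) {
                        sink.accept(e);
                    }
                } finally {
                    b.close(); // closing a batch acks its events
                }
            }
        }
    }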
@@ -100,7 +100,7 @@ public class QueueRWBenchmark {
             }
         });
         for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
-            try (Batch batch = queuePersisted.readBatch(BATCH_SIZE)) {
+            try (Batch batch = queuePersisted.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1))) {
                 for (final Queueable elem : batch.getElements()) {
                     blackhole.consume(elem);
                 }
@@ -122,7 +122,7 @@ public class QueueRWBenchmark {
             }
         });
         for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
-            try (Batch batch = queueMemory.readBatch(BATCH_SIZE)) {
+            try (Batch batch = queueMemory.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1))) {
                 for (final Queueable elem : batch.getElements()) {
                     blackhole.consume(elem);
                 }
@@ -15,17 +15,17 @@ public class Batch implements Closeable {
     private final Queue queue;
     private final AtomicBoolean closed;

-    public Batch(List<Queueable> elements, LongVector seqNums, Queue q) {
-        this.elements = elements;
+    public Batch(SequencedList<byte[]> serialized, Queue q) {
+        this(serialized.getElements(), serialized.getSeqNums(), q);
+    }
+
+    public Batch(List<byte[]> elements, LongVector seqNums, Queue q) {
+        this.elements = deserializeElements(elements, q);
         this.seqNums = seqNums;
         this.queue = q;
         this.closed = new AtomicBoolean(false);
     }

-    public Batch(SequencedList<byte[]> serialized, Queue q) {
-        this(deserializeElements(serialized.getElements(), q), serialized.getSeqNums(), q);
-    }
-
     // close acks the batch ackable events
     public void close() throws IOException {
         if (closed.getAndSet(true) == false) {
@@ -202,21 +202,24 @@ public final class Page implements Closeable {
     }

     public void behead() throws IOException {
-        checkpoint();
         assert this.writable == true : "cannot behead a tail page";

+        headPageCheckpoint();
+
         this.writable = false;
         this.lastCheckpoint = new Checkpoint(0, 0, 0, 0, 0);

         // first thing that must be done after beheading is to create a new checkpoint for that new tail page
         // tail page checkpoint does NOT includes a fsync
-        checkpoint();
+        tailPageCheckpoint();
     }

-        // TODO: should we have a better deactivation strategy to avoid too rapid reactivation scenario?
-        Page firstUnreadPage = queue.firstUnreadPage();
-        if (firstUnreadPage == null || (this.getPageNum() > firstUnreadPage.getPageNum())) {
-            // deactivate if this new tailPage is not where the read is occurring
-            this.getPageIO().deactivate();
-        }
+    /**
+     * signal that this page is not active and resources can be released
+     * @throws IOException
+     */
+    public void deactivate() throws IOException {
+        this.getPageIO().deactivate();
+    }

     public boolean hasSpace(int byteSize) {
@@ -25,14 +25,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;

-// TODO: Notes
-//
-// - time-based fsync
-//
-// - tragic errors handling
-//   - what errors cause whole queue to be broken
-//   - where to put try/catch for these errors
-
 public final class Queue implements Closeable {

     private long seqNum;
@@ -147,9 +139,10 @@ public final class Queue implements Closeable {
         return this.unreadCount;
     }

-    // moved queue opening logic in open() method until we have something in place to used in-memory checkpoints for testing
-    // because for now we need to pass a Queue instance to the Page and we don't want to trigger a Queue recovery when
-    // testing Page
+    /**
+     * Open an existing {@link Queue} or create a new one in the configured path.
+     * @throws IOException
+     */
     public void open() throws IOException {
         final int headPageNum;

@@ -308,8 +301,12 @@ public final class Queue implements Closeable {
             page.getPageIO().deactivate();
         }

-    // create a new empty headpage for the given pageNum and immediately checkpoint it
-    // @param pageNum the page number of the new head page
+    /**
+     * create a new empty headpage for the given pageNum and immediately checkpoint it
+     *
+     * @param pageNum the page number of the new head page
+     * @throws IOException
+     */
     private void newCheckpointedHeadpage(int pageNum) throws IOException {
         PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
         headPageIO.create();
@@ -318,8 +315,14 @@ public final class Queue implements Closeable {
         this.currentByteSize += headPageIO.getCapacity();
     }

-    // @param element the Queueable object to write to the queue
-    // @return long written sequence number
+    /**
+     * write a {@link Queueable} element to the queue. Note that the element will always be written and the queue full
+     * condition will be checked and waited on **after** the write operation.
+     *
+     * @param element the {@link Queueable} element to write
+     * @return the written sequence number
+     * @throws IOException
+     */
     public long write(Queueable element) throws IOException {
         byte[] data = element.serialize();

@@ -344,18 +347,13 @@ public final class Queue implements Closeable {
         int newHeadPageNum = this.headPage.pageNum + 1;

         if (this.headPage.isFullyAcked()) {
             // purge the old headPage because its full and fully acked
-            // there is no checkpoint file to purge since just creating a new TailPage from a HeadPage does
-            // not trigger a checkpoint creation in itself
+            // here we can just purge the data file and avoid beheading since we do not need
+            // to add this fully acked page into tailPages. a new head page will just be created.
             // TODO: we could possibly reuse the same page file but just rename it?
             this.headPage.purge();
             currentByteSize -= this.headPage.getPageIO().getCapacity();
         } else {
-            // beheading includes checkpoint+fsync if required
-            this.headPage.behead();
-            this.tailPages.add(this.headPage);
-            if (! this.headPage.isFullyRead()) {
-                this.unreadTailPages.add(this.headPage);
-            }
+            behead();
         }

         // create new head page
@@ -395,6 +393,29 @@ public final class Queue implements Closeable {
         }
     }

+    /**
+     * mark head page as read-only (behead) and add it to the tailPages and unreadTailPages collections accordingly
+     * also deactivate it if it's not next-in-line for reading
+     *
+     * @throws IOException
+     */
+    private void behead() throws IOException {
+        // beheading includes checkpoint+fsync if required
+        this.headPage.behead();
+        this.tailPages.add(this.headPage);
+
+        if (! this.headPage.isFullyRead()) {
+            if (!this.unreadTailPages.isEmpty()) {
+                // there are already other unread pages so this new one is not next in line and we can deactivate
+                this.headPage.deactivate();
+            }
+            this.unreadTailPages.add(this.headPage);
+        } else {
+            // it is fully read so we can deactivate
+            this.headPage.deactivate();
+        }
+    }
+
     /**
      * <p>Checks if the Queue is full, with "full" defined as either of:</p>
      * <p>Assuming a maximum size of the queue larger than 0 is defined:</p>
@@ -445,7 +466,9 @@ public final class Queue implements Closeable {

     }

-    // @return true if the queue is fully acked, which implies that it is fully read which works as an "empty" state.
+    /**
+     * @return true if the queue is fully acked, which implies that it is fully read which works as an "empty" state.
+     */
     public boolean isFullyAcked() {
         lock.lock();
         try {
@@ -455,7 +478,12 @@ public final class Queue implements Closeable {
         }
     }

-    // @param seqNum the element sequence number upper bound for which persistence should be guaranteed (by fsync'ing)
+    /**
+     * guarantee persistence up to a given sequence number.
+     *
+     * @param seqNum the element sequence number upper bound for which persistence should be guaranteed (by fsync'ing)
+     * @throws IOException
+     */
     public void ensurePersistedUpto(long seqNum) throws IOException {
         lock.lock();
         try {
@@ -465,100 +493,100 @@ public final class Queue implements Closeable {
         }
     }

-    // non-blockin queue read
-    // @param limit read the next bach of size up to this limit. the returned batch size can be smaller than than the requested limit if fewer elements are available
-    // @return Batch the batch containing 1 or more element up to the required limit or null of no elements were available
+    /**
+     * non-blocking queue read
+     *
+     * @param limit read the next batch of size up to this limit. the returned batch size can be smaller than the requested limit if fewer elements are available
+     * @return {@link Batch} the batch containing 1 or more element up to the required limit or null if no elements were available
+     * @throws IOException
+     */
     public Batch nonBlockReadBatch(int limit) throws IOException {
         lock.lock();
         try {
-            Page p = firstUnreadPage();
-            return (p == null) ? null : _readPageBatch(p, limit);
+            Page p = nextReadPage();
+            return (isHeadPage(p) && p.isFullyRead()) ? null : _readPageBatch(p, limit, 0L);
         } finally {
             lock.unlock();
         }
     }

-    // blocking readBatch notes:
-    // the queue close() notifies all pending blocking read so that they unblock if the queue is being closed.
-    // this means that all blocking read methods need to verify for the queue close condition.
-    //
-    // blocking queue read until elements are available for read
-    // @param limit read the next bach of size up to this limit. the returned batch size can be smaller than than the requested limit if fewer elements are available
-    // @return Batch the batch containing 1 or more element up to the required limit or null if no elements were available or the blocking call was interrupted
-    public Batch readBatch(int limit) throws IOException {
-        Page p;
-
-        lock.lock();
-        try {
-            while ((p = firstUnreadPage()) == null && !isClosed()) {
-                try {
-                    notEmpty.await();
-                } catch (InterruptedException e) {
-                    // the thread interrupt() has been called while in the await() blocking call.
-                    // at this point the interrupted flag is reset and Thread.interrupted() will return false
-                    // to any upstream calls on it. for now our choice is to simply return null and set back
-                    // the Thread.interrupted() flag so it can be checked upstream.
-
-                    // set back the interrupted flag
-                    Thread.currentThread().interrupt();
-
-                    return null;
-                }
-            }
-
-            // need to check for close since it is a condition for exiting the while loop
-            return (isClosed()) ? null : _readPageBatch(p, limit);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // blocking queue read until elements are available for read or the given timeout is reached.
-    // @param limit read the next batch of size up to this limit. the returned batch size can be smaller than than the requested limit if fewer elements are available
-    // @param timeout the maximum time to wait in milliseconds
-    // @return Batch the batch containing 1 or more element up to the required limit or null if no elements were available or the blocking call was interrupted
+    /**
+     *
+     * @param limit size limit of the batch to read. returned {@link Batch} can be smaller.
+     * @param timeout the maximum time to wait in milliseconds on write operations
+     * @return the read {@link Batch} or null if no element upon timeout
+     * @throws IOException
+     */
     public Batch readBatch(int limit, long timeout) throws IOException {
-        Page p;
-
         lock.lock();
         try {
-            // wait only if queue is empty
-            if ((p = firstUnreadPage()) == null) {
-                try {
-                    notEmpty.await(timeout, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    // the thread interrupt() has been called while in the await() blocking call.
-                    // at this point the interrupted flag is reset and Thread.interrupted() will return false
-                    // to any upstream calls on it. for now our choice is to simply return null and set back
-                    // the Thread.interrupted() flag so it can be checked upstream.
-
-                    // set back the interrupted flag
-                    Thread.currentThread().interrupt();
-
-                    return null;
-                }
-
-                // if after returning from wait queue is still empty, or the queue was closed return null
-                if ((p = firstUnreadPage()) == null || isClosed()) { return null; }
-            }
-
-            return _readPageBatch(p, limit);
+            return _readPageBatch(nextReadPage(), limit, timeout);
         } finally {
             lock.unlock();
         }
     }

-    private Batch _readPageBatch(Page p, int limit) throws IOException {
-        boolean wasFull = isFull();
-
-        SequencedList<byte[]> serialized = p.read(limit);
-
-        this.unreadCount -= serialized.getElements().size();
-
-        if (p.isFullyRead()) { removeUnreadPage(p); }
-        if (wasFull) { notFull.signalAll(); }
-
-        return new Batch(serialized, this);
+    /**
+     * read a {@link Batch} from the given {@link Page}. If the page is a head page, try to maximize the
+     * batch size by waiting for writes.
+     * @param p the {@link Page} to read from.
+     * @param limit size limit of the batch to read.
+     * @param timeout the maximum time to wait in milliseconds on write operations.
+     * @return {@link Batch} with read elements or null if nothing was read
+     * @throws IOException
+     */
+    private Batch _readPageBatch(Page p, int limit, long timeout) throws IOException {
+        int left = limit;
+        final List<byte[]> elements = new ArrayList<>(limit);
+        final LongVector seqNums = new LongVector(limit);
+
+        // NOTE: the tricky thing here is that upon entering this method, if p is initially a head page
+        // it could become a tail page upon returning from the notEmpty.await call.
+
+        do {
+            if (isHeadPage(p) && p.isFullyRead()) {
+                boolean elapsed;
+                // a head page is fully read but can be written to so let's wait for more data
+                try {
+                    elapsed = !notEmpty.await(timeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    // set back the interrupted flag
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+
+                if ((elapsed && p.isFullyRead()) || isClosed()) {
+                    break;
+                }
+            }
+
+            if (! p.isFullyRead()) {
+                boolean wasFull = isFull();
+
+                final SequencedList<byte[]> serialized = p.read(left);
+                int n = serialized.getElements().size();
+                assert n > 0 : "page read returned 0 elements";
+                elements.addAll(serialized.getElements());
+                seqNums.add(serialized.getSeqNums());
+
+                this.unreadCount -= n;
+                left -= n;
+
+                if (wasFull) {
+                    notFull.signalAll();
+                }
+            }
+
+            if (isTailPage(p) && p.isFullyRead()) {
+                break;
+            }
+        } while (left > 0);
+
+        if (isTailPage(p) && p.isFullyRead()) {
+            removeUnreadPage(p);
+        }
+
+        return (left >= limit) ? null : new Batch(elements, seqNums, this);
     }

     private static class TailPageResult {
@@ -571,7 +599,12 @@ public final class Queue implements Closeable {
         }
     }

-    // perform a binary search through tail pages to find in which page this seqNum falls into
+    /**
+     * perform a binary search through tail pages to find in which page this seqNum falls into
+     *
+     * @param seqNum the sequence number to search for in the tail pages
+     * @return {@link TailPageResult}
+     */
     private TailPageResult binaryFindPageForSeqnum(long seqNum) {
         int lo = 0;
         int hi = this.tailPages.size() - 1;
@@ -590,7 +623,12 @@ public final class Queue implements Closeable {
         return null;
     }

-    // perform a linear search through tail pages to find in which page this seqNum falls into
+    /**
+     * perform a linear search through tail pages to find in which page this seqNum falls into
+     *
+     * @param seqNum the sequence number to search for in the tail pages
+     * @return {@link TailPageResult}
+     */
     private TailPageResult linearFindPageForSeqnum(long seqNum) {
         for (int i = 0; i < this.tailPages.size(); i++) {
             Page p = this.tailPages.get(i);
@@ -601,10 +639,14 @@ public final class Queue implements Closeable {
         return null;
     }

-    // ack a list of seqNums that are assumed to be all part of the same page, leveraging the fact that batches are also created from
-    // same-page elements. A fully acked page will trigger a checkpoint for that page. Also if a page has more than checkpointMaxAcks
-    // acks since last checkpoint it will also trigger a checkpoint.
-    // @param seqNums the list of same-page sequence numbers to ack
+    /**
+     * ack a list of seqNums that are assumed to be all part of the same page, leveraging the fact that batches are also created from
+     * same-page elements. A fully acked page will trigger a checkpoint for that page. Also if a page has more than checkpointMaxAcks
+     * acks since last checkpoint it will also trigger a checkpoint.
+     *
+     * @param seqNums the list of same-page sequence numbers to ack
+     * @throws IOException
+     */
     public void ack(LongVector seqNums) throws IOException {
         // as a first implementation we assume that all batches are created from the same page
         // so we will avoid multi pages acking here for now
@@ -677,10 +719,13 @@ public final class Queue implements Closeable {
     public CheckpointIO getCheckpointIO() {
         return this.checkpointIO;
     }

-    // deserialize a byte array into the required element class.
-    // @param bytes the byte array to deserialize
-    // @return Queueable the deserialized byte array into the required Queueable interface implementation concrete class
-
+    /**
+     * deserialize a byte array into the required element class.
+     *
+     * @param bytes the byte array to deserialize
+     * @return {@link Queueable} the deserialized byte array into the required Queueable interface implementation concrete class
+     */
     public Queueable deserialize(byte[] bytes) {
         try {
             return (Queueable)this.deserializeMethod.invoke(this.elementClass, bytes);
@@ -729,22 +774,28 @@ public final class Queue implements Closeable {
         }
     }

-    public Page firstUnreadPage() {
+    /**
+     * return the {@link Page} for the next read operation.
+     * @return {@link Page} will be either a read-only tail page or the head page.
+     */
+    public Page nextReadPage() {
         lock.lock();
         try {
-            // look at head page if no unreadTailPages
-            return (this.unreadTailPages.isEmpty()) ? (this.headPage.isFullyRead() ? null : this.headPage) : this.unreadTailPages.get(0);
+            return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0);
         } finally {
             lock.unlock();
         }
     }

     private void removeUnreadPage(Page p) {
-        // HeadPage is not part of the unreadTailPages, just ignore
-        if (p != this.headPage) {
-            // the page to remove should always be the first one
-            assert this.unreadTailPages.get(0) == p : String.format("unread page is not first in unreadTailPages list");
-            this.unreadTailPages.remove(0);
+        if (! this.unreadTailPages.isEmpty()) {
+            Page firstUnread = this.unreadTailPages.get(0);
+            assert p.pageNum <= firstUnread.pageNum : String.format("fully read pageNum=%d is greater than first unread pageNum=%d", p.pageNum, firstUnread.pageNum);
+            if (firstUnread == p) {
+                // it is possible that a head page beheaded after reading started was never inserted in the unreadTailPages list
+                this.unreadTailPages.remove(0);
+            }
         }
     }

@@ -786,4 +837,20 @@ public final class Queue implements Closeable {
     private boolean isClosed() {
         return this.closed.get();
     }

+    /**
+     * @param p the {@link Page} to verify if it is the head page
+     * @return true if the given {@link Page} is the head page
+     */
+    private boolean isHeadPage(Page p) {
+        return p == this.headPage;
+    }
+
+    /**
+     * @param p the {@link Page} to verify if it is a tail page
+     * @return true if the given {@link Page} is a tail page
+     */
+    private boolean isTailPage(Page p) {
+        return !isHeadPage(p);
+    }
 }
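To make the new read path easier to follow, here is a simplified restatement of _readPageBatch's control flow. This is a sketch only: the PageLike interface and awaitWrite are hypothetical stand-ins, and the real method additionally handles locking, seqNums, unread-page bookkeeping, and notFull signaling as shown in the diff above.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for the real Page, just enough for this sketch.
    interface PageLike {
        boolean isHead();                // head page: still writable
        boolean isFullyRead();
        List<byte[]> read(int limit);    // reads up to limit elements
        boolean awaitWrite(long timeoutMs) throws InterruptedException; // false on timeout
    }

    final class ReadPageBatchSketch {
        // Accumulate up to `limit` elements, waiting (bounded by `timeoutMs`
        // per wait) for more writes while the page is a fully-read head page.
        static List<byte[]> readMaximized(PageLike p, int limit, long timeoutMs) {
            final List<byte[]> elements = new ArrayList<>(limit);
            int left = limit;
            do {
                if (p.isHead() && p.isFullyRead()) {
                    // head page exhausted but still writable: wait for more data
                    try {
                        boolean elapsed = !p.awaitWrite(timeoutMs);
                        if (elapsed && p.isFullyRead()) { break; } // timed out, nothing new
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                if (!p.isFullyRead()) {
                    List<byte[]> chunk = p.read(left);
                    elements.addAll(chunk);
                    left -= chunk.size();
                }
                if (!p.isHead() && p.isFullyRead()) { break; } // tail pages cannot grow
            } while (left > 0);
            return elements; // an empty result maps to the real method returning null
        }
    }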
@@ -24,6 +24,22 @@ public final class LongVector {
         data[count++] = num;
     }

+    /**
+     * Store the values of a given {@link LongVector} to the underlying {@code long[]}, resizing it if necessary.
+     * @param nums {@link LongVector} to store
+     */
+    public void add(final LongVector nums) {
+        if (data.length < count + nums.size()) {
+            final long[] old = data;
+            data = new long[(data.length << 1) + nums.size()];
+            System.arraycopy(old, 0, data, 0, old.length);
+        }
+        for (int i = 0; i < nums.size(); i++) {
+            data[count + i] = nums.get(i);
+        }
+        count += nums.size();
+    }
+
     /**
      * Get value stored at given index.
      * @param index Array index (only values smaller than {@link LongVector#count} are valid)
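A small usage sketch of the new bulk add (the package path for LongVector is an assumption based on the surrounding imports; the LongVectorTest hunk further down exercises the same behavior at larger scale):

    import org.logstash.ackedqueue.io.LongVector; // package path assumed

    final class LongVectorAddSketch {
        public static void main(String[] args) {
            final LongVector a = new LongVector(2);
            a.add(1L);
            a.add(2L);
            final LongVector b = new LongVector(2);
            b.add(3L);
            b.add(4L);
            a.add(b); // bulk append; grows the backing array when capacity is exceeded
            assert a.size() == 4;
            assert a.get(3) == 4L;
        }
    }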
@@ -1,6 +1,8 @@
 package org.logstash.ackedqueue;

 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 import org.logstash.ackedqueue.io.PageIO;

@@ -84,7 +86,7 @@ public class HeadPageTest {
         assertThat(p.isEmpty(), is(true));
         p.write(element.serialize(), 1, 1);
         assertThat(p.isEmpty(), is(false));
-        Batch b = q.readBatch(1);
+        Batch b = q.readBatch(1, TimeUnit.SECONDS.toMillis(1));
         assertThat(p.isEmpty(), is(false));
         b.close();
         assertThat(p.isEmpty(), is(true));
@@ -535,7 +535,7 @@ public class QueueTest {
         }
         assertThat(q.isFull(), is(true));

-        Batch b = q.readBatch(10); // read 1 page (10 events)
+        Batch b = q.readBatch(10, TimeUnit.SECONDS.toMillis(1)); // read 1 page (10 events)
         b.close(); // purge 1 page

         while (q.isFull()) { Thread.sleep(10); }
@@ -565,7 +565,7 @@ public class QueueTest {

         // read 1 page (10 events) here while not full yet so that the read will not signal the not full state
         // we want the batch closing below to signal the not full state
-        Batch b = q.readBatch(10);
+        Batch b = q.readBatch(10, TimeUnit.SECONDS.toMillis(1));

         // we expect this next write call to block so let's wrap it in a Future
         Future<Long> future = executor.submit(() -> q.write(element));
@@ -608,7 +608,7 @@ public class QueueTest {
         }
         assertThat(q.isFull(), is(true));

-        Batch b = q.readBatch(9); // read 90% of page (9 events)
+        Batch b = q.readBatch(9, TimeUnit.SECONDS.toMillis(1)); // read 90% of page (9 events)
         b.close(); // this should not purge a page

         assertThat(q.isFull(), is(true)); // queue should still be full
@@ -714,7 +714,7 @@ public class QueueTest {
         int read_count = 0;

         while (read_count < ELEMENT_COUNT * WRITER_COUNT) {
-            Batch b = q.readBatch(BATCH_SIZE);
+            Batch b = q.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1));
             read_count += b.size();
             b.close();
         }
@@ -786,13 +786,13 @@ public class QueueTest {
         q.open();

         q.write(element);
-        Batch b1 = q.readBatch(2);
+        Batch b1 = q.readBatch(2, TimeUnit.SECONDS.toMillis(1));
         q.write(element);
-        Batch b2 = q.readBatch(2);
+        Batch b2 = q.readBatch(2, TimeUnit.SECONDS.toMillis(1));
         q.write(element);
-        Batch b3 = q.readBatch(2);
+        Batch b3 = q.readBatch(2, TimeUnit.SECONDS.toMillis(1));
         q.write(element);
-        Batch b4 = q.readBatch(2);
+        Batch b4 = q.readBatch(2, TimeUnit.SECONDS.toMillis(1));

         assertThat(q.tailPages.size(), is(3));
         assertThat(q.getPersistedByteSize() > 0, is(true));
@@ -825,10 +825,12 @@ public class QueueTest {
             int i = 0;
             try {
                 while (i < count / concurrent) {
-                    final Batch batch = queue.readBatch(1);
-                    for (final Queueable elem : batch.getElements()) {
-                        if (elem != null) {
-                            ++i;
+                    final Batch batch = queue.readBatch(1, TimeUnit.SECONDS.toMillis(1));
+                    if (batch != null) {
+                        for (final Queueable elem : batch.getElements()) {
+                            if (elem != null) {
+                                ++i;
+                            }
                         }
                     }
                 }
@@ -868,7 +870,7 @@ public class QueueTest {
         q.write(new StringElement("foobarbaz"));
         assertThat(q.isEmpty(), is(false));

-        Batch b = q.readBatch(1);
+        Batch b = q.readBatch(1, TimeUnit.SECONDS.toMillis(1));
         assertThat(q.isEmpty(), is(false));

         b.close();
@@ -953,16 +955,43 @@ public class QueueTest {
             assertThat(q.headPage.getPageIO().getCapacity(), is(NEW_CAPACITY));

             // will read only within a page boundary
-            Batch b1 = q.readBatch(10);
+            Batch b1 = q.readBatch(10, TimeUnit.SECONDS.toMillis(1));
             assertThat(b1.size(), is(1));
             b1.close();

             // will read only within a page boundary
-            Batch b2 = q.readBatch(10);
+            Batch b2 = q.readBatch(10, TimeUnit.SECONDS.toMillis(1));
             assertThat(b2.size(), is(1));
             b2.close();

             assertThat(q.tailPages.size(), is(0));
         }
     }

+    @Test(timeout = 5000)
+    public void maximizeBatch() throws IOException, InterruptedException, ExecutionException {
+
+        // very small pages to maximize page creation
+        Settings settings = TestSettings.persistedQueueSettings(1000, dataPath);
+        try (Queue q = new Queue(settings)) {
+            q.open();
+
+            Callable<Void> writer = () -> {
+                Thread.sleep(500); // sleep 500 ms
+                q.write(new StringElement("E2"));
+                return null;
+            };
+
+            // write one element now and schedule the 2nd write in 500ms
+            q.write(new StringElement("E1"));
+            executor.submit(writer);
+
+            // issue a batch read with a 1s timeout, the batch should contain both elements since
+            // the timeout is greater than the 2nd write delay
+            Batch b = q.readBatch(10, TimeUnit.SECONDS.toMillis(1));

+            assertThat(b.size(), is(2));
+        }
+    }
 }
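One subtlety suggested by the _readPageBatch loop above: the timeout bounds each individual wait on the head page, not the call as a whole, so a trickle of writes can keep extending the read while still maximizing the batch. A hypothetical variant of the maximizeBatch test illustrating this (TrickleSketch, its run helper, and the timings are assumptions for the example; settings/path setup is elided as in the real test):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.logstash.ackedqueue.Batch;
    import org.logstash.ackedqueue.Queue;
    import org.logstash.ackedqueue.StringElement;

    // Two delayed writes, each landing before the current bounded wait elapses,
    // are expected to end up in one batch even though the call runs past 1s total.
    final class TrickleSketch {
        static void run(Queue q) throws IOException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(() -> {
                Thread.sleep(400);
                q.write(new StringElement("E1"));
                Thread.sleep(500); // lands at ~900ms, inside the second wait window
                q.write(new StringElement("E2"));
                return null;
            });
            Batch b = q.readBatch(10, TimeUnit.SECONDS.toMillis(1));
            // expected: b.size() == 2 -- each arriving write wakes the bounded
            // wait and restarts it, so the read absorbs both elements
            executor.shutdown();
        }
    }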
@@ -19,4 +19,22 @@ public class LongVectorTest {
             assertThat((long) i, is(vector.get(i)));
         }
     }
+
+    @Test
+    public void storesVectorAndResizes() {
+        final int count = 1000;
+        final LongVector vector1 = new LongVector(count);
+        for (long i = 0L; i < count; ++i) {
+            vector1.add(i);
+        }
+        final LongVector vector2 = new LongVector(count);
+        for (long i = 0L + count; i < 2 * count; ++i) {
+            vector2.add(i);
+        }
+        vector1.add(vector2);
+        assertThat(vector1.size(), is(2 * count));
+        for (int i = 0; i < 2 * count; ++i) {
+            assertThat((long) i, is(vector1.get(i)));
+        }
+    }
 }
@@ -8,6 +8,8 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.logstash.ackedqueue.Batch;
 import org.logstash.ackedqueue.SettingsImpl;
 import org.logstash.ackedqueue.Queue;
@@ -75,7 +77,7 @@ public class Concurrent {

             try {
                 while (consumedCount < ELEMENT_COUNT) {
-                    Batch b = q.readBatch(BATCH_SIZE);
+                    Batch b = q.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1));
 //                    if (b.getElements().size() < BATCH_SIZE) {
 //                        System.out.println("read small batch=" + b.getElements().size());
 //                    } else {
@@ -129,7 +131,7 @@ public class Concurrent {
             consumers.add(new Thread(() -> {
                 try {
                     while (output.size() < ELEMENT_COUNT) {
-                        Batch b = q.readBatch(BATCH_SIZE);
+                        Batch b = q.readBatch(BATCH_SIZE, TimeUnit.SECONDS.toMillis(1));
 //                        if (b.getElements().size() < BATCH_SIZE) {
 //                            System.out.println("read small batch=" + b.getElements().size());
 //                        } else {