fix PQ write concurrency issues and lingering files

move nextSeqNum call under mutex lock to prevent seqNum race condition

fully acked head page beheading should not create a new tail page

explicit purge required to clean physical files

correctly remove preserved checkpoints

small review changes
This commit is contained in:
Colin Surprenant 2017-04-10 16:59:30 -07:00
parent a159ac2db4
commit 4edf17c378
2 changed files with 125 additions and 14 deletions

View file

@ -15,7 +15,10 @@ import java.lang.reflect.Method;
import java.nio.channels.FileLock;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@ -44,6 +47,10 @@ public class Queue implements Closeable {
// reads will simply remove the first page from the list when fully read and writes will append new pages upon beheading
protected final List<TailPage> unreadTailPages;
// checkpoints that were not purged in the acking code to keep contiguous checkpoint files
// regardless of the corresponding data file purge.
protected final Set<Integer> preservedCheckpoints;
protected volatile long unreadCount;
protected volatile long currentByteSize;
@ -98,6 +105,7 @@ public class Queue implements Closeable {
this.elementClass = elementClass;
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();
this.preservedCheckpoints = new HashSet<>();
this.closed = new AtomicBoolean(true); // not yet opened
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
@ -294,7 +302,6 @@ public class Queue implements Closeable {
// @param element the Queueable object to write to the queue
// @return long written sequence number
public long write(Queueable element) throws IOException {
long seqNum = nextSeqNum();
byte[] data = element.serialize();
if (! this.headPage.hasCapacity(data.length)) {
@ -314,18 +321,32 @@ public class Queue implements Closeable {
// create a new head page if the current does not have sufficient space left for data to be written
if (! this.headPage.hasSpace(data.length)) {
// beheading includes checkpoint+fsync if required
TailPage tailPage = this.headPage.behead();
this.tailPages.add(tailPage);
if (! tailPage.isFullyRead()) {
this.unreadTailPages.add(tailPage);
// TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process
int newHeadPageNum = this.headPage.pageNum + 1;
if (this.headPage.isFullyAcked()) {
// purge the old headPage because its full and fully acked
// there is no checkpoint file to purge since just creating a new TailPage from a HeadPage does
// not trigger a checkpoint creation in itself
TailPage tailPage = new TailPage(this.headPage);
tailPage.purge();
} else {
// beheading includes checkpoint+fsync if required
TailPage tailPage = this.headPage.behead();
this.tailPages.add(tailPage);
if (! tailPage.isFullyRead()) {
this.unreadTailPages.add(tailPage);
}
}
// create new head page
newCheckpointedHeadpage(tailPage.pageNum + 1);
newCheckpointedHeadpage(newHeadPageNum);
}
long seqNum = nextSeqNum();
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
this.unreadCount++;
@ -574,16 +595,19 @@ public class Queue implements Closeable {
result.page.purge();
this.currentByteSize -= result.page.getPageIO().getCapacity();
if (result.index == 0) {
if (result.index != 0) {
// this an in-between page, we don't purge it's checkpoint to preserve checkpoints sequence on disk
// save that checkpoint so that if it becomes the first checkpoint it can be purged later on.
this.preservedCheckpoints.add(result.page.getPageNum());
} else {
// if this is the first page also remove checkpoint file
this.checkpointIO.purge(this.checkpointIO.tailFileName(result.page.getPageNum()));
// and see if next "first tail page" is also fully acked and remove checkpoint file
while (this.tailPages.size() > 0) {
TailPage p = this.tailPages.get(0);
if (!p.isFullyAcked()) { break; }
this.checkpointIO.purge(this.checkpointIO.tailFileName(p.getPageNum()));
this.tailPages.remove(0);
// check if there are preserved checkpoints file next to this one and delete them
int nextPageNum = result.page.getPageNum() + 1;
while (preservedCheckpoints.remove(nextPageNum)) {
this.checkpointIO.purge(this.checkpointIO.tailFileName(nextPageNum));
nextPageNum++;
}
}

View file

@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -638,4 +639,90 @@ public class QueueTest {
q.close();
}
@Test(timeout = 5000)
public void concurrentWritesTest() throws IOException, InterruptedException, ExecutionException {
    // Stress the write path with multiple concurrent writers: very small pages
    // maximize page creation/beheading under contention, which is where the
    // seqNum race and head-page concurrency issues would show up.
    Settings settings = TestSettings.volatileQueueSettings(100);
    TestQueue q = new TestQueue(settings);
    q.open();

    final int ELEMENT_COUNT = 10000;
    final int WRITER_COUNT = 5;
    final AtomicInteger element_num = new AtomicInteger(0);

    // each writer task writes ELEMENT_COUNT elements, payloads globally unique
    // via the shared counter
    Callable<Integer> writer = () -> {
        for (int i = 0; i < ELEMENT_COUNT; i++) {
            int n = element_num.getAndIncrement();
            q.write(new StringElement("" + n));
        }
        return ELEMENT_COUNT;
    };

    List<Future<Integer>> futures = new ArrayList<>();
    ExecutorService executor = Executors.newFixedThreadPool(WRITER_COUNT);
    try {
        for (int i = 0; i < WRITER_COUNT; i++) {
            futures.add(executor.submit(writer));
        }

        // concurrently consume and ack everything the writers produce;
        // closing a batch acks its elements
        final int BATCH_SIZE = 10;
        int read_count = 0;
        while (read_count < ELEMENT_COUNT * WRITER_COUNT) {
            Batch b = q.readBatch(BATCH_SIZE);
            read_count += b.size();
            b.close();
        }

        // every writer must have completed all of its writes without error
        for (Future<Integer> future : futures) {
            int result = future.get();
            assertThat(result, is(equalTo(ELEMENT_COUNT)));
        }

        // a fully read + fully acked queue must leave no tail pages behind
        assertThat(q.getTailPages().isEmpty(), is(true));
        assertThat(q.isFullyAcked(), is(true));
    } finally {
        // always release the thread pool and the queue, even if an assertion
        // or future.get() above throws — otherwise the test leaks resources
        executor.shutdown();
        q.close();
    }
}
@Test
public void fullyAckedHeadPageBeheadingTest() throws IOException {
    Queueable element = new StringElement("foobarbaz1");
    int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);

    // size the head page to hold exactly two elements
    TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity));
    q.open();

    // write, read and ack two elements so the head page ends up both full
    // and fully acked (batch close acks the element)
    for (int i = 0; i < 2; i++) {
        q.write(element);
        Batch b = q.nonBlockReadBatch(1);
        assertThat(b.getElements().size(), is(equalTo(1)));
        b.close();
    }

    // sanity: head page is full and fully acked
    assertThat(q.getHeadPage().isFullyAcked(), is(true));
    assertThat(q.getHeadPage().hasSpace(element.serialize().length), is(false));
    assertThat(q.isFullyAcked(), is(true));

    // one more write forces a beheading of the full head page
    q.write(element);

    // a fully acked head page must be purged on beheading, not turned into
    // a tail page; the new head page simply takes the next page number
    assertThat(q.getTailPages().isEmpty(), is(true));
    assertThat(q.getHeadPage().getPageNum(), is(equalTo(1)));
    assertThat(q.firstUnackedPageNum(), is(equalTo(1)));
    assertThat(q.isFullyAcked(), is(false));

    q.close();
}
}