From 4edf17c378ff85cf11dbd4cbe81316b367610c19 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 10 Apr 2017 16:59:30 -0700 Subject: [PATCH] fix PQ write concurrency issues and lingering files ` move nextSeqNum call under mutex lock to prevent seqNum race condition fully acked head page beheading should not create new tail page explicit purge required to clean physical files correctly remove preserved checkpoints small review changes --- .../java/org/logstash/ackedqueue/Queue.java | 52 ++++++++--- .../org/logstash/ackedqueue/QueueTest.java | 87 +++++++++++++++++++ 2 files changed, 125 insertions(+), 14 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index ec1310c2d..03259d916 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -15,7 +15,10 @@ import java.lang.reflect.Method; import java.nio.channels.FileLock; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -44,6 +47,10 @@ public class Queue implements Closeable { // reads will simply remove the first page from the list when fully read and writes will append new pages upon beheading protected final List unreadTailPages; + // checkpoints that were not purged in the acking code to keep contiguous checkpoint files + // regardless of the corresponding data file purge. 
+ protected final Set preservedCheckpoints; + protected volatile long unreadCount; protected volatile long currentByteSize; @@ -98,6 +105,7 @@ public class Queue implements Closeable { this.elementClass = elementClass; this.tailPages = new ArrayList<>(); this.unreadTailPages = new ArrayList<>(); + this.preservedCheckpoints = new HashSet<>(); this.closed = new AtomicBoolean(true); // not yet opened this.maxUnread = maxUnread; this.checkpointMaxAcks = checkpointMaxAcks; @@ -294,7 +302,6 @@ public class Queue implements Closeable { // @param element the Queueable object to write to the queue // @return long written sequence number public long write(Queueable element) throws IOException { - long seqNum = nextSeqNum(); byte[] data = element.serialize(); if (! this.headPage.hasCapacity(data.length)) { @@ -314,18 +321,32 @@ public class Queue implements Closeable { // create a new head page if the current does not have sufficient space left for data to be written if (! this.headPage.hasSpace(data.length)) { - // beheading includes checkpoint+fsync if required - TailPage tailPage = this.headPage.behead(); - this.tailPages.add(tailPage); - if (! tailPage.isFullyRead()) { - this.unreadTailPages.add(tailPage); + // TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process + + int newHeadPageNum = this.headPage.pageNum + 1; + + if (this.headPage.isFullyAcked()) { + // purge the old headPage because it's full and fully acked + // there is no checkpoint file to purge since just creating a new TailPage from a HeadPage does + // not trigger a checkpoint creation in itself + TailPage tailPage = new TailPage(this.headPage); + tailPage.purge(); + } else { + // beheading includes checkpoint+fsync if required + TailPage tailPage = this.headPage.behead(); + + this.tailPages.add(tailPage); + if (! 
tailPage.isFullyRead()) { + this.unreadTailPages.add(tailPage); + } } // create new head page - newCheckpointedHeadpage(tailPage.pageNum + 1); + newCheckpointedHeadpage(newHeadPageNum); } + long seqNum = nextSeqNum(); this.headPage.write(data, seqNum, this.checkpointMaxWrites); this.unreadCount++; @@ -574,16 +595,19 @@ public class Queue implements Closeable { result.page.purge(); this.currentByteSize -= result.page.getPageIO().getCapacity(); - if (result.index == 0) { + if (result.index != 0) { + // this is an in-between page, we don't purge its checkpoint to preserve the checkpoint sequence on disk + // save that checkpoint so that if it becomes the first checkpoint it can be purged later on. + this.preservedCheckpoints.add(result.page.getPageNum()); + } else { // if this is the first page also remove checkpoint file this.checkpointIO.purge(this.checkpointIO.tailFileName(result.page.getPageNum())); - // and see if next "first tail page" is also fully acked and remove checkpoint file - while (this.tailPages.size() > 0) { - TailPage p = this.tailPages.get(0); - if (!p.isFullyAcked()) { break; } - this.checkpointIO.purge(this.checkpointIO.tailFileName(p.getPageNum())); - this.tailPages.remove(0); + // check if there are preserved checkpoint files next to this one and delete them + int nextPageNum = result.page.getPageNum() + 1; + while (preservedCheckpoints.remove(nextPageNum)) { + this.checkpointIO.purge(this.checkpointIO.tailFileName(nextPageNum)); + nextPageNum++; } } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index ee3ee17da..42b40758b 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -638,4 +639,90 @@ public class QueueTest { q.close(); } + @Test(timeout = 5000) + public void concurrentWritesTest() throws IOException, InterruptedException, ExecutionException { + + // very small pages to maximize page creation + Settings settings = TestSettings.volatileQueueSettings(100); + + TestQueue q = new TestQueue(settings); + q.open(); + + int ELEMENT_COUNT = 10000; + int WRITER_COUNT = 5; + AtomicInteger element_num = new AtomicInteger(0); + + // we expect this next write call to block so let's wrap it in a Future + Callable writer = () -> { + for (int i = 0; i < ELEMENT_COUNT; i++) { + int n = element_num.getAndIncrement(); + q.write(new StringElement("" + n)); + } + return ELEMENT_COUNT; + }; + + List> futures = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(WRITER_COUNT); + for (int i = 0; i < WRITER_COUNT; i++) { + futures.add(executor.submit(writer)); + } + + int BATCH_SIZE = 10; + int read_count = 0; + + while (read_count < ELEMENT_COUNT * WRITER_COUNT) { + Batch b = q.readBatch(BATCH_SIZE); + read_count += b.size(); + b.close(); + } + + for (Future future : futures) { + int result = future.get(); + assertThat(result, is(equalTo(ELEMENT_COUNT))); + } + + assertThat(q.getTailPages().isEmpty(), is(true)); + assertThat(q.isFullyAcked(), is(true)); + + executor.shutdown(); + q.close(); + } + + @Test + public void fullyAckedHeadPageBeheadingTest() throws IOException { + Queueable element = new StringElement("foobarbaz1"); + int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length); + + TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity)); + q.open(); + + Batch b; + q.write(element); + b = q.nonBlockReadBatch(1); + assertThat(b.getElements().size(), 
is(equalTo(1))); + b.close(); + + q.write(element); + b = q.nonBlockReadBatch(1); + assertThat(b.getElements().size(), is(equalTo(1))); + b.close(); + + // head page should be full and fully acked + assertThat(q.getHeadPage().isFullyAcked(), is(true)); + assertThat(q.getHeadPage().hasSpace(element.serialize().length), is(false)); + assertThat(q.isFullyAcked(), is(true)); + + // write extra element to trigger beheading + q.write(element); + + // since head page was fully acked it should not have created a new tail page + + assertThat(q.getTailPages().isEmpty(), is(true)); + assertThat(q.getHeadPage().getPageNum(), is(equalTo(1))); + assertThat(q.firstUnackedPageNum(), is(equalTo(1))); + assertThat(q.isFullyAcked(), is(false)); + + q.close(); + } + }