diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
index ec1310c2d..03259d916 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -15,7 +15,10 @@ import java.lang.reflect.Method;
 import java.nio.channels.FileLock;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -44,6 +47,10 @@ public class Queue implements Closeable {
     // reads will simply remove the first page from the list when fully read and writes will append new pages upon beheading
     protected final List<TailPage> unreadTailPages;
 
+    // checkpoints that were not purged in the acking code to keep contiguous checkpoint files
+    // regardless of the corresponding data file purge.
+    protected final Set<Integer> preservedCheckpoints;
+
     protected volatile long unreadCount;
 
     protected volatile long currentByteSize;
@@ -98,6 +105,7 @@ public class Queue implements Closeable {
         this.elementClass = elementClass;
         this.tailPages = new ArrayList<>();
         this.unreadTailPages = new ArrayList<>();
+        this.preservedCheckpoints = new HashSet<>();
         this.closed = new AtomicBoolean(true); // not yet opened
         this.maxUnread = maxUnread;
         this.checkpointMaxAcks = checkpointMaxAcks;
@@ -294,7 +302,6 @@ public class Queue implements Closeable {
     // @param element the Queueable object to write to the queue
     // @return long written sequence number
     public long write(Queueable element) throws IOException {
-        long seqNum = nextSeqNum();
         byte[] data = element.serialize();
 
         if (! this.headPage.hasCapacity(data.length)) {
@@ -314,18 +321,32 @@
 
         // create a new head page if the current does not have sufficient space left for data to be written
         if (! this.headPage.hasSpace(data.length)) {
-            // beheading includes checkpoint+fsync if required
-            TailPage tailPage = this.headPage.behead();
-            this.tailPages.add(tailPage);
-            if (! tailPage.isFullyRead()) {
-                this.unreadTailPages.add(tailPage);
+            // TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process
+
+            int newHeadPageNum = this.headPage.pageNum + 1;
+
+            if (this.headPage.isFullyAcked()) {
+                // purge the old headPage because it's full and fully acked;
+                // there is no checkpoint file to purge since just creating a new TailPage from a HeadPage does
+                // not trigger a checkpoint creation in itself
+                TailPage tailPage = new TailPage(this.headPage);
+                tailPage.purge();
+            } else {
+                // beheading includes checkpoint+fsync if required
+                TailPage tailPage = this.headPage.behead();
+
+                this.tailPages.add(tailPage);
+                if (! tailPage.isFullyRead()) {
+                    this.unreadTailPages.add(tailPage);
+                }
             }
 
             // create new head page
-            newCheckpointedHeadpage(tailPage.pageNum + 1);
+            newCheckpointedHeadpage(newHeadPageNum);
         }
 
+        long seqNum = nextSeqNum();
         this.headPage.write(data, seqNum, this.checkpointMaxWrites);
         this.unreadCount++;
@@ -574,16 +595,19 @@ public class Queue implements Closeable {
             result.page.purge();
             this.currentByteSize -= result.page.getPageIO().getCapacity();
 
-            if (result.index == 0) {
+            if (result.index != 0) {
+                // this is an in-between page; we don't purge its checkpoint, to preserve the checkpoint sequence on disk.
+                // save that checkpoint so that if it becomes the first checkpoint it can be purged later on.
+                this.preservedCheckpoints.add(result.page.getPageNum());
+            } else {
                 // if this is the first page also remove checkpoint file
                 this.checkpointIO.purge(this.checkpointIO.tailFileName(result.page.getPageNum()));
 
-                // and see if next "first tail page" is also fully acked and remove checkpoint file
-                while (this.tailPages.size() > 0) {
-                    TailPage p = this.tailPages.get(0);
-                    if (!p.isFullyAcked()) { break; }
-                    this.checkpointIO.purge(this.checkpointIO.tailFileName(p.getPageNum()));
-                    this.tailPages.remove(0);
+                // check if there are preserved checkpoint files next to this one and delete them
+                int nextPageNum = result.page.getPageNum() + 1;
+                while (preservedCheckpoints.remove(nextPageNum)) {
+                    this.checkpointIO.purge(this.checkpointIO.tailFileName(nextPageNum));
+                    nextPageNum++;
                 }
             }
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
index ee3ee17da..42b40758b 100644
--- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -638,4 +639,90 @@ public class QueueTest {
         q.close();
     }
 
+    @Test(timeout = 5000)
+    public void concurrentWritesTest() throws IOException, InterruptedException, ExecutionException {
+
+        // very small pages to maximize page creation
+        Settings settings = TestSettings.volatileQueueSettings(100);
+
+        TestQueue q = new TestQueue(settings);
+        q.open();
+
+        int ELEMENT_COUNT = 10000;
+        int WRITER_COUNT = 5;
+        AtomicInteger element_num = new AtomicInteger(0);
+
+        // each writer publishes ELEMENT_COUNT elements, drawing unique content from a shared counter
+        Callable<Integer> writer = () -> {
+            for (int i = 0; i < ELEMENT_COUNT; i++) {
+                int n = element_num.getAndIncrement();
+                q.write(new StringElement("" + n));
+            }
+            return ELEMENT_COUNT;
+        };
+
+        List<Future<Integer>> futures = new ArrayList<>();
+        ExecutorService executor = Executors.newFixedThreadPool(WRITER_COUNT);
+        for (int i = 0; i < WRITER_COUNT; i++) {
+            futures.add(executor.submit(writer));
+        }
+
+        int BATCH_SIZE = 10;
+        int read_count = 0;
+
+        while (read_count < ELEMENT_COUNT * WRITER_COUNT) {
+            Batch b = q.readBatch(BATCH_SIZE);
+            read_count += b.size();
+            b.close();
+        }
+
+        for (Future<Integer> future : futures) {
+            int result = future.get();
+            assertThat(result, is(equalTo(ELEMENT_COUNT)));
+        }
+
+        assertThat(q.getTailPages().isEmpty(), is(true));
+        assertThat(q.isFullyAcked(), is(true));
+
+        executor.shutdown();
+        q.close();
+    }
+
+    @Test
+    public void fullyAckedHeadPageBeheadingTest() throws IOException {
+        Queueable element = new StringElement("foobarbaz1");
+        int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
+
+        TestQueue q = new TestQueue(TestSettings.volatileQueueSettings(2 * singleElementCapacity));
+        q.open();
+
+        Batch b;
+        q.write(element);
+        b = q.nonBlockReadBatch(1);
+        assertThat(b.getElements().size(), is(equalTo(1)));
+        b.close();
+
+        q.write(element);
+        b = q.nonBlockReadBatch(1);
+        assertThat(b.getElements().size(), is(equalTo(1)));
+        b.close();
+
+        // head page should be full and fully acked
+        assertThat(q.getHeadPage().isFullyAcked(), is(true));
+        assertThat(q.getHeadPage().hasSpace(element.serialize().length), is(false));
+        assertThat(q.isFullyAcked(), is(true));
+
+        // write extra element to trigger beheading
+        q.write(element);
+
+        // since head page was fully acked it should not have created a new tail page
+        assertThat(q.getTailPages().isEmpty(), is(true));
+        assertThat(q.getHeadPage().getPageNum(), is(equalTo(1)));
+        assertThat(q.firstUnackedPageNum(), is(equalTo(1)));
+        assertThat(q.isFullyAcked(), is(false));
+
+        q.close();
+    }
 }
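
Reviewer note: below is a minimal standalone sketch, not part of this diff, of the checkpoint-preservation bookkeeping the `ack()` change introduces. A fully-acked in-between page keeps its checkpoint file and parks its page number in a set; checkpoint files are only deleted once they form a contiguous run starting at the oldest tail page. It simplifies the real code in one respect: the diff keys the "first page" test on `result.index` within `tailPages`, while this sketch tracks the oldest page number directly. The names `CheckpointPurger`, `pageFullyAcked`, and `deleteCheckpointFile` are hypothetical stand-ins for the `Queue`/`CheckpointIO` interplay.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the preservedCheckpoints bookkeeping in Queue.java:
// data files may be purged in any ack order, but checkpoint files must be
// deleted in contiguous page-number order from the oldest page up.
class CheckpointPurger {
    private final Set<Integer> preservedCheckpoints = new HashSet<>();
    private int oldestPageNum; // page whose checkpoint may be deleted next

    CheckpointPurger(int oldestPageNum) {
        this.oldestPageNum = oldestPageNum;
    }

    // called when a page becomes fully acked and its data file is purged
    void pageFullyAcked(int pageNum) {
        if (pageNum != oldestPageNum) {
            // in-between page: keep its checkpoint file on disk for now
            preservedCheckpoints.add(pageNum);
        } else {
            deleteCheckpointFile(pageNum);
            // also delete any preserved checkpoints that are now contiguous
            int next = pageNum + 1;
            while (preservedCheckpoints.remove(next)) {
                deleteCheckpointFile(next);
                next++;
            }
            oldestPageNum = next;
        }
    }

    private void deleteCheckpointFile(int pageNum) {
        // stand-in for this.checkpointIO.purge(this.checkpointIO.tailFileName(pageNum))
        System.out.println("purging checkpoint." + pageNum);
    }
}
```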
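For example, acking the newest pages first leaves their checkpoint files untouched until the oldest page is fully acked, at which point the whole contiguous run is purged in order (demo against the sketch above):

```java
public class CheckpointPurgerDemo {
    public static void main(String[] args) {
        CheckpointPurger purger = new CheckpointPurger(1);
        purger.pageFullyAcked(3); // checkpoint.3 preserved, nothing printed
        purger.pageFullyAcked(2); // checkpoint.2 preserved, nothing printed
        purger.pageFullyAcked(1); // prints: purging checkpoint.1, .2, .3
    }
}
```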