From 205faa6b64f76d4d40458c435da089482c6c268c Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Tue, 15 Feb 2022 14:38:45 +0000 Subject: [PATCH] [7.17] handle fully acked 0 byte PQ pages (#13692) (#13761) This commit deleted the corrupted zero byte PQ file and recreated the head checkpoint file to get rid of page file size is too small exception Fixed: #10855 --- logstash-core/build.gradle | 2 + .../java/org/logstash/ackedqueue/Queue.java | 32 +++++++++++++++ .../logstash/ackedqueue/io/MmapPageIOV1.java | 7 ++++ .../logstash/ackedqueue/io/MmapPageIOV2.java | 10 ++++- .../org/logstash/ackedqueue/io/PageIO.java | 3 ++ .../org/logstash/ackedqueue/QueueTest.java | 41 +++++++++++++++++++ 6 files changed, 94 insertions(+), 1 deletion(-) diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index f71184eeb..c389f5894 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -195,4 +195,6 @@ dependencies { testImplementation 'net.javacrumbs.json-unit:json-unit:2.3.0' testImplementation 'org.elasticsearch:securemock:1.2' testImplementation 'org.assertj:assertj-core:3.11.1' + testImplementation 'org.hamcrest:hamcrest:2.2' + testImplementation 'org.hamcrest:hamcrest-library:2.2' } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 12071554b..aba5b273b 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -244,6 +244,9 @@ public final class Queue implements Closeable { } } + // delete zero byte page and recreate checkpoint if corrupted page is detected + if ( cleanedUpFullyAckedCorruptedPage(headCheckpoint, pqSizeBytes)) { return; } + // transform the head page into a tail page only if the headpage is non-empty // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages @@ -302,6 +305,35 @@ public final class Queue implements Closeable { // TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start? } + /** + * When the queue is fully acked and zero byte page is found, delete corrupted page and recreate checkpoint head + * @param headCheckpoint + * @param pqSizeBytes + * @return true when corrupted page is found and cleaned + * @throws IOException + */ + private boolean cleanedUpFullyAckedCorruptedPage(Checkpoint headCheckpoint, long pqSizeBytes) throws IOException { + if (headCheckpoint.isFullyAcked()) { + PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); + if (pageIO.isCorruptedPage()) { + logger.debug("Queue is fully acked. Found zero byte page.{}. Recreate checkpoint.head and delete corrupted page", headCheckpoint.getPageNum()); + + this.checkpointIO.purge(checkpointIO.headFileName()); + pageIO.purge(); + + if (headCheckpoint.maxSeqNum() > this.seqNum) { + this.seqNum = headCheckpoint.maxSeqNum(); + } + + newCheckpointedHeadpage(headCheckpoint.getPageNum() + 1); + + pqSizeBytes += (long) pageIO.getHead(); + ensureDiskAvailable(this.maxBytes, pqSizeBytes); + return true; + } + } + return false; + } /** * delete files for the given page diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java index 3c306ac03..72d8502e3 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java @@ -220,6 +220,13 @@ public final class MmapPageIOV1 implements PageIO { return this.head; } + @Override + public boolean isCorruptedPage() throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) { + return raf.length() < MmapPageIOV2.MIN_CAPACITY; + } + } + private int checksum(byte[] bytes) { checkSummer.reset(); checkSummer.update(bytes, 0, bytes.length); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java index 0de29a41c..c342b6dba 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java @@ -273,6 +273,13 @@ public final class MmapPageIOV2 implements PageIO { return this.head; } + @Override + public boolean isCorruptedPage() throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) { + return raf.length() < MIN_CAPACITY; + } + } + private int checksum(byte[] bytes) { checkSummer.reset(); checkSummer.update(bytes, 0, bytes.length); @@ -296,7 +303,8 @@ public final class MmapPageIOV2 implements PageIO { this.capacity = pageFileCapacity; if (this.capacity < MIN_CAPACITY) { - throw new IOException(String.format("Page file size is too small to hold elements")); + throw new IOException("Page file size is too small to hold elements. " + + "This is potentially a queue corruption problem. Run `pqcheck` and `pqrepair` to repair the queue."); } this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/PageIO.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/PageIO.java index 98231e36e..c44779e63 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/PageIO.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/PageIO.java @@ -84,4 +84,7 @@ public interface PageIO extends Closeable { // @return the data container min sequence number long getMinSeqNum(); + + // check if the page size is < minimum size + boolean isCorruptedPage() throws IOException; } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index f5f4b892b..0aa09d878 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -20,6 +20,7 @@ package org.logstash.ackedqueue; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -53,6 +55,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.fail; import static org.logstash.ackedqueue.QueueTestHelpers.computeCapacityForMmapPageIO; @@ -971,6 +974,44 @@ public class QueueTest { } } + @Test + public void testZeroByteFullyAckedHeadPageOnOpen() throws IOException { + Queueable element = new StringElement("0123456789"); // 10 bytes + Settings settings = TestSettings.persistedQueueSettings(computeCapacityForMmapPageIO(element), dataPath); + + // the goal here is to recreate a condition where the queue has a head page of size zero with + // a checkpoint that indicates it is full acknowledged + // see issue #10855 + + try(Queue q = new Queue(settings)) { + q.open(); + q.write(element); + + Batch batch = q.readBatch( 1, TimeUnit.SECONDS.toMillis(1)); + batch.close(); + assertThat(batch.size(), is(1)); + assertThat(q.isFullyAcked(), is(true)); + } + + // now we have a queue state where page 0 is fully acked but not purged + // manually truncate page 0 to zero byte to mock corrupted page + FileChannel c = new FileOutputStream(Paths.get(dataPath, "page.0").toFile(), true).getChannel(); + c.truncate(0); + c.close(); + + try(Queue q = new Queue(settings)) { + // here q.open used to crash with: + // java.io.IOException: Page file size is too small to hold elements + // because head page recover() check integrity of file size + q.open(); + + // recreated head page and checkpoint + File page1 = Paths.get(dataPath, "page.1").toFile(); + assertThat(page1.exists(), is(true)); + assertThat(page1.length(), is(greaterThan(0L))); + } + } + @Test public void pageCapacityChangeOnExistingQueue() throws IOException { final Queueable element = new StringElement("foobarbaz1");