mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
This commit deleted the corrupted zero byte PQ file and recreated the head checkpoint file to get rid of page file size is too small exception Fixed: #10855
This commit is contained in:
parent
0db25a80b5
commit
205faa6b64
6 changed files with 94 additions and 1 deletions
|
@ -195,4 +195,6 @@ dependencies {
|
|||
testImplementation 'net.javacrumbs.json-unit:json-unit:2.3.0'
|
||||
testImplementation 'org.elasticsearch:securemock:1.2'
|
||||
testImplementation 'org.assertj:assertj-core:3.11.1'
|
||||
testImplementation 'org.hamcrest:hamcrest:2.2'
|
||||
testImplementation 'org.hamcrest:hamcrest-library:2.2'
|
||||
}
|
||||
|
|
|
@ -244,6 +244,9 @@ public final class Queue implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
// delete zero byte page and recreate checkpoint if corrupted page is detected
|
||||
if ( cleanedUpFullyAckedCorruptedPage(headCheckpoint, pqSizeBytes)) { return; }
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
|
@ -302,6 +305,35 @@ public final class Queue implements Closeable {
|
|||
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
|
||||
}
|
||||
|
||||
/**
|
||||
* When the queue is fully acked and zero byte page is found, delete corrupted page and recreate checkpoint head
|
||||
* @param headCheckpoint
|
||||
* @param pqSizeBytes
|
||||
* @return true when corrupted page is found and cleaned
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean cleanedUpFullyAckedCorruptedPage(Checkpoint headCheckpoint, long pqSizeBytes) throws IOException {
|
||||
if (headCheckpoint.isFullyAcked()) {
|
||||
PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
if (pageIO.isCorruptedPage()) {
|
||||
logger.debug("Queue is fully acked. Found zero byte page.{}. Recreate checkpoint.head and delete corrupted page", headCheckpoint.getPageNum());
|
||||
|
||||
this.checkpointIO.purge(checkpointIO.headFileName());
|
||||
pageIO.purge();
|
||||
|
||||
if (headCheckpoint.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = headCheckpoint.maxSeqNum();
|
||||
}
|
||||
|
||||
newCheckpointedHeadpage(headCheckpoint.getPageNum() + 1);
|
||||
|
||||
pqSizeBytes += (long) pageIO.getHead();
|
||||
ensureDiskAvailable(this.maxBytes, pqSizeBytes);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* delete files for the given page
|
||||
|
|
|
@ -220,6 +220,13 @@ public final class MmapPageIOV1 implements PageIO {
|
|||
return this.head;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCorruptedPage() throws IOException {
|
||||
try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
|
||||
return raf.length() < MmapPageIOV2.MIN_CAPACITY;
|
||||
}
|
||||
}
|
||||
|
||||
private int checksum(byte[] bytes) {
|
||||
checkSummer.reset();
|
||||
checkSummer.update(bytes, 0, bytes.length);
|
||||
|
|
|
@ -273,6 +273,13 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
return this.head;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCorruptedPage() throws IOException {
|
||||
try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
|
||||
return raf.length() < MIN_CAPACITY;
|
||||
}
|
||||
}
|
||||
|
||||
private int checksum(byte[] bytes) {
|
||||
checkSummer.reset();
|
||||
checkSummer.update(bytes, 0, bytes.length);
|
||||
|
@ -296,7 +303,8 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
this.capacity = pageFileCapacity;
|
||||
|
||||
if (this.capacity < MIN_CAPACITY) {
|
||||
throw new IOException(String.format("Page file size is too small to hold elements"));
|
||||
throw new IOException("Page file size is too small to hold elements. " +
|
||||
"This is potentially a queue corruption problem. Run `pqcheck` and `pqrepair` to repair the queue.");
|
||||
}
|
||||
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
|
||||
}
|
||||
|
|
|
@ -84,4 +84,7 @@ public interface PageIO extends Closeable {
|
|||
|
||||
// @return the data container min sequence number
|
||||
long getMinSeqNum();
|
||||
|
||||
// check if the page size is < minimum size
|
||||
boolean isCorruptedPage() throws IOException;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.logstash.ackedqueue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
@ -40,6 +41,7 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -53,6 +55,7 @@ import static org.hamcrest.CoreMatchers.is;
|
|||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.computeCapacityForMmapPageIO;
|
||||
|
||||
|
@ -971,6 +974,44 @@ public class QueueTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZeroByteFullyAckedHeadPageOnOpen() throws IOException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
Settings settings = TestSettings.persistedQueueSettings(computeCapacityForMmapPageIO(element), dataPath);
|
||||
|
||||
// the goal here is to recreate a condition where the queue has a head page of size zero with
|
||||
// a checkpoint that indicates it is full acknowledged
|
||||
// see issue #10855
|
||||
|
||||
try(Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
q.write(element);
|
||||
|
||||
Batch batch = q.readBatch( 1, TimeUnit.SECONDS.toMillis(1));
|
||||
batch.close();
|
||||
assertThat(batch.size(), is(1));
|
||||
assertThat(q.isFullyAcked(), is(true));
|
||||
}
|
||||
|
||||
// now we have a queue state where page 0 is fully acked but not purged
|
||||
// manually truncate page 0 to zero byte to mock corrupted page
|
||||
FileChannel c = new FileOutputStream(Paths.get(dataPath, "page.0").toFile(), true).getChannel();
|
||||
c.truncate(0);
|
||||
c.close();
|
||||
|
||||
try(Queue q = new Queue(settings)) {
|
||||
// here q.open used to crash with:
|
||||
// java.io.IOException: Page file size is too small to hold elements
|
||||
// because head page recover() check integrity of file size
|
||||
q.open();
|
||||
|
||||
// recreated head page and checkpoint
|
||||
File page1 = Paths.get(dataPath, "page.1").toFile();
|
||||
assertThat(page1.exists(), is(true));
|
||||
assertThat(page1.length(), is(greaterThan(0L)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pageCapacityChangeOnExistingQueue() throws IOException {
|
||||
final Queueable element = new StringElement("foobarbaz1");
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue