mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
This commit adds a checkpoint for a fully acked page before purging to keep the checkpoint up-to-date
Fixed: #6592
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
(cherry picked from commit 7f36665c09
)
Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
This commit is contained in:
parent
27c512fb17
commit
6db9b083ac
2 changed files with 44 additions and 3 deletions
|
@ -161,9 +161,10 @@ public final class Page implements Closeable {
|
|||
|
||||
final boolean done = isFullyAcked();
|
||||
if (done) {
|
||||
if (this.writable) {
|
||||
headPageCheckpoint();
|
||||
} else {
|
||||
checkpoint();
|
||||
|
||||
// purge fully acked tail page
|
||||
if (!this.writable) {
|
||||
purge();
|
||||
final CheckpointIO cpIO = queue.getCheckpointIO();
|
||||
cpIO.purge(cpIO.tailFileName(pageNum));
|
||||
|
|
|
@ -56,6 +56,7 @@ import static org.hamcrest.CoreMatchers.notNullValue;
|
|||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.logstash.ackedqueue.QueueTestHelpers.computeCapacityForMmapPageIO;
|
||||
|
||||
|
@ -1105,4 +1106,43 @@ public class QueueTest {
|
|||
queue.open();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void firstUnackedPagePointToFullyAckedPurgedPage() throws Exception {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
Settings settings = TestSettings.persistedQueueSettings(computeCapacityForMmapPageIO(element), dataPath);
|
||||
// simulate a scenario that a tail page fail to complete fully ack, crash in the middle of purge
|
||||
// normal purge: write fully acked checkpoint -> delete tail page -> (boom!) delete checkpoint -> write head page
|
||||
// the queue head page, firstUnackedPageNum, points to a removed page which is fully acked
|
||||
// the queue should be able to open and remove dangling checkpoint
|
||||
|
||||
try(Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
// create two pages
|
||||
q.write(element);
|
||||
q.write(element);
|
||||
}
|
||||
|
||||
|
||||
try(Queue q = new Queue(settings)) {
|
||||
// now we have head checkpoint pointing to page.0
|
||||
// manually delete page.0
|
||||
Paths.get(dataPath, "page.0").toFile().delete();
|
||||
// create a fully acked checkpoint.0 to mock a partial acked action
|
||||
// which purges the tail page and the checkpoint file remains
|
||||
Checkpoint cp = q.getCheckpointIO().read("checkpoint.0");
|
||||
Paths.get(dataPath, "checkpoint.0").toFile().delete();
|
||||
Checkpoint mockAckedCp = new Checkpoint(cp.getPageNum(), cp.getFirstUnackedPageNum(), cp.getFirstUnackedSeqNum() + 1, cp.getMinSeqNum(), cp.getElementCount());
|
||||
q.getCheckpointIO().write("checkpoint.0", mockAckedCp);
|
||||
|
||||
// here q.open used to crash with:
|
||||
// java.io.IOException: Page file size is too small to hold elements
|
||||
// because checkpoint has outdated state saying it is not fully acked
|
||||
q.open();
|
||||
|
||||
// dangling checkpoint should be deleted
|
||||
File cp0 = Paths.get(dataPath, "checkpoint.0").toFile();
|
||||
assertFalse("Dangling page's checkpoint file should be removed", cp0.exists());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue