mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
(cherry picked from commit 45b7da638f
)
This commit is contained in:
parent
8b2f41fc83
commit
81a3489843
4 changed files with 38 additions and 9 deletions
|
@ -225,7 +225,7 @@ public final class Queue implements Closeable {
|
|||
}
|
||||
final Checkpoint cp = this.checkpointIO.read(cpFileName);
|
||||
|
||||
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
|
||||
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp);
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath);
|
||||
// important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
|
||||
|
@ -250,7 +250,7 @@ public final class Queue implements Closeable {
|
|||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString());
|
||||
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint);
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
|
||||
|
@ -345,7 +345,9 @@ public final class Queue implements Closeable {
|
|||
private void purgeTailPage(Checkpoint checkpoint, PageIO pageIO) throws IOException {
|
||||
try {
|
||||
pageIO.purge();
|
||||
} catch (NoSuchFileException e) { /* ignore */ }
|
||||
} catch (NoSuchFileException e) { /* ignore */
|
||||
logger.debug("tail page does not exist: {}", pageIO);
|
||||
}
|
||||
|
||||
// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
|
||||
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
|
||||
|
@ -382,6 +384,7 @@ public final class Queue implements Closeable {
|
|||
private void newCheckpointedHeadpage(int pageNum) throws IOException {
|
||||
PageIO headPageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath);
|
||||
headPageIO.create();
|
||||
logger.debug("created new head page: {}", headPageIO);
|
||||
this.headPage = PageFactory.newHeadPage(pageNum, this, headPageIO);
|
||||
this.headPage.forceCheckpoint();
|
||||
}
|
||||
|
@ -442,6 +445,7 @@ public final class Queue implements Closeable {
|
|||
try {
|
||||
notFull.await();
|
||||
} catch (InterruptedException e) {
|
||||
logger.debug("interrupted waiting for queue to not be full", e);
|
||||
// the thread interrupt() has been called while in the await() blocking call.
|
||||
// at this point the interrupted flag is reset and Thread.interrupted() will return false
|
||||
// to any upstream calls on it. for now our choice is to return normally and set back
|
||||
|
|
|
@ -109,12 +109,14 @@ public class FileCheckpointIO implements CheckpointIO {
|
|||
|
||||
// Windows can have problem doing file move See: https://github.com/elastic/logstash/issues/12345
|
||||
// retry a couple of times to make it works. The first two runs has no break. The rest of reties are exponential backoff.
|
||||
final Path path = dirPath.resolve(fileName);
|
||||
try {
|
||||
Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
|
||||
Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE);
|
||||
} catch (IOException ex) {
|
||||
if (retry) {
|
||||
try {
|
||||
backoff.retryable(() -> Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE));
|
||||
logger.debug("CheckpointIO retry moving '{}' to '{}'", tmpPath, path);
|
||||
backoff.retryable(() -> Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE));
|
||||
} catch (ExponentialBackoff.RetryException re) {
|
||||
throw new IOException("Error writing checkpoint", re);
|
||||
}
|
||||
|
@ -128,6 +130,7 @@ public class FileCheckpointIO implements CheckpointIO {
|
|||
@Override
|
||||
public void purge(String fileName) throws IOException {
|
||||
Path path = dirPath.resolve(fileName);
|
||||
logger.debug("CheckpointIO deleting '{}'", path);
|
||||
Files.delete(path);
|
||||
}
|
||||
|
||||
|
|
|
@ -322,4 +322,15 @@ public final class MmapPageIOV1 implements PageIO {
|
|||
.format("Expected page version=%d but found version=%d", VERSION_ONE, version));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MmapPageIOV1{" +
|
||||
"file=" + file +
|
||||
", capacity=" + capacity +
|
||||
", minSeqNum=" + minSeqNum +
|
||||
", elementCount=" + elementCount +
|
||||
", head=" + head +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -175,7 +175,7 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
this.elementCount += 1;
|
||||
} catch (MmapPageIOV2.PageIOInvalidElementException e) {
|
||||
// simply stop at first invalid element
|
||||
LOGGER.debug("PageIO recovery element index:{}, readNextElement exception: {}", i, e.getMessage());
|
||||
LOGGER.debug("PageIO recovery for '{}' element index:{}, readNextElement exception: {}", file, i, e.getMessage());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -223,8 +223,9 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
@Override
|
||||
public void purge() throws IOException {
|
||||
close();
|
||||
Files.delete(this.file.toPath());
|
||||
this.head = 0;
|
||||
LOGGER.debug("PageIO deleting '{}'", this.file);
|
||||
Files.delete(this.file.toPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -356,8 +357,7 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
int checksum = buffer.getInt();
|
||||
int computedChecksum = (int) this.checkSummer.getValue();
|
||||
if (computedChecksum != checksum) {
|
||||
throw new MmapPageIOV2.PageIOInvalidElementException(
|
||||
"Element invalid checksum");
|
||||
throw new MmapPageIOV2.PageIOInvalidElementException("Element invalid checksum");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -403,6 +403,17 @@ public final class MmapPageIOV2 implements PageIO {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MmapPageIOV2{" +
|
||||
"file=" + file +
|
||||
", capacity=" + capacity +
|
||||
", minSeqNum=" + minSeqNum +
|
||||
", elementCount=" + elementCount +
|
||||
", head=" + head +
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalid Page structure exception
|
||||
* */
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue