mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
release queue dir lock upon exceptions while opening queue
This commit is contained in:
parent
8eddbd608b
commit
c2e4e4f185
2 changed files with 161 additions and 123 deletions
|
@ -158,134 +158,148 @@ public final class Queue implements Closeable {
|
|||
* @throws IOException if an IO error occurs
|
||||
*/
|
||||
public void open() throws IOException {
|
||||
final int headPageNum;
|
||||
|
||||
if (!this.closed.get()) { throw new IOException("queue already opened"); }
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
// verify exclusive access to the dirPath
|
||||
this.dirLock = FileLockFactory.obtainLock(this.dirPath, LOCK_NAME);
|
||||
|
||||
// Upgrade to serialization format V2
|
||||
QueueUpgrade.upgradeQueueDirectoryToV2(dirPath);
|
||||
|
||||
Checkpoint headCheckpoint;
|
||||
try {
|
||||
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
|
||||
} catch (NoSuchFileException e) {
|
||||
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
|
||||
// verify exclusive access to the dirPath
|
||||
this.dirLock = FileLockFactory.obtainLock(this.dirPath, LOCK_NAME);
|
||||
} catch (LockException e) {
|
||||
throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage());
|
||||
}
|
||||
|
||||
logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName());
|
||||
|
||||
this.ensureDiskAvailable(this.maxBytes, 0);
|
||||
|
||||
this.seqNum = 0;
|
||||
headPageNum = 0;
|
||||
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
try {
|
||||
openPages();
|
||||
this.closed.set(false);
|
||||
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
// upon any exception while opening the queue and after dirlock has been obtained
|
||||
// we need to make sure to release the dirlock. Calling the close method on a partially
|
||||
// open queue has no effect because the closed flag is still true.
|
||||
releaseLockAndSwallow();
|
||||
throw(e);
|
||||
}
|
||||
|
||||
// at this point we have a head checkpoint to figure queue recovery
|
||||
|
||||
// as we load pages, compute actually disk needed substracting existing pages size to the required maxBytes
|
||||
long pqSizeBytes = 0;
|
||||
|
||||
// reconstruct all tail pages state upto but excluding the head page
|
||||
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
|
||||
final String cpFileName = checkpointIO.tailFileName(pageNum);
|
||||
if (!dirPath.resolve(cpFileName).toFile().exists()) {
|
||||
continue;
|
||||
}
|
||||
final Checkpoint cp = this.checkpointIO.read(cpFileName);
|
||||
|
||||
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath);
|
||||
// important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
|
||||
// we can purge it and we don't care about its integrity for example if it is of zero-byte file size.
|
||||
if (cp.isFullyAcked()) {
|
||||
purgeTailPage(cp, pageIO);
|
||||
} else {
|
||||
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
|
||||
addTailPage(PageFactory.newTailPage(cp, this, pageIO));
|
||||
pqSizeBytes += pageIO.getCapacity();
|
||||
}
|
||||
|
||||
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
|
||||
if (cp.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = cp.maxSeqNum();
|
||||
}
|
||||
}
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString());
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
|
||||
|
||||
pqSizeBytes += (long)pageIO.getHead();
|
||||
ensureDiskAvailable(this.maxBytes, pqSizeBytes);
|
||||
|
||||
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
|
||||
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
|
||||
|
||||
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
|
||||
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
|
||||
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
|
||||
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
|
||||
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
|
||||
firstUnackedSeqNum = pageIO.getMinSeqNum();
|
||||
}
|
||||
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
}
|
||||
this.headPage = PageFactory.newHeadPage(headCheckpoint, this, pageIO);
|
||||
|
||||
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
|
||||
// head page is empty, let's keep it as-is
|
||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||
this.headPage.checkpoint();
|
||||
} else {
|
||||
// head page is non-empty, transform it into a tail page
|
||||
this.headPage.behead();
|
||||
|
||||
if (headCheckpoint.isFullyAcked()) {
|
||||
purgeTailPage(headCheckpoint, pageIO);
|
||||
} else {
|
||||
addTailPage(this.headPage);
|
||||
}
|
||||
|
||||
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
|
||||
if (headCheckpoint.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = headCheckpoint.maxSeqNum();
|
||||
}
|
||||
|
||||
// create a new empty head page
|
||||
headPageNum = headCheckpoint.getPageNum() + 1;
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
}
|
||||
|
||||
// only activate the first tail page
|
||||
if (tailPages.size() > 0) {
|
||||
this.tailPages.get(0).getPageIO().activate();
|
||||
}
|
||||
|
||||
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
|
||||
|
||||
this.closed.set(false);
|
||||
} catch (LockException e) {
|
||||
throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage());
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void openPages() throws IOException {
|
||||
final int headPageNum;
|
||||
|
||||
// Upgrade to serialization format V2
|
||||
QueueUpgrade.upgradeQueueDirectoryToV2(dirPath);
|
||||
|
||||
Checkpoint headCheckpoint;
|
||||
try {
|
||||
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
|
||||
} catch (NoSuchFileException e) {
|
||||
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
|
||||
|
||||
logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName());
|
||||
|
||||
this.ensureDiskAvailable(this.maxBytes, 0);
|
||||
|
||||
this.seqNum = 0;
|
||||
headPageNum = 0;
|
||||
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
this.closed.set(false);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// at this point we have a head checkpoint to figure queue recovery
|
||||
|
||||
// as we load pages, compute actually disk needed substracting existing pages size to the required maxBytes
|
||||
long pqSizeBytes = 0;
|
||||
|
||||
// reconstruct all tail pages state upto but excluding the head page
|
||||
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
|
||||
final String cpFileName = checkpointIO.tailFileName(pageNum);
|
||||
if (!dirPath.resolve(cpFileName).toFile().exists()) {
|
||||
continue;
|
||||
}
|
||||
final Checkpoint cp = this.checkpointIO.read(cpFileName);
|
||||
|
||||
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath);
|
||||
// important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
|
||||
// we can purge it and we don't care about its integrity for example if it is of zero-byte file size.
|
||||
if (cp.isFullyAcked()) {
|
||||
purgeTailPage(cp, pageIO);
|
||||
} else {
|
||||
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
|
||||
addTailPage(PageFactory.newTailPage(cp, this, pageIO));
|
||||
pqSizeBytes += pageIO.getCapacity();
|
||||
}
|
||||
|
||||
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
|
||||
if (cp.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = cp.maxSeqNum();
|
||||
}
|
||||
}
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
|
||||
|
||||
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString());
|
||||
|
||||
PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
|
||||
|
||||
pqSizeBytes += (long) pageIO.getHead();
|
||||
ensureDiskAvailable(this.maxBytes, pqSizeBytes);
|
||||
|
||||
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
|
||||
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
|
||||
|
||||
logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
|
||||
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
|
||||
long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
|
||||
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
|
||||
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
|
||||
firstUnackedSeqNum = pageIO.getMinSeqNum();
|
||||
}
|
||||
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
|
||||
}
|
||||
this.headPage = PageFactory.newHeadPage(headCheckpoint, this, pageIO);
|
||||
|
||||
if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
|
||||
// head page is empty, let's keep it as-is
|
||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||
this.headPage.checkpoint();
|
||||
} else {
|
||||
// head page is non-empty, transform it into a tail page
|
||||
this.headPage.behead();
|
||||
|
||||
if (headCheckpoint.isFullyAcked()) {
|
||||
purgeTailPage(headCheckpoint, pageIO);
|
||||
} else {
|
||||
addTailPage(this.headPage);
|
||||
}
|
||||
|
||||
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
|
||||
if (headCheckpoint.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = headCheckpoint.maxSeqNum();
|
||||
}
|
||||
|
||||
// create a new empty head page
|
||||
headPageNum = headCheckpoint.getPageNum() + 1;
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
}
|
||||
|
||||
// only activate the first tail page
|
||||
if (tailPages.size() > 0) {
|
||||
this.tailPages.get(0).getPageIO().activate();
|
||||
}
|
||||
|
||||
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* delete files for the given page
|
||||
*
|
||||
|
@ -713,18 +727,21 @@ public final class Queue implements Closeable {
|
|||
notFull.signalAll();
|
||||
|
||||
} finally {
|
||||
try {
|
||||
FileLockFactory.releaseLock(this.dirLock);
|
||||
} catch (IOException e) {
|
||||
// log error and ignore
|
||||
logger.error("Queue close releaseLock failed, error={}", e.getMessage());
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
releaseLockAndSwallow();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void releaseLockAndSwallow() {
|
||||
try {
|
||||
FileLockFactory.releaseLock(this.dirLock);
|
||||
} catch (IOException e) {
|
||||
// log error and ignore
|
||||
logger.error("Queue close releaseLock failed, error={}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* return the {@link Page} for the next read operation.
|
||||
* @return {@link Page} will be either a read-only tail page or the head page.
|
||||
|
|
|
@ -38,6 +38,8 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -1043,4 +1045,23 @@ public class QueueTest {
|
|||
queue.open();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lockIsReleasedUponOpenException() throws Exception {
|
||||
Settings settings = SettingsImpl.builder(TestSettings.persistedQueueSettings(100, dataPath))
|
||||
.queueMaxBytes(Long.MAX_VALUE)
|
||||
.build();
|
||||
try {
|
||||
Queue queue = new Queue(settings);
|
||||
queue.open();
|
||||
fail("expected queue.open() to throws when not enough disk free");
|
||||
} catch (IOException e) {
|
||||
assertThat(e.getMessage(), CoreMatchers.containsString("Unable to allocate"));
|
||||
}
|
||||
|
||||
// at this point the Queue lock should be released and Queue.open should not throw a LockException
|
||||
try (Queue queue = new Queue(TestSettings.persistedQueueSettings(10, dataPath))) {
|
||||
queue.open();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue