diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 4e39e7586..3c7232e15 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -158,134 +158,148 @@ public final class Queue implements Closeable { * @throws IOException if an IO error occurs */ public void open() throws IOException { - final int headPageNum; - if (!this.closed.get()) { throw new IOException("queue already opened"); } lock.lock(); try { - // verify exclusive access to the dirPath - this.dirLock = FileLockFactory.obtainLock(this.dirPath, LOCK_NAME); - - // Upgrade to serialization format V2 - QueueUpgrade.upgradeQueueDirectoryToV2(dirPath); - - Checkpoint headCheckpoint; try { - headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName()); - } catch (NoSuchFileException e) { - // if there is no head checkpoint, create a new headpage and checkpoint it and exit method + // verify exclusive access to the dirPath + this.dirLock = FileLockFactory.obtainLock(this.dirPath, LOCK_NAME); + } catch (LockException e) { + throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage()); + } - logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName()); - - this.ensureDiskAvailable(this.maxBytes, 0); - - this.seqNum = 0; - headPageNum = 0; - - newCheckpointedHeadpage(headPageNum); + try { + openPages(); this.closed.set(false); - - return; + } catch (IOException e) { + // upon any exception while opening the queue and after dirlock has been obtained + // we need to make sure to release the dirlock. Calling the close method on a partially + // open queue has no effect because the closed flag is still true. + releaseLockAndSwallow(); + throw(e); } - - // at this point we have a head checkpoint to figure queue recovery - - // as we load pages, compute actually disk needed substracting existing pages size to the required maxBytes - long pqSizeBytes = 0; - - // reconstruct all tail pages state upto but excluding the head page - for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) { - final String cpFileName = checkpointIO.tailFileName(pageNum); - if (!dirPath.resolve(cpFileName).toFile().exists()) { - continue; - } - final Checkpoint cp = this.checkpointIO.read(cpFileName); - - logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString()); - - PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath); - // important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case - // we can purge it and we don't care about its integrity for example if it is of zero-byte file size. - if (cp.isFullyAcked()) { - purgeTailPage(cp, pageIO); - } else { - pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); - addTailPage(PageFactory.newTailPage(cp, this, pageIO)); - pqSizeBytes += pageIO.getCapacity(); - } - - // track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum - if (cp.maxSeqNum() > this.seqNum) { - this.seqNum = cp.maxSeqNum(); - } - } - - // transform the head page into a tail page only if the headpage is non-empty - // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages - - logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString()); - - PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); - pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data - - pqSizeBytes += (long)pageIO.getHead(); - ensureDiskAvailable(this.maxBytes, pqSizeBytes); - - if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) { - // the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes - - logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum()); - logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount()); - - long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum(); - if (firstUnackedSeqNum < pageIO.getMinSeqNum()) { - logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum()); - firstUnackedSeqNum = pageIO.getMinSeqNum(); - } - headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount()); - } - this.headPage = PageFactory.newHeadPage(headCheckpoint, this, pageIO); - - if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) { - // head page is empty, let's keep it as-is - // but checkpoint it to update the firstUnackedPageNum if it changed - this.headPage.checkpoint(); - } else { - // head page is non-empty, transform it into a tail page - this.headPage.behead(); - - if (headCheckpoint.isFullyAcked()) { - purgeTailPage(headCheckpoint, pageIO); - } else { - addTailPage(this.headPage); - } - - // track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum - if (headCheckpoint.maxSeqNum() > this.seqNum) { - this.seqNum = headCheckpoint.maxSeqNum(); - } - - // create a new empty head page - headPageNum = headCheckpoint.getPageNum() + 1; - newCheckpointedHeadpage(headPageNum); - } - - // only activate the first tail page - if (tailPages.size() > 0) { - this.tailPages.get(0).getPageIO().activate(); - } - - // TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start? - - this.closed.set(false); - } catch (LockException e) { - throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage()); } finally { lock.unlock(); } } + private void openPages() throws IOException { + final int headPageNum; + + // Upgrade to serialization format V2 + QueueUpgrade.upgradeQueueDirectoryToV2(dirPath); + + Checkpoint headCheckpoint; + try { + headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName()); + } catch (NoSuchFileException e) { + // if there is no head checkpoint, create a new headpage and checkpoint it and exit method + + logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName()); + + this.ensureDiskAvailable(this.maxBytes, 0); + + this.seqNum = 0; + headPageNum = 0; + + newCheckpointedHeadpage(headPageNum); + this.closed.set(false); + + return; + } + + // at this point we have a head checkpoint to figure queue recovery + + // as we load pages, compute actually disk needed substracting existing pages size to the required maxBytes + long pqSizeBytes = 0; + + // reconstruct all tail pages state upto but excluding the head page + for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) { + final String cpFileName = checkpointIO.tailFileName(pageNum); + if (!dirPath.resolve(cpFileName).toFile().exists()) { + continue; + } + final Checkpoint cp = this.checkpointIO.read(cpFileName); + + logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString()); + + PageIO pageIO = new MmapPageIOV2(pageNum, this.pageCapacity, this.dirPath); + // important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case + // we can purge it and we don't care about its integrity for example if it is of zero-byte file size. + if (cp.isFullyAcked()) { + purgeTailPage(cp, pageIO); + } else { + pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); + addTailPage(PageFactory.newTailPage(cp, this, pageIO)); + pqSizeBytes += pageIO.getCapacity(); + } + + // track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum + if (cp.maxSeqNum() > this.seqNum) { + this.seqNum = cp.maxSeqNum(); + } + } + + // transform the head page into a tail page only if the headpage is non-empty + // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages + + logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString()); + + PageIO pageIO = new MmapPageIOV2(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); + pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data + + pqSizeBytes += (long) pageIO.getHead(); + ensureDiskAvailable(this.maxBytes, pqSizeBytes); + + if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) { + // the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes + + logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum()); + logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount()); + + long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum(); + if (firstUnackedSeqNum < pageIO.getMinSeqNum()) { + logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum()); + firstUnackedSeqNum = pageIO.getMinSeqNum(); + } + headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount()); + } + this.headPage = PageFactory.newHeadPage(headCheckpoint, this, pageIO); + + if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) { + // head page is empty, let's keep it as-is + // but checkpoint it to update the firstUnackedPageNum if it changed + this.headPage.checkpoint(); + } else { + // head page is non-empty, transform it into a tail page + this.headPage.behead(); + + if (headCheckpoint.isFullyAcked()) { + purgeTailPage(headCheckpoint, pageIO); + } else { + addTailPage(this.headPage); + } + + // track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum + if (headCheckpoint.maxSeqNum() > this.seqNum) { + this.seqNum = headCheckpoint.maxSeqNum(); + } + + // create a new empty head page + headPageNum = headCheckpoint.getPageNum() + 1; + newCheckpointedHeadpage(headPageNum); + } + + // only activate the first tail page + if (tailPages.size() > 0) { + this.tailPages.get(0).getPageIO().activate(); + } + + // TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start? + } + + /** * delete files for the given page * @@ -713,18 +727,21 @@ public final class Queue implements Closeable { notFull.signalAll(); } finally { - try { - FileLockFactory.releaseLock(this.dirLock); - } catch (IOException e) { - // log error and ignore - logger.error("Queue close releaseLock failed, error={}", e.getMessage()); - } finally { - lock.unlock(); - } + releaseLockAndSwallow(); + lock.unlock(); } } } + private void releaseLockAndSwallow() { + try { + FileLockFactory.releaseLock(this.dirLock); + } catch (IOException e) { + // log error and ignore + logger.error("Queue close releaseLock failed, error={}", e.getMessage()); + } + } + /** * return the {@link Page} for the next read operation. * @return {@link Page} will be either a read-only tail page or the head page. diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 8c7e0ed0f..f5f4b892b 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -38,6 +38,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; + +import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -1043,4 +1045,23 @@ public class QueueTest { queue.open(); } } + + @Test + public void lockIsReleasedUponOpenException() throws Exception { + Settings settings = SettingsImpl.builder(TestSettings.persistedQueueSettings(100, dataPath)) + .queueMaxBytes(Long.MAX_VALUE) + .build(); + try { + Queue queue = new Queue(settings); + queue.open(); + fail("expected queue.open() to throws when not enough disk free"); + } catch (IOException e) { + assertThat(e.getMessage(), CoreMatchers.containsString("Unable to allocate")); + } + + // at this point the Queue lock should be released and Queue.open should not throw a LockException + try (Queue queue = new Queue(TestSettings.persistedQueueSettings(10, dataPath))) { + queue.open(); + } + } }