verify available disk space for PQ #8978. This code was originally written by @original-brownbear for #6998

localize error message

refactor & simplify hasFreeSpace() per @yaauie suggestion

reformat for clarity
This commit is contained in:
Colin Surprenant 2018-01-17 14:48:47 -05:00
parent 2996062232
commit da15c5b3b7
8 changed files with 165 additions and 1 deletions

View file

@ -0,0 +1,15 @@
# encoding: utf-8
require 'logstash/errors'
module LogStash
module BootstrapCheck
class PersistedQueueConfig
def self.check(settings)
return unless settings.get('queue.type') == 'persisted'
if settings.get('queue.page_capacity') > settings.get('queue.max_bytes')
raise(LogStash::BootstrapCheckError, I18n.t("logstash.bootstrap_check.persisted_queue_config.page-capacity"))
end
end
end
end
end

View file

@ -25,6 +25,7 @@ require "logstash/modules/util"
require "logstash/bootstrap_check/default_config"
require "logstash/bootstrap_check/bad_java"
require "logstash/bootstrap_check/bad_ruby"
require "logstash/bootstrap_check/persisted_queue_config"
require "set"
java_import 'org.logstash.FileLockFactory'
@ -39,7 +40,8 @@ class LogStash::Runner < Clamp::StrictCommand
DEFAULT_BOOTSTRAP_CHECKS = [
LogStash::BootstrapCheck::BadRuby,
LogStash::BootstrapCheck::BadJava,
LogStash::BootstrapCheck::DefaultConfig
LogStash::BootstrapCheck::DefaultConfig,
LogStash::BootstrapCheck::PersistedQueueConfig
]
# Node Settings

View file

@ -8,6 +8,10 @@ en:
logstash:
error: >-
Error: %{error}
bootstrap_check:
persisted_queue_config:
page-capacity: >-
Invalid configuration, 'queue.page_capacity' must be less than or equal to 'queue.max_bytes'
environment:
jruby-required: >-
JRuby is required

View file

@ -0,0 +1,23 @@
require "spec_helper"
require "tmpdir"
require "logstash/bootstrap_check/persisted_queue_config"
describe LogStash::BootstrapCheck::PersistedQueueConfig do
context("when persisted queues are enabled") do
let(:settings) do
settings = LogStash::SETTINGS.dup
settings.set_value("queue.type", "persisted")
settings.set_value("queue.page_capacity", 1024)
settings.set_value("path.queue", ::File.join(Dir.tmpdir, "some/path"))
settings
end
context("and 'queue.max_bytes' is set to a value less than the value of 'queue.page_capacity'") do
it "should throw" do
settings.set_value("queue.max_bytes", 512)
expect { LogStash::BootstrapCheck::PersistedQueueConfig.check(settings) }.to raise_error
end
end
end
end

View file

@ -23,6 +23,7 @@ import org.logstash.ackedqueue.io.CheckpointIO;
import org.logstash.ackedqueue.io.LongVector;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIO;
import org.logstash.common.FsUtil;
public final class Queue implements Closeable {
@ -158,6 +159,8 @@ public final class Queue implements Closeable {
logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName());
this.ensureDiskAvailable(this.maxBytes);
this.seqNum = 0;
headPageNum = 0;
@ -169,6 +172,9 @@ public final class Queue implements Closeable {
// at this point we have a head checkpoint to figure queue recovery
// as we load pages, compute actuall disk needed substracting existing pages size to the required maxBytes
long diskNeeded = this.maxBytes;
// reconstruct all tail pages state upto but excluding the head page
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
@ -185,6 +191,7 @@ public final class Queue implements Closeable {
} else {
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
addTailPage(cp, PageFactory.newTailPage(cp, this, pageIO));
diskNeeded -= (long)pageIO.getHead();
}
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
@ -201,6 +208,8 @@ public final class Queue implements Closeable {
PageIO pageIO = new MmapPageIO(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data
ensureDiskAvailable(diskNeeded - (long)pageIO.getHead());
if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes
@ -850,4 +859,10 @@ public final class Queue implements Closeable {
private boolean isTailPage(Page p) {
return !isHeadPage(p);
}
private void ensureDiskAvailable(final long diskNeeded) throws IOException {
if (!FsUtil.hasFreeSpace(this.dirPath, diskNeeded)) {
throw new IOException("Not enough free disk space available to allocate persisted queue.");
}
}
}

View file

@ -0,0 +1,41 @@
package org.logstash.common;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* File System Utility Methods.
*/
public final class FsUtil {
private FsUtil() {
}
/**
* Checks if the request number of bytes of free disk space are available under the given
* path.
* @param path Directory to check
* @param size Bytes of free space requested
* @return True iff the
* @throws IOException on failure to determine free space for given path's partition
*/
public static boolean hasFreeSpace(final String path, final long size)
throws IOException
{
final Set<File> partitionRoots = new HashSet<>(Arrays.asList(File.listRoots()));
// crawl up file path until we find a root partition
File location = new File(path).getCanonicalFile();
while (!partitionRoots.contains(location)) {
location = location.getParentFile();
if (location == null) {
throw new IllegalStateException(String.format("Unable to determine the partition that contains '%s'", path));
}
}
return location.getFreeSpace() >= size;
}
}

View file

@ -776,6 +776,20 @@ public class QueueTest {
}
}
@Test
public void getsPersistedByteSizeCorrectly() throws Exception {
Settings settings = TestSettings.persistedQueueSettings(100, dataPath);
try (Queue queue = new Queue(settings)) {
queue.open();
long seqNum = 0;
for (int i = 0; i < 50; ++i) {
seqNum = queue.write(new StringElement("foooo"));
}
queue.ensurePersistedUpto(seqNum);
assertThat(queue.getPersistedByteSize(), is(1063L));
}
}
@Test
public void getsPersistedByteSizeCorrectlyForUnopened() throws Exception {
Settings settings = TestSettings.persistedQueueSettings(100, dataPath);
@ -1002,4 +1016,14 @@ public class QueueTest {
assertThat(b.size(), is(2));
}
}
@Test(expected = IOException.class)
public void throwsWhenNotEnoughDiskFree() throws Exception {
Settings settings = SettingsImpl.builder(TestSettings.persistedQueueSettings(100, dataPath))
.queueMaxBytes(Long.MAX_VALUE)
.build();
try (Queue queue = new Queue(settings)) {
queue.open();
}
}
}

View file

@ -0,0 +1,40 @@
package org.logstash.common;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
* Tests for {@link FsUtil}.
*/
public final class FsUtilTest {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
/**
* {@link FsUtil#hasFreeSpace(String, long)} should return true when asked for 1kb of free
* space in a subfolder of the system's TEMP location.
*/
@Test
public void trueIfEnoughSpace() throws Exception {
MatcherAssert.assertThat(
FsUtil.hasFreeSpace(temp.newFolder().getAbsolutePath(), 1024L),
CoreMatchers.is(true)
);
}
/**
* {@link FsUtil#hasFreeSpace(String, long)} should return false when asked for
* {@link Long#MAX_VALUE} of free space in a subfolder of the system's TEMP location.
*/
@Test
public void falseIfNotEnoughSpace() throws Exception {
MatcherAssert.assertThat(
FsUtil.hasFreeSpace(temp.newFolder().getAbsolutePath(), Long.MAX_VALUE),
CoreMatchers.is(false)
);
}
}