add support for queue.checkpoint.{acks|writes} settings

add queue.max_acked_checkpoint and queue.checkpoint_rate settings

now using checkpoint.max_acks, checkpoint.max_writes and checkpoint.max_interval

rename options

wip rework checkpointing

refactored full acked pages handling on acking and recovery

correclty close queue

proper queue open/recovery

checkpoint dump utility

checkpoint on writes

removed debug code and added missing newline

added better comment on contiguous checkpoints

fix spec for new pipeline setting
This commit is contained in:
Colin Surprenant 2016-11-08 18:04:33 -05:00
parent c03f30fedb
commit f636a751f8
21 changed files with 376 additions and 116 deletions

11
bin/cpdump Executable file
View file

@ -0,0 +1,11 @@
#!/usr/bin/env vendor/jruby/bin/jruby
require_relative "../lib/bootstrap/environment"
LogStash::Bundler.setup!({:without => [:build]})
require "logstash-core"
require "logstash/environment"
require "logstash/settings"
io = Java::OrgLogstashCommonIo::FileCheckpointIO.new(LogStash::SETTINGS.get_value("path.queue"))
cp = io.read(ARGV[0])
puts("checkpoint #{cp.toString}")

View file

@ -107,6 +107,21 @@
#
# queue.max_events: 0
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint

View file

@ -57,19 +57,25 @@ public class JrubyAckedQueueExtLibrary implements Library {
}
// def initialize
@JRubyMethod(name = "initialize", optional = 3)
@JRubyMethod(name = "initialize", optional = 6)
public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args)
{
args = Arity.scanArgs(context.runtime, args, 3, 0);
args = Arity.scanArgs(context.runtime, args, 6, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxUnread = RubyFixnum.num2int(args[2]);
int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
int checkpointMaxInterval = RubyFixnum.num2int(args[5]);
Settings s = new FileSettings(args[0].asJavaString());
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
s.setCapacity(capacity);
s.setMaxUnread(maxUnread);
s.setCheckpointMaxAcks(checkpointMaxAcks);
s.setCheckpointMaxWrites(checkpointMaxWrites);
s.setCheckpointMaxInterval(checkpointMaxInterval);
s.setElementIOFactory(pageIOFactory);
s.setCheckpointIOFactory(checkpointIOFactory);
s.setElementClass(Event.class);

View file

@ -43,6 +43,9 @@ module LogStash
Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
Setting::Bytes.new("queue.page_capacity", "250mb"),
Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Setting::TimeValue.new("slowlog.threshold.info", "-1"),
Setting::TimeValue.new("slowlog.threshold.debug", "-1"),

View file

@ -94,9 +94,9 @@ module LogStash; class Pipeline
rescue => e
raise
end
queue = build_queue_from_settings
@input_queue_client = queue.write_client
@filter_queue_client = queue.read_client
@queue = build_queue_from_settings
@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
@signal_queue = Queue.new
# Note that @infilght_batches as a central mechanism for tracking inflight
# batches will fail if we have multiple read clients here.
@ -119,6 +119,9 @@ module LogStash; class Pipeline
queue_type = settings.get("queue.type")
queue_page_capacity = settings.get("queue.page_capacity")
max_events = settings.get("queue.max_events")
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
if queue_type == "memory_acked"
# memory_acked is used in tests/specs
@ -129,7 +132,7 @@ module LogStash; class Pipeline
elsif queue_type == "persisted"
# persisted is the disk based acked queue
queue_path = settings.get("path.queue")
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, max_events)
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
else
raise(ConfigurationError, "invalid queue.type setting")
end
@ -194,6 +197,7 @@ module LogStash; class Pipeline
shutdown_workers
@filter_queue_client.close
@queue.close
@logger.debug("Pipeline #{@pipeline_id} has been shutdown")

View file

@ -25,9 +25,9 @@ module LogStash; module Util
)
end
def self.create_file_based(path, capacity, size)
def self.create_file_based(path, capacity, size, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
self.allocate.with_queue(
LogStash::AckedQueue.new(path, capacity, size)
LogStash::AckedQueue.new(path, capacity, size, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
)
end

View file

@ -46,6 +46,10 @@ module LogStash; module Util
ReadClient.new(self)
end
def close
# ignore
end
class ReadClient
# We generally only want one thread at a time able to access pop/take/poll operations
# from this queue. We also depend on this to be able to block consumers while we snapshot

View file

@ -450,6 +450,9 @@ describe LogStash::Pipeline do
allow(settings).to receive(:get).with("queue.type").and_return("memory")
allow(settings).to receive(:get).with("queue.page_capacity").and_return(1024 * 1024)
allow(settings).to receive(:get).with("queue.max_events").and_return(250)
allow(settings).to receive(:get).with("queue.checkpoint.acks").and_return(1024)
allow(settings).to receive(:get).with("queue.checkpoint.writes").and_return(1024)
allow(settings).to receive(:get).with("queue.checkpoint.interval").and_return(1000)
pipeline = LogStash::Pipeline.new(config, settings)
expect(pipeline.metric).to be_kind_of(LogStash::Instrument::NullMetric)

View file

@ -40,8 +40,23 @@ public class Checkpoint {
return this.elementCount;
}
// @return true if this checkpoint indicates a fulle acked page
public boolean isFullyAcked() {
return this.elementCount > 0 && this.firstUnackedSeqNum >= this.minSeqNum + this.elementCount;
}
// @return the highest seqNum in this page or -1 for an initial checkpoint
public long maxSeqNum() {
return this.minSeqNum + this.elementCount - 1;
}
public String toString() {
return "pageNum=" + this.pageNum + ", firstUnackedPageNum=" + this.firstUnackedPageNum + ", firstUnackedSeqNum=" + this.firstUnackedSeqNum + ", minSeqNum=" + this.minSeqNum + ", elementCount=" + this.elementCount;
return "pageNum=" + this.pageNum + ", firstUnackedPageNum=" + this.firstUnackedPageNum + ", firstUnackedSeqNum=" + this.firstUnackedSeqNum + ", minSeqNum=" + this.minSeqNum + ", elementCount=" + this.elementCount + ", isFullyAcked=" + (this.isFullyAcked() ? "yes" : "no");
}
public boolean equals(Checkpoint other) {
if (this == other ) { return true; }
return (this.pageNum == other.pageNum && this.firstUnackedPageNum == other.firstUnackedPageNum && this.firstUnackedSeqNum == other.firstUnackedSeqNum && this.minSeqNum == other.minSeqNum && this.elementCount == other.elementCount);
}
}

View file

@ -10,12 +10,18 @@ public class FileSettings implements Settings {
private Class elementClass;
private int capacity;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private int checkpointMaxInterval;
private FileSettings() { this(""); }
public FileSettings(String dirPath) {
this.dirForFiles = dirPath;
this.maxUnread = 0;
this.checkpointMaxAcks = 1024;
this.checkpointMaxWrites = 1024;
this.checkpointMaxInterval = 1000; // millisec
}
@Override
@ -48,6 +54,39 @@ public class FileSettings implements Settings {
return this;
}
@Override
public Settings setCheckpointMaxAcks(int checkpointMaxAcks) {
this.checkpointMaxAcks = checkpointMaxAcks;
return this;
}
@Override
public Settings setCheckpointMaxWrites(int checkpointMaxWrites) {
this.checkpointMaxWrites = checkpointMaxWrites;
return this;
}
@Override
public Settings setCheckpointMaxInterval(int checkpointMaxInterval) {
this.checkpointMaxInterval = checkpointMaxInterval;
return this;
}
@Override
public int getCheckpointMaxAcks() {
return checkpointMaxAcks;
}
@Override
public int getCheckpointMaxWrites() {
return checkpointMaxWrites;
}
@Override
public int getCheckpointMaxInterval() {
return checkpointMaxInterval;
}
@Override
public CheckpointIOFactory getCheckpointIOFactory() {
return checkpointIOFactory;

View file

@ -43,7 +43,7 @@ public class HeadPage extends Page {
// a serialized element byte[] and serialization is done at the Queue level to
// be able to use the Page.hasSpace() method with the serialized element byte size.
//
public void write(byte[] bytes, long seqNum) throws IOException {
public void write(byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOException {
this.pageIO.write(bytes, seqNum);
if (this.minSeqNum <= 0) {
@ -51,6 +51,13 @@ public class HeadPage extends Page {
this.firstUnreadSeqNum = seqNum;
}
this.elementCount++;
// force a checkpoint if we wrote checkpointMaxWrites elements since last checkpoint
// the initial condition of an "empty" checkpoint, maxSeqNum() will return -1
if (checkpointMaxWrites > 0 && (seqNum >= this.lastCheckpoint.maxSeqNum() + checkpointMaxWrites)) {
// did we write more than checkpointMaxWrites elements? if so checkpoint now
checkpoint();
}
}
public void ensurePersistedUpto(long seqNum) throws IOException {
@ -63,12 +70,8 @@ public class HeadPage extends Page {
}
}
public TailPage behead() throws IOException {
// first do we need to checkpoint+fsync the headpage a last time?
if (this.elementCount > this.lastCheckpoint.getElementCount()) {
checkpoint();
}
checkpoint();
TailPage tailPage = new TailPage(this);
@ -86,17 +89,35 @@ public class HeadPage extends Page {
return tailPage;
}
// head page checkpoint, fsync data page if writes occured since last checkpoint
// update checkpoint only if it changed since lastCheckpoint
public void checkpoint() throws IOException {
// TODO: not concurrent for first iteration:
// first fsync data file
this.pageIO.ensurePersisted();
if (this.elementCount > this.lastCheckpoint.getElementCount()) {
// fsync & checkpoint if data written since last checkpoint
// then write new checkpoint
this.pageIO.ensurePersisted();
forceCheckpoint();
} else {
Checkpoint checkpoint = new Checkpoint(this.pageNum, this.queue.firstUnackedPageNum(), firstUnackedSeqNum(), this.minSeqNum, this.elementCount);
if (! checkpoint.equals(this.lastCheckpoint)) {
// checkpoint only if it changed since last checkpoint
// non-dry code with forceCheckpoint() to avoid unnecessary extra new Checkpoint object creation
CheckpointIO io = queue.getCheckpointIO();
io.write(io.headFileName(), checkpoint);
this.lastCheckpoint = checkpoint;
}
}
}
// unconditionally update head checkpoint
public void forceCheckpoint() throws IOException {
Checkpoint checkpoint = new Checkpoint(this.pageNum, this.queue.firstUnackedPageNum(), firstUnackedSeqNum(), this.minSeqNum, this.elementCount);
CheckpointIO io = queue.getCheckpointIO();
this.lastCheckpoint = io.write(io.headFileName(), this.pageNum, this.queue.firstUnackedPageNum(), firstUnackedSeqNum(), this.minSeqNum, this.elementCount);
}
io.write(io.headFileName(), checkpoint);
this.lastCheckpoint = checkpoint;
}
public void close() throws IOException {
checkpoint();

View file

@ -10,6 +10,9 @@ public class MemorySettings implements Settings {
private int capacity;
private final String dirPath;
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private int checkpointMaxInterval;
public MemorySettings() {
this("");
@ -18,6 +21,9 @@ public class MemorySettings implements Settings {
public MemorySettings(String dirPath) {
this.dirPath = dirPath;
this.maxUnread = 0;
this.checkpointMaxAcks = 1;
this.checkpointMaxWrites = 1;
this.checkpointMaxInterval = 0;
}
@Override
@ -50,6 +56,39 @@ public class MemorySettings implements Settings {
return this;
}
@Override
public Settings setCheckpointMaxAcks(int checkpointMaxAcks) {
this.checkpointMaxAcks = checkpointMaxAcks;
return this;
}
@Override
public Settings setCheckpointMaxWrites(int checkpointMaxWrites) {
this.checkpointMaxWrites = checkpointMaxWrites;
return this;
}
@Override
public Settings setCheckpointMaxInterval(int checkpointMaxInterval) {
this.checkpointMaxInterval = checkpointMaxInterval;
return this;
}
@Override
public int getCheckpointMaxAcks() {
return checkpointMaxAcks;
}
@Override
public int getCheckpointMaxWrites() {
return checkpointMaxWrites;
}
@Override
public int getCheckpointMaxInterval() {
return checkpointMaxInterval;
}
@Override
public CheckpointIOFactory getCheckpointIOFactory() {
return checkpointIOFactory;

View file

@ -82,7 +82,15 @@ public abstract class Page implements Closeable {
return this.elementCount <= 0 ? 0 : Math.max(0, (maxSeqNum() - this.firstUnreadSeqNum) + 1);
}
public void ack(List<Long> seqNums) throws IOException {
// update the page acking bitset. trigger checkpoint on the page if it is fully acked or if we acked more than the
// configured threshold checkpointMaxAcks.
// note that if the fully acked tail page is the first unacked page, it is not really necessary to also checkpoint
// the head page to update firstUnackedPageNum because it will be updated in the next upcoming head page checkpoint
// and in a crash condition, the Queue open recovery will detect and purge fully acked pages
//
// @param seqNums the list of same-page seqNums to ack
// @param checkpointMaxAcks the number of acks that will trigger a page checkpoint
public void ack(List<Long> seqNums, int checkpointMaxAcks) throws IOException {
for (long seqNum : seqNums) {
// TODO: eventually refactor to use new bit handling class
@ -96,18 +104,18 @@ public abstract class Page implements Closeable {
this.ackedSeqNums.set(index);
}
// checkpoint if totally acked or we acked more than 1024 elements in this page since last checkpoint
// checkpoint if totally acked or we acked more than checkpointMaxAcks elements in this page since last checkpoint
// note that fully acked pages cleanup is done at queue level in Queue.ack()
long firstUnackedSeqNum = firstUnackedSeqNum();
if (isFullyAcked()) {
// TODO: here if consumer is faster than producer, the head page may be always fully acked and we may end up fsync'ing too ofter?
checkpoint();
assert firstUnackedSeqNum >= this.minSeqNum + this.elementCount - 1:
String.format("invalid firstUnackedSeqNum=%d for minSeqNum=%d and elementCount=%d and cardinality=%d", firstUnackedSeqNum, this.minSeqNum, this.elementCount, this.ackedSeqNums.cardinality());
} else if (firstUnackedSeqNum > this.lastCheckpoint.getFirstUnackedSeqNum() + 1024) {
// did we acked more that 1024 elements? if so we should checkpoint now
} else if (checkpointMaxAcks > 0 && (firstUnackedSeqNum >= this.lastCheckpoint.getFirstUnackedSeqNum() + checkpointMaxAcks)) {
// did we acked more than checkpointMaxAcks elements? if so checkpoint now
checkpoint();
}
}

View file

@ -46,6 +46,9 @@ public class Queue implements Closeable {
private final int pageCapacity;
private final String dirPath;
private final int maxUnread;
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private final int checkpointMaxInterval;
private final AtomicBoolean closed;
@ -59,10 +62,20 @@ public class Queue implements Closeable {
final Condition notEmpty = lock.newCondition();
public Queue(Settings settings) {
this(settings.getDirPath(), settings.getCapacity(), settings.getCheckpointIOFactory().build(settings.getDirPath()), settings.getPageIOFactory(), settings.getElementClass(), settings.getMaxUnread());
this(
settings.getDirPath(),
settings.getCapacity(),
settings.getCheckpointIOFactory().build(settings.getDirPath()),
settings.getPageIOFactory(),
settings.getElementClass(),
settings.getMaxUnread(),
settings.getCheckpointMaxWrites(),
settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxInterval()
);
}
public Queue(String dirPath, int pageCapacity, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread) {
public Queue(String dirPath, int pageCapacity, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
this.dirPath = dirPath;
this.pageCapacity = pageCapacity;
this.checkpointIO = checkpointIO;
@ -72,6 +85,9 @@ public class Queue implements Closeable {
this.unreadTailPages = new ArrayList<>();
this.closed = new AtomicBoolean(true); // not yet opened
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointMaxInterval = checkpointMaxInterval;
this.unreadCount = 0;
// retrieve the deserialize method
@ -94,7 +110,7 @@ public class Queue implements Closeable {
Checkpoint headCheckpoint;
try {
headCheckpoint = checkpointIO.read(checkpointIO.headFileName());
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
} catch (NoSuchFileException e) {
headCheckpoint = null;
}
@ -114,39 +130,43 @@ public class Queue implements Closeable {
// reconstruct all tail pages state upto but excluding the head page
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {
Checkpoint tailCheckpoint = checkpointIO.read(checkpointIO.tailFileName(pageNum));
if (tailCheckpoint == null) { throw new IOException(checkpointIO.tailFileName(pageNum) + " not found"); }
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint tailCheckpoint = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
TailPage tailPage = new TailPage(tailCheckpoint, this, pageIO);
// if this page is not the first tail page, deactivate it
// we keep the first tail page activated since we know the next read operation will be in that one
if (pageNum > headCheckpoint.getFirstUnackedPageNum()) { pageIO.deactivate(); }
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (tailPage.maxSeqNum() > this.seqNum) { this.seqNum = tailPage.maxSeqNum(); }
insertTailPage(tailPage);
add(tailCheckpoint, pageIO);
}
// transform the head page into a tail page only if the headpage is non-empty
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
if (headCheckpoint.getMinSeqNum() <= 0 && headCheckpoint.getElementCount() <= 0) {
// head page is empty, let's keep it as-is
PageIO headPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
this.headPage = new HeadPage(headCheckpoint, this, headPageIO);
// but checkpoint it to update the firstUnackedPageNum if it changed
this.headPage.checkpoint();
} else {
PageIO tailPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
TailPage tailPage = new TailPage(headCheckpoint, this, tailPageIO);
// head page is non-empty, transform it into a tail page and create a new empty head page
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
if (tailPage.maxSeqNum() > this.seqNum) { this.seqNum = tailPage.maxSeqNum(); }
insertTailPage(tailPage);
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
TailPage p = new TailPage(headCheckpoint, this, pageIO);
p.checkpoint();
add(headCheckpoint, pageIO);
headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
if (headCheckpoint.maxSeqNum() > this.seqNum) { this.seqNum = headCheckpoint.maxSeqNum(); }
}
// only activate the first tail page
if (tailPages.size() > 0) {
this.tailPages.get(0).getPageIO().activate();
}
// TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start?
@ -154,19 +174,39 @@ public class Queue implements Closeable {
this.closed.set(false);
}
// insert a recovered tail page into the tail pages state tracking
// and purge it if it is found to be fully acked
private void insertTailPage(TailPage p) throws IOException {
if (!p.isFullyAcked()) {
this.tailPages.add(p);
if (!p.isFullyRead()) {
this.unreadTailPages.add(p);
this.unreadCount += p.unreadCount();
private void add(Checkpoint checkpoint, PageIO pageIO) throws IOException {
if (checkpoint.isFullyAcked()) {
// first make sure any fully acked page per the checkpoint is purged if not already
try { pageIO.purge(); } catch (NoSuchFileException e) { /* ignore */ }
// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
// we will remove any prepended fully acked tail pages but keep all other checkpoints between the first
// unacked tail page and the head page. we did however purge the data file to free disk resources.
if (this.tailPages.size() == 0) {
// this is the first tail page and it is fully acked so just purge it
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
} else {
// create a tail page with a null PageIO and add it to tail pages but not unreadTailPages
// since it is fully read because also fully acked
this.tailPages.add(new TailPage(checkpoint, this, null));
}
} else {
// for some reason we found a fully acked page, let's purge it.
p.purge();
TailPage p = new TailPage(checkpoint, this, pageIO);
this.tailPages.add(p);
this.unreadTailPages.add(p);
this.unreadCount += p.unreadCount();
// for now deactivate all tail pages, we will only reactivate the first one at the end
pageIO.deactivate();
}
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (checkpoint.maxSeqNum() > this.seqNum) {
this.seqNum = checkpoint.maxSeqNum();
}
}
// create a new empty headpage for the given pageNum and imidiately checkpoint it
@ -174,7 +214,7 @@ public class Queue implements Closeable {
private void newCheckpointedHeadpage(int pageNum) throws IOException {
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
this.headPage = new HeadPage(pageNum, this, headPageIO);
this.headPage.checkpoint();
this.headPage.forceCheckpoint();
}
@ -210,13 +250,10 @@ public class Queue implements Closeable {
}
// create new head page
int headPageNum = tailPage.pageNum + 1;
PageIO pageIO = this.pageIOFactory.build(headPageNum, this.pageCapacity, this.dirPath);
this.headPage = new HeadPage(headPageNum, this, pageIO);
this.headPage.checkpoint();
newCheckpointedHeadpage(tailPage.pageNum + 1);
}
this.headPage.write(data, seqNum);
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
this.unreadCount++;
// if the queue was empty before write, signal non emptiness
@ -273,11 +310,7 @@ public class Queue implements Closeable {
lock.lock();
try {
Page p = firstUnreadPage();
if (p == null) {
return null;
}
return _readPageBatch(p, limit);
return (p == null) ? null : _readPageBatch(p, limit);
} finally {
lock.unlock();
}
@ -312,9 +345,7 @@ public class Queue implements Closeable {
}
// need to check for close since it is a condition for exiting the while loop
if (isClosed()) { return null; }
return _readPageBatch(p, limit);
return (isClosed()) ? null : _readPageBatch(p, limit);
} finally {
lock.unlock();
}
@ -361,10 +392,7 @@ public class Queue implements Closeable {
Batch b = p.readBatch(limit);
this.unreadCount -= b.size();
if (p.isFullyRead()) {
removeUnreadPage(p);
}
if (p.isFullyRead()) { removeUnreadPage(p); }
if (wasFull) { notFull.signal(); }
return b;
@ -410,6 +438,10 @@ public class Queue implements Closeable {
return null;
}
// ack a list of seqNums that are assumed to be all part of the same page, leveraging the fact that batches are also created from
// same-page elements. A fully acked page will trigger a checkpoint for that page. Also if a page has more than checkpointMaxAcks
// acks since last checkpoint it will also trigger a checkpoint.
// @param seqNums the list of same-page sequence numbers to ack
public void ack(List<Long> seqNums) throws IOException {
// as a first implementation we assume that all batches are created from the same page
// so we will avoid multi pages acking here for now
@ -419,23 +451,52 @@ public class Queue implements Closeable {
lock.lock();
try {
// dual search strategy: if few tail pages search linearily otherwise perform binary search
TailPageResult result = (this.tailPages.size() > 3) ? binaryFindPageForSeqnum(firstAckSeqNum) : linearFindPageForSeqnum(firstAckSeqNum);
TailPageResult result = null;
if (this.tailPages.size() > 0) {
// short-circuit: first check in the first tail page as it is the most likely page where acking will happen
TailPage p = this.tailPages.get(0);
if (p.getMinSeqNum() > 0 && firstAckSeqNum >= p.getMinSeqNum() && firstAckSeqNum < p.getMinSeqNum() + p.getElementCount()) {
result = new TailPageResult(p, 0);
} else {
// dual search strategy: if few tail pages search linearily otherwise perform binary search
result = (this.tailPages.size() > 3) ? binaryFindPageForSeqnum(firstAckSeqNum) : linearFindPageForSeqnum(firstAckSeqNum);
}
}
if (result == null) {
// if not found then it is in head page
assert this.headPage.getMinSeqNum() > 0 && firstAckSeqNum >= this.headPage.getMinSeqNum() && firstAckSeqNum < this.headPage.getMinSeqNum() + this.headPage.getElementCount():
String.format("seqNum=%d is not in head page with minSeqNum=%d", firstAckSeqNum, this.headPage.getMinSeqNum());
this.headPage.ack(seqNums);
// page acking checkpoints fully acked pages
this.headPage.ack(seqNums, this.checkpointMaxAcks);
} else {
result.page.ack(seqNums);
// page acking also checkpoints fully acked pages or upon reaching the checkpointMaxAcks threshold
result.page.ack(seqNums, this.checkpointMaxAcks);
// cleanup fully acked tail page
if (result.page.isFullyAcked()) {
this.tailPages.remove(result.index);
this.headPage.checkpoint();
// remove page data file regardless if it is the first or a middle tail page to free resources
result.page.purge();
if (result.index == 0) {
// if this is the first page also remove checkpoint file
this.checkpointIO.purge(this.checkpointIO.tailFileName(result.page.getPageNum()));
// and see if next "first tail page" is also fully acked and remove checkpoint file
while (this.tailPages.size() > 0) {
TailPage p = this.tailPages.get(0);
if (!p.isFullyAcked()) { break; }
this.checkpointIO.purge(this.checkpointIO.tailFileName(p.getPageNum()));
this.tailPages.remove(0);
}
}
}
this.headPage.checkpoint();
}
} finally {
lock.unlock();

View file

@ -14,6 +14,12 @@ public interface Settings {
Settings setMaxUnread(int maxUnread);
Settings setCheckpointMaxAcks(int checkpointMaxAcks);
Settings setCheckpointMaxWrites(int checkpointMaxWrites);
Settings setCheckpointMaxInterval(int checkpointMaxInterval);
CheckpointIOFactory getCheckpointIOFactory();
PageIOFactory getPageIOFactory();
@ -25,4 +31,10 @@ public interface Settings {
int getCapacity();
int getMaxUnread();
int getCheckpointMaxAcks();
int getCheckpointMaxWrites();
int getCheckpointMaxInterval();
}

View file

@ -17,13 +17,16 @@ public class TailPage extends Page {
public TailPage(Checkpoint checkpoint, Queue queue, PageIO pageIO) throws IOException {
super(checkpoint.getPageNum(), queue, checkpoint.getMinSeqNum(), checkpoint.getElementCount(), checkpoint.getFirstUnackedSeqNum(), new BitSet(), pageIO);
// open the data file and reconstruct the IO object internal state
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
this.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}
if (pageIO != null) {
// open the data file and reconstruct the IO object internal state
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
}
}
public void checkpoint() throws IOException {
@ -31,18 +34,20 @@ public class TailPage extends Page {
// since this is a tail page and no write can happen in this page, there is no point in performing a fsync on this page, just stamp checkpoint
CheckpointIO io = queue.getCheckpointIO();
this.lastCheckpoint = io.write(io.tailFileName(this.pageNum), this.pageNum, this.queue.firstUnackedPageNum(), firstUnackedSeqNum(), this.minSeqNum, this.elementCount);
this.lastCheckpoint = io.write(io.tailFileName(this.pageNum), this.pageNum, 0, firstUnackedSeqNum(), this.minSeqNum, this.elementCount);
}
// delete all IO files associated with this page
public void purge() throws IOException {
this.pageIO.purge();
CheckpointIO io = queue.getCheckpointIO();
io.purge(io.tailFileName(this.pageNum));
if (this.pageIO != null) {
this.pageIO.purge(); // page IO purge calls close
}
}
public void close() throws IOException {
checkpoint();
this.pageIO.close();
if (this.pageIO != null) {
this.pageIO.close();
}
}
}

View file

@ -8,6 +8,8 @@ public interface CheckpointIO {
// @return Checkpoint the written checkpoint object
Checkpoint write(String fileName, int pageNum, int firstUnackedPageNum, long firstUnackedSeqNum, long minSeqNum, int elementCount) throws IOException;
void write(String fileName, Checkpoint checkpoint) throws IOException;
Checkpoint read(String fileName) throws IOException;
void purge(String fileName) throws IOException;

View file

@ -43,12 +43,17 @@ public class FileCheckpointIO implements CheckpointIO {
@Override
public Checkpoint write(String fileName, int pageNum, int firstUnackedPageNum, long firstUnackedSeqNum, long minSeqNum, int elementCount) throws IOException {
Path path = Paths.get(dirPath, fileName);
Checkpoint checkpoint = new Checkpoint(pageNum, firstUnackedPageNum, firstUnackedSeqNum, minSeqNum, elementCount);
write(fileName, checkpoint);
return checkpoint;
}
@Override
public void write(String fileName, Checkpoint checkpoint) throws IOException {
Path path = Paths.get(dirPath, fileName);
final byte[] buffer = new byte[BUFFER_SIZE];
write(checkpoint, buffer);
Files.write(path, buffer);
return checkpoint;
}
@Override

View file

@ -31,10 +31,15 @@ public class MemoryCheckpointIO implements CheckpointIO {
@Override
public Checkpoint write(String fileName, int pageNum, int firstUnackedPageNum, long firstUnackedSeqNum, long minSeqNum, int elementCount) throws IOException {
Checkpoint checkpoint = new Checkpoint(pageNum, firstUnackedPageNum, firstUnackedSeqNum, minSeqNum, elementCount);
this.sources.put(fileName, checkpoint);
write(fileName, checkpoint);
return checkpoint;
}
@Override
public void write(String fileName, Checkpoint checkpoint) throws IOException {
this.sources.put(fileName, checkpoint);
}
@Override
public void purge(String fileName) {
this.sources.remove(fileName);

View file

@ -38,11 +38,11 @@ public class HeadPageTest {
Settings s = TestSettings.getSettings(singleElementCapacity);
Queue q = new Queue(s);
PageIO pageIO = s.getPageIOFactory().build(0, singleElementCapacity, "dummy");
HeadPage p = new HeadPage(0, q, pageIO);
q.open();
HeadPage p = q.headPage;
assertThat(p.hasSpace(element.serialize().length), is(true));
p.write(element.serialize(), 0);
p.write(element.serialize(), 0, 1);
assertThat(p.hasSpace(element.serialize().length), is(false));
assertThat(p.isFullyRead(), is(false));
@ -57,11 +57,11 @@ public class HeadPageTest {
Settings s = TestSettings.getSettings(singleElementCapacity);
Queue q = new Queue(s);
PageIO pageIO = s.getPageIOFactory().build(0, singleElementCapacity, "dummy");
HeadPage p = new HeadPage(0, q, pageIO);
q.open();
HeadPage p = q.headPage;
assertThat(p.hasSpace(element.serialize().length), is(true));
p.write(element.serialize(), seqNum);
p.write(element.serialize(), seqNum, 1);
Batch b = p.readBatch(1);
@ -81,11 +81,11 @@ public class HeadPageTest {
Settings s = TestSettings.getSettings(singleElementCapacity);
Queue q = new Queue(s);
PageIO pageIO = s.getPageIOFactory().build(0, singleElementCapacity, "dummy");
HeadPage p = new HeadPage(0, q, pageIO);
q.open();
HeadPage p = q.headPage;
assertThat(p.hasSpace(element.serialize().length), is(true));
p.write(element.serialize(), seqNum);
p.write(element.serialize(), seqNum, 1);
Batch b = p.readBatch(10);
@ -97,20 +97,21 @@ public class HeadPageTest {
assertThat(p.isFullyAcked(), is(false));
}
@Test
public void pageViaQueueOpenForHeadCheckpointWithoutSupportingPageFiles() throws Exception {
URL url = FileCheckpointIOTest.class.getResource("checkpoint.head");
String dirPath = Paths.get(url.toURI()).getParent().toString();
Queueable element = new StringElement("foobarbaz");
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
Settings s = TestSettings.getSettingsCheckpointFilePageMemory(singleElementCapacity, dirPath);
TestQueue q = new TestQueue(s);
try {
q.open();
} catch (NoSuchFileException e) {
assertThat(e.getMessage(), containsString("checkpoint.2"));
}
HeadPage p = q.getHeadPage();
assertThat(p, is(equalTo(null)));
}
// disabled test until we figure what to do in this condition
// @Test
// public void pageViaQueueOpenForHeadCheckpointWithoutSupportingPageFiles() throws Exception {
// URL url = FileCheckpointIOTest.class.getResource("checkpoint.head");
// String dirPath = Paths.get(url.toURI()).getParent().toString();
// Queueable element = new StringElement("foobarbaz");
// int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
// Settings s = TestSettings.getSettingsCheckpointFilePageMemory(singleElementCapacity, dirPath);
// TestQueue q = new TestQueue(s);
// try {
// q.open();
// } catch (NoSuchFileException e) {
// assertThat(e.getMessage(), containsString("checkpoint.2"));
// }
// HeadPage p = q.getHeadPage();
// assertThat(p, is(equalTo(null)));
// }
}

View file

@ -174,6 +174,7 @@ public class QueueTest {
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(elements1.get(0).serialize().length);
Settings settings = TestSettings.getSettings(2 * singleElementCapacity);
settings.setCheckpointMaxWrites(1024); // arbritary high enough threshold so that it's not reached (default for TestSettings is 1)
TestQueue q = new TestQueue(settings);
q.open();