add option for PQ checkpoint retry

Fixes #10234
This commit is contained in:
Dan Hermann 2018-12-21 20:08:15 -06:00
parent 2a45a8fd9d
commit 19554259b8
14 changed files with 88 additions and 25 deletions

View file

@ -174,9 +174,13 @@ The `logstash.yml` file includes the following settings. If you are using X-Pack
| The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited.
| 1024
| `queue.checkpoint.retry`
| When enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances.
| `false`
| `queue.drain`
| When enabled, Logstash waits until the persistent queue is drained before shutting down.
| false
| `false`
| `dead_letter_queue.enable`
| Flag to instruct Logstash to enable the DLQ feature supported by plugins.

View file

@ -62,6 +62,7 @@ module LogStash
Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::Boolean.new("queue.checkpoint.retry", false),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),

View file

@ -28,6 +28,7 @@ module LogStash
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.drain",
"queue.max_bytes",
"queue.max_events",

View file

@ -14,7 +14,7 @@ describe LogStash::WrappedAckedQueue, :stress_test => true do
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }
let(:queue) do
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, queue_capacity)
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, false, queue_capacity)
end
let(:writer_threads) do

View file

@ -110,7 +110,7 @@ describe LogStash::WrappedWriteClient do
context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, 1024, 4096) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, 1024, false, 4096) }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))

View file

@ -14,6 +14,7 @@ describe LogStash::QueueFactory do
LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::String.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)

View file

@ -38,7 +38,7 @@ describe LogStash::WrappedAckedQueue do
let(:checkpoint_writes) { 1024 }
let(:checkpoint_interval) { 0 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, max_bytes) }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, false, max_bytes) }
after do
queue.close

View file

@ -77,7 +77,7 @@ public final class Queue implements Closeable {
}
this.pageCapacity = settings.getCapacity();
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = new FileCheckpointIO(dirPath);
this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry());
this.elementClass = settings.getElementClass();
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();

View file

@ -44,6 +44,7 @@ public final class QueueFactoryExt extends RubyBasicObject {
getSetting(context, settings, "queue.checkpoint.writes"),
getSetting(context, settings, "queue.checkpoint.acks"),
getSetting(context, settings, "queue.checkpoint.interval"),
getSetting(context, settings, "queue.checkpoint.retry"),
getSetting(context, settings, "queue.max_bytes")
}
);

View file

@ -16,6 +16,8 @@ public interface Settings {
int getCheckpointMaxWrites();
boolean getCheckpointRetry();
interface Builder {
Builder elementClass(Class<? extends Queueable> elementClass);
@ -30,6 +32,8 @@ public interface Settings {
Builder checkpointMaxWrites(int checkpointMaxWrites);
Builder checkpointRetry(boolean checkpointRetry);
Settings build();
}

View file

@ -8,11 +8,12 @@ public class SettingsImpl implements Settings {
private int maxUnread;
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private boolean checkpointRetry;
public static Builder builder(final Settings settings) {
return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(),
settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites()
settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
);
}
@ -22,7 +23,7 @@ public class SettingsImpl implements Settings {
private SettingsImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites) {
final int checkpointMaxAcks, final int checkpointMaxWrites, boolean checkpointRetry) {
this.dirForFiles = dirForFiles;
this.elementClass = elementClass;
this.capacity = capacity;
@ -30,6 +31,7 @@ public class SettingsImpl implements Settings {
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointRetry = checkpointRetry;
}
@Override
@ -67,6 +69,11 @@ public class SettingsImpl implements Settings {
return this.maxUnread;
}
@Override
public boolean getCheckpointRetry() {
return this.checkpointRetry;
}
private static final class BuilderImpl implements Builder {
/**
@ -111,15 +118,17 @@ public class SettingsImpl implements Settings {
private final int checkpointMaxWrites;
private final boolean checkpointRetry;
private BuilderImpl(final String dirForFiles) {
this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES
DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES, false
);
}
private BuilderImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
final int capacity, final long queueMaxBytes, final int maxUnread,
final int checkpointMaxAcks, final int checkpointMaxWrites) {
final int checkpointMaxAcks, final int checkpointMaxWrites, final boolean checkpointRetry) {
this.dirForFiles = dirForFiles;
this.elementClass = elementClass;
this.capacity = capacity;
@ -127,14 +136,14 @@ public class SettingsImpl implements Settings {
this.maxUnread = maxUnread;
this.checkpointMaxAcks = checkpointMaxAcks;
this.checkpointMaxWrites = checkpointMaxWrites;
this.checkpointRetry = checkpointRetry;
}
@Override
public Builder elementClass(final Class<? extends Queueable> elementClass) {
return new BuilderImpl(
this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread,
this.checkpointMaxAcks,
this.checkpointMaxWrites
this.checkpointMaxAcks, this.checkpointMaxWrites, false
);
}
@ -142,7 +151,7 @@ public class SettingsImpl implements Settings {
public Builder capacity(final int capacity) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread,
this.checkpointMaxAcks, this.checkpointMaxWrites
this.checkpointMaxAcks, this.checkpointMaxWrites, false
);
}
@ -150,7 +159,7 @@ public class SettingsImpl implements Settings {
public Builder queueMaxBytes(final long size) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread,
this.checkpointMaxAcks, this.checkpointMaxWrites
this.checkpointMaxAcks, this.checkpointMaxWrites, false
);
}
@ -159,7 +168,7 @@ public class SettingsImpl implements Settings {
return new BuilderImpl(
this.dirForFiles, this.elementClass,
this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
this.checkpointMaxWrites
this.checkpointMaxWrites, false
);
}
@ -168,7 +177,7 @@ public class SettingsImpl implements Settings {
return new BuilderImpl(
this.dirForFiles, this.elementClass,
this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
this.checkpointMaxWrites
this.checkpointMaxWrites, false
);
}
@ -176,7 +185,15 @@ public class SettingsImpl implements Settings {
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, false
);
}
@Override
public Builder checkpointRetry(final boolean checkpointRetry) {
return new BuilderImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, checkpointRetry
);
}
@ -184,7 +201,7 @@ public class SettingsImpl implements Settings {
public Settings build() {
return new SettingsImpl(
this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites
this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
);
}
}

View file

@ -34,13 +34,16 @@ public final class JRubyAckedQueueExt extends RubyObject {
return this.queue;
}
public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, long maxBytes) {
public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents, int checkpointMaxWrites,
int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) {
JRubyAckedQueueExt queueExt = new JRubyAckedQueueExt(RubyUtil.RUBY, RubyUtil.ACKED_QUEUE_CLASS);
queueExt.initializeQueue(path, capacity, maxEvents, checkpointMaxWrites, checkpointMaxAcks, maxBytes);
queueExt.initializeQueue(path, capacity, maxEvents, checkpointMaxWrites, checkpointMaxAcks, checkpointRetry,
maxBytes);
return queueExt;
}
private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, long maxBytes) {
private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites,
int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) {
this.queue = new Queue(
SettingsImpl.fileSettingsBuilder(path)
.capacity(capacity)
@ -48,6 +51,7 @@ public final class JRubyAckedQueueExt extends RubyObject {
.queueMaxBytes(maxBytes)
.checkpointMaxAcks(checkpointMaxAcks)
.checkpointMaxWrites(checkpointMaxWrites)
.checkpointRetry(checkpointRetry)
.elementClass(Event.class)
.build()
);

View file

@ -27,17 +27,18 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private JRubyAckedQueueExt queue;
private final AtomicBoolean isClosed = new AtomicBoolean();
@JRubyMethod(optional = 7)
@JRubyMethod(optional = 8)
public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 7, 0);
args = Arity.scanArgs(context.runtime, args, 8, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
int checkpointMaxWrites = RubyFixnum.num2int(args[3]);
int checkpointMaxAcks = RubyFixnum.num2int(args[4]);
long queueMaxBytes = RubyFixnum.num2long(args[6]);
boolean checkpointRetry = !((RubyBoolean)args[6]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[7]);
this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents,
checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);
checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes);
this.queue.open();
return this;

View file

@ -8,6 +8,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.CRC32;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ackedqueue.Checkpoint;
public class FileCheckpointIO implements CheckpointIO {
@ -20,6 +23,8 @@ public class FileCheckpointIO implements CheckpointIO {
// long minSeqNum;
// int elementCount;
private static final Logger logger = LogManager.getLogger(FileCheckpointIO.class);
public static final int BUFFER_SIZE = Short.BYTES // version
+ Integer.BYTES // pageNum
+ Integer.BYTES // firstUnackedPageNum
@ -36,12 +41,19 @@ public class FileCheckpointIO implements CheckpointIO {
private final CRC32 crc32 = new CRC32();
private final boolean retry;
private static final String HEAD_CHECKPOINT = "checkpoint.head";
private static final String TAIL_CHECKPOINT = "checkpoint.";
private final Path dirPath;
public FileCheckpointIO(Path dirPath) {
this(dirPath, false);
}
public FileCheckpointIO(Path dirPath, boolean retry) {
this.dirPath = dirPath;
this.retry = retry;
}
@Override
@ -67,7 +79,24 @@ public class FileCheckpointIO implements CheckpointIO {
out.getChannel().write(buffer);
out.getFD().sync();
}
Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
try {
Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
} catch (IOException ex) {
if (retry) {
try {
logger.error("Retrying after exception writing checkpoint: " + ex);
Thread.sleep(500);
Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
} catch (Exception ex2) {
logger.error("Aborting after second exception writing checkpoint: " + ex2);
throw ex;
}
} else {
logger.error("Error writing checkpoint: " + ex);
throw ex;
}
}
}
@Override