mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
parent
b7bd9dec13
commit
b4e745ced0
2 changed files with 24 additions and 38 deletions
|
@ -18,25 +18,25 @@
|
||||||
*/
|
*/
|
||||||
package org.logstash.common.io;
|
package org.logstash.common.io;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.channels.FileChannel;
|
|
||||||
import java.nio.channels.OverlappingFileLockException;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.nio.file.StandardOpenOption;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.logstash.DLQEntry;
|
import org.logstash.DLQEntry;
|
||||||
import org.logstash.Event;
|
import org.logstash.Event;
|
||||||
import org.logstash.FieldReference;
|
import org.logstash.FieldReference;
|
||||||
|
import org.logstash.FileLockFactory;
|
||||||
import org.logstash.PathCache;
|
import org.logstash.PathCache;
|
||||||
import org.logstash.Timestamp;
|
import org.logstash.Timestamp;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.channels.FileLock;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
|
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
|
||||||
|
|
||||||
public final class DeadLetterQueueWriter implements Closeable {
|
public final class DeadLetterQueueWriter implements Closeable {
|
||||||
|
@ -52,27 +52,14 @@ public final class DeadLetterQueueWriter implements Closeable {
|
||||||
private final long maxQueueSize;
|
private final long maxQueueSize;
|
||||||
private LongAdder currentQueueSize;
|
private LongAdder currentQueueSize;
|
||||||
private final Path queuePath;
|
private final Path queuePath;
|
||||||
private final FileChannel lockChannel;
|
private final FileLock lock;
|
||||||
private volatile RecordIOWriter currentWriter;
|
private volatile RecordIOWriter currentWriter;
|
||||||
private int currentSegmentIndex;
|
private int currentSegmentIndex;
|
||||||
private Timestamp lastEntryTimestamp;
|
private Timestamp lastEntryTimestamp;
|
||||||
private final AtomicBoolean open = new AtomicBoolean(true);
|
private final AtomicBoolean open = new AtomicBoolean(true);
|
||||||
|
|
||||||
public DeadLetterQueueWriter(Path queuePath, long maxSegmentSize, long maxQueueSize) throws IOException {
|
public DeadLetterQueueWriter(Path queuePath, long maxSegmentSize, long maxQueueSize) throws IOException {
|
||||||
// ensure path exists, create it otherwise.
|
this.lock = FileLockFactory.obtainLock(queuePath.toString(), LOCK_FILE);
|
||||||
Files.createDirectories(queuePath);
|
|
||||||
// check that only one instance of the writer is open in this configured path
|
|
||||||
Path lockFilePath = queuePath.resolve(LOCK_FILE);
|
|
||||||
boolean isNewlyCreated = lockFilePath.toFile().createNewFile();
|
|
||||||
lockChannel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE);
|
|
||||||
try {
|
|
||||||
lockChannel.lock();
|
|
||||||
} catch (OverlappingFileLockException e) {
|
|
||||||
if (isNewlyCreated) {
|
|
||||||
logger.warn("Previous Dead Letter Queue Writer was not closed safely.");
|
|
||||||
}
|
|
||||||
throw new RuntimeException("uh oh, someone else is writing to this dead-letter queue");
|
|
||||||
}
|
|
||||||
this.queuePath = queuePath;
|
this.queuePath = queuePath;
|
||||||
this.maxSegmentSize = maxSegmentSize;
|
this.maxSegmentSize = maxSegmentSize;
|
||||||
this.maxQueueSize = maxQueueSize;
|
this.maxQueueSize = maxQueueSize;
|
||||||
|
@ -178,17 +165,15 @@ public final class DeadLetterQueueWriter implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void releaseLock() {
|
private void releaseLock() {
|
||||||
if (lockChannel != null){
|
try {
|
||||||
try {
|
FileLockFactory.releaseLock(lock);
|
||||||
lockChannel.close();
|
} catch (IOException e) {
|
||||||
} catch (Exception e) {
|
logger.debug("Unable to release lock", e);
|
||||||
logger.debug("Unable to close lock channel", e);
|
}
|
||||||
}
|
try {
|
||||||
try {
|
Files.deleteIfExists(queuePath.resolve(LOCK_FILE));
|
||||||
Files.deleteIfExists(queuePath.resolve(LOCK_FILE));
|
} catch (IOException e){
|
||||||
} catch (IOException e){
|
logger.debug("Unable to delete lock file", e);
|
||||||
logger.debug("Unable to delete lock file", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
import org.logstash.DLQEntry;
|
import org.logstash.DLQEntry;
|
||||||
import org.logstash.Event;
|
import org.logstash.Event;
|
||||||
|
import org.logstash.LockException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
@ -70,7 +71,7 @@ public class DeadLetterQueueWriterTest {
|
||||||
try {
|
try {
|
||||||
new DeadLetterQueueWriter(dir, 1000, 100000);
|
new DeadLetterQueueWriter(dir, 1000, 100000);
|
||||||
fail();
|
fail();
|
||||||
} catch (RuntimeException e) {
|
} catch (LockException e) {
|
||||||
} finally {
|
} finally {
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue