From b4e745ced0b235ba941e423decea1be29128820b Mon Sep 17 00:00:00 2001 From: liketic Date: Tue, 19 Sep 2017 21:17:06 +0800 Subject: [PATCH] Obtain file lock through FileLockFactory for DLQ Fixes #8314 --- .../common/io/DeadLetterQueueWriter.java | 59 +++++++------------ .../common/io/DeadLetterQueueWriterTest.java | 3 +- 2 files changed, 24 insertions(+), 38 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 3fa0604d5..131e5b3c3 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -18,25 +18,25 @@ */ package org.logstash.common.io; -import java.io.Closeable; -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.channels.OverlappingFileLockException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.LongAdder; -import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.DLQEntry; import org.logstash.Event; import org.logstash.FieldReference; +import org.logstash.FileLockFactory; import org.logstash.PathCache; import org.logstash.Timestamp; +import java.io.Closeable; +import java.io.IOException; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Stream; + import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; public final class DeadLetterQueueWriter implements Closeable { @@ -52,27 +52,14 @@ public final class DeadLetterQueueWriter implements Closeable { private final long maxQueueSize; private LongAdder currentQueueSize; private final Path queuePath; - private final FileChannel lockChannel; + private final FileLock lock; private volatile RecordIOWriter currentWriter; private int currentSegmentIndex; private Timestamp lastEntryTimestamp; private final AtomicBoolean open = new AtomicBoolean(true); public DeadLetterQueueWriter(Path queuePath, long maxSegmentSize, long maxQueueSize) throws IOException { - // ensure path exists, create it otherwise. - Files.createDirectories(queuePath); - // check that only one instance of the writer is open in this configured path - Path lockFilePath = queuePath.resolve(LOCK_FILE); - boolean isNewlyCreated = lockFilePath.toFile().createNewFile(); - lockChannel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE); - try { - lockChannel.lock(); - } catch (OverlappingFileLockException e) { - if (isNewlyCreated) { - logger.warn("Previous Dead Letter Queue Writer was not closed safely."); - } - throw new RuntimeException("uh oh, someone else is writing to this dead-letter queue"); - } + this.lock = FileLockFactory.obtainLock(queuePath.toString(), LOCK_FILE); this.queuePath = queuePath; this.maxSegmentSize = maxSegmentSize; this.maxQueueSize = maxQueueSize; @@ -178,17 +165,15 @@ public final class DeadLetterQueueWriter implements Closeable { } private void releaseLock() { - if (lockChannel != null){ - try { - lockChannel.close(); - } catch (Exception e) { - logger.debug("Unable to close lock channel", e); - } - try { - Files.deleteIfExists(queuePath.resolve(LOCK_FILE)); - } catch (IOException e){ - logger.debug("Unable to delete lock file", e); - } + try { + FileLockFactory.releaseLock(lock); + } catch (IOException e) { + logger.debug("Unable to release lock", e); + } + try { + Files.deleteIfExists(queuePath.resolve(LOCK_FILE)); + } catch (IOException e){ + logger.debug("Unable to delete lock file", e); } } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 87b9a777b..dc8557410 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.logstash.DLQEntry; import org.logstash.Event; +import org.logstash.LockException; import java.io.IOException; import java.nio.channels.FileChannel; @@ -70,7 +71,7 @@ public class DeadLetterQueueWriterTest { try { new DeadLetterQueueWriter(dir, 1000, 100000); fail(); - } catch (RuntimeException e) { + } catch (LockException e) { } finally { writer.close(); }