diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java index fa8e22714..c228fd363 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java @@ -18,6 +18,7 @@ */ package org.logstash.common.io; +import java.io.Closeable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.DLQEntry; @@ -39,7 +40,7 @@ import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; import static org.logstash.common.io.DeadLetterQueueWriter.getSegmentPaths; -public class DeadLetterQueueReader { +public final class DeadLetterQueueReader implements Closeable { private static final Logger logger = LogManager.getLogger(DeadLetterQueueReader.class); private RecordIOReader currentReader; @@ -142,6 +143,7 @@ public class DeadLetterQueueReader { return currentReader.getChannelPosition(); } + @Override public void close() throws IOException { if (currentReader != null) { currentReader.close(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 9fe828a78..5e1f287bb 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -18,6 +18,7 @@ */ package org.logstash.common.io; +import java.io.Closeable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.DLQEntry; @@ -37,7 +38,7 @@ import java.util.stream.Stream; import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; -public class DeadLetterQueueWriter { +public final class DeadLetterQueueWriter implements Closeable { private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class); private static final long MAX_SEGMENT_SIZE_BYTES = 10 * 1024 * 1024; @@ -143,6 +144,7 @@ public class DeadLetterQueueWriter { currentQueueSize.add(currentWriter.writeEvent(record)); } + @Override public synchronized void close() throws IOException { this.lock.release(); if (currentWriter != null) { diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index 1ccce42a7..2dea36ac3 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -18,6 +18,7 @@ */ package org.logstash.common.io; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; @@ -36,7 +37,7 @@ import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; /** */ -public class RecordIOReader { +public final class RecordIOReader implements Closeable { private final FileChannel channel; private final ByteBuffer currentBlock; @@ -228,6 +229,7 @@ public class RecordIOReader { } } + @Override public void close() throws IOException { channel.close(); } diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java index 7bd8f56d5..d8bbcb562 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java @@ -18,6 +18,7 @@ */ package org.logstash.common.io; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -63,7 +64,7 @@ import static org.logstash.common.io.RecordType.START; * by a final END record type. * END: The final record segment of an LS Event, the final record representing the end of an LS Event. */ -public class RecordIOWriter { +public final class RecordIOWriter implements Closeable { private final FileChannel channel; private int posInBlock; @@ -139,6 +140,7 @@ public class RecordIOWriter { return channel.position() - startPosition; } + @Override public void close() throws IOException { channel.close(); }