diff --git a/logstash-core/src/main/java/org/logstash/DLQEntry.java b/logstash-core/src/main/java/org/logstash/DLQEntry.java index faaef9375..786249015 100644 --- a/logstash-core/src/main/java/org/logstash/DLQEntry.java +++ b/logstash-core/src/main/java/org/logstash/DLQEntry.java @@ -58,17 +58,12 @@ public class DLQEntry implements Cloneable, Serializable, Queueable { + pluginTypeBytes.length + pluginIdBytes.length + reasonBytes.length - + (Integer.BYTES * 5)); - buffer.putInt(entryTimeInBytes.length); - buffer.put(entryTimeInBytes); - buffer.putInt(eventInBytes.length); - buffer.put(eventInBytes); - buffer.putInt(pluginTypeBytes.length); - buffer.put(pluginTypeBytes); - buffer.putInt(pluginIdBytes.length); - buffer.put(pluginIdBytes); - buffer.putInt(reasonBytes.length); - buffer.put(reasonBytes); + + (Integer.BYTES * 5)); // magic number represents the five byte[] + lengths + putLengthAndBytes(buffer, entryTimeInBytes); + putLengthAndBytes(buffer, eventInBytes); + putLengthAndBytes(buffer, pluginTypeBytes); + putLengthAndBytes(buffer, pluginIdBytes); + putLengthAndBytes(buffer, reasonBytes); return buffer.array(); } @@ -77,34 +72,27 @@ public class DLQEntry implements Cloneable, Serializable, Queueable { buffer.put(bytes); buffer.position(0); - int entryTimeLength = buffer.getInt(); - byte[] entryTimeBytes = new byte[entryTimeLength]; - buffer.get(entryTimeBytes); - Timestamp entryTime = new Timestamp(new String(entryTimeBytes)); - - int eventLength = buffer.getInt(); - byte[] eventBytes = new byte[eventLength]; - buffer.get(eventBytes); - Event event = Event.deserialize(eventBytes); - - int pluginTypeLength = buffer.getInt(); - byte[] pluginTypeBytes = new byte[pluginTypeLength]; - buffer.get(pluginTypeBytes); - String pluginType = new String(pluginTypeBytes); - - int pluginIdLength = buffer.getInt(); - byte[] pluginIdBytes = new byte[pluginIdLength]; - buffer.get(pluginIdBytes); - String pluginId = new String(pluginIdBytes); - - int reasonLength = buffer.getInt(); - byte[] reasonBytes = new byte[reasonLength]; - buffer.get(reasonBytes); - String reason = new String(reasonBytes); + Timestamp entryTime = new Timestamp(new String(getLengthPrefixedBytes(buffer))); + Event event = Event.deserialize(getLengthPrefixedBytes(buffer)); + String pluginType = new String(getLengthPrefixedBytes(buffer)); + String pluginId = new String(getLengthPrefixedBytes(buffer)); + String reason = new String(getLengthPrefixedBytes(buffer)); return new DLQEntry(event, pluginType, pluginId, reason, entryTime); } + private static void putLengthAndBytes(ByteBuffer buffer, byte[] bytes) { + buffer.putInt(bytes.length); + buffer.put(bytes); + } + + private static byte[] getLengthPrefixedBytes(ByteBuffer buffer) { + int length = buffer.getInt(); + byte[] bytes = new byte[length]; + buffer.get(bytes); + return bytes; + } + public Event getEvent() { return event; } diff --git a/logstash-core/src/main/java/org/logstash/Timestamp.java b/logstash-core/src/main/java/org/logstash/Timestamp.java index 32dff8302..f76ede04a 100644 --- a/logstash-core/src/main/java/org/logstash/Timestamp.java +++ b/logstash-core/src/main/java/org/logstash/Timestamp.java @@ -13,7 +13,7 @@ import java.io.IOException; import java.util.Date; @JsonSerialize(using = org.logstash.json.TimestampSerializer.class) -public class Timestamp implements Cloneable, Queueable { +public class Timestamp implements Cloneable, Comparable, Queueable { // all methods setting the time object must set it in the UTC timezone private DateTime time; @@ -77,6 +77,11 @@ public class Timestamp implements Cloneable, Queueable { return (new Duration(JAN_1_1970.toDateTime(DateTimeZone.UTC), this.time).getMillis() % 1000) * 1000; } + @Override + public int compareTo(Object other) { + return getTime().compareTo(((Timestamp) other).getTime()); + } + @Override public Timestamp clone() throws CloneNotSupportedException { Timestamp clone = (Timestamp)super.clone(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReadManager.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReadManager.java index b682049f9..740994475 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReadManager.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReadManager.java @@ -19,6 +19,7 @@ package org.logstash.common.io; import org.logstash.DLQEntry; +import org.logstash.Timestamp; import java.io.IOException; import java.nio.file.FileSystems; @@ -55,6 +56,24 @@ public class DeadLetterQueueReadManager { segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); } + public void seekToNextEvent(Timestamp timestamp) throws IOException { + for (Path segment : segments) { + currentReader = new RecordIOReader(segment); + byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> { + try { + return DLQEntry.deserialize(b).getEntryTime(); + } catch (IOException e) { + return null; + } + }, Timestamp::compareTo); + if (event != null) { + return; + } + } + currentReader.close(); + currentReader = null; + } + private long pollNewSegments(long timeout) throws IOException, InterruptedException { long startTime = System.currentTimeMillis(); WatchKey key = watchService.poll(timeout, TimeUnit.MILLISECONDS); @@ -70,7 +89,11 @@ public class DeadLetterQueueReadManager { } public DLQEntry pollEntry(long timeout) throws IOException, InterruptedException { - return DLQEntry.deserialize(pollEntryBytes(timeout)); + byte[] bytes = pollEntryBytes(timeout); + if (bytes == null) { + return null; + } + return DLQEntry.deserialize(bytes); } byte[] pollEntryBytes() throws IOException, InterruptedException { @@ -98,6 +121,19 @@ public class DeadLetterQueueReadManager { return event; } + public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException { + currentReader = new RecordIOReader(segmentPath); + currentReader.seekToOffset(position); + } + + public Path getCurrentSegment() { + return currentReader.getPath(); + } + + public long getCurrentPosition() throws IOException { + return currentReader.getChannelPosition(); + } + public void close() throws IOException { if (currentReader != null) { currentReader.close(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriteManager.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriteManager.java index ccbc4f13a..7a1995253 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriteManager.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriteManager.java @@ -99,7 +99,7 @@ public class DeadLetterQueueWriteManager { return Files.list(path).filter((p) -> p.toString().endsWith(".log")); } - public void writeEntry(DLQEntry event) throws IOException { + public synchronized void writeEntry(DLQEntry event) throws IOException { byte[] record = event.serialize(); int eventPayloadSize = RECORD_HEADER_SIZE + record.length; if (currentQueueSize + eventPayloadSize > maxQueueSize) { @@ -112,7 +112,7 @@ public class DeadLetterQueueWriteManager { currentQueueSize += currentWriter.writeEvent(record); } - public void close() throws IOException { + public synchronized void close() throws IOException { this.lock.release(); if (currentWriter != null) { currentWriter.close(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index f4ec14e2b..c4c1499c4 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -18,11 +18,17 @@ */ package org.logstash.common.io; +import org.logstash.Timestamp; + import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Comparator; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -39,6 +45,7 @@ public class RecordIOReader { private final ByteBuffer currentBlock; private int currentBlockSizeReadFromChannel; private final Path path; + private long channelPosition; public RecordIOReader(Path path) throws IOException { this.path = path; @@ -51,6 +58,7 @@ public class RecordIOReader { if (versionBuffer.get() != VERSION) { throw new RuntimeException("Invalid file. check version"); } + this.channelPosition = this.channel.position(); } public Path getPath() { @@ -58,9 +66,57 @@ public class RecordIOReader { } public void seekToBlock(int bid) throws IOException { + seekToOffset(bid * BLOCK_SIZE + VERSION_SIZE); + } + + public void seekToOffset(long channelOffset) throws IOException { currentBlock.rewind(); currentBlockSizeReadFromChannel = 0; - channel.position(bid * BLOCK_SIZE + VERSION_SIZE); + channel.position(channelOffset); + channelPosition = channel.position(); + } + + public byte[] seekToNextEventPosition(T target, Function keyExtractor, Comparator keyComparator) throws IOException { + int matchingBlock; + int lowBlock = 0; + int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE; + + if (highBlock == 0) { + return null; + } + + while (lowBlock < highBlock) { + int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0); + seekToBlock(middle); + T found = keyExtractor.apply(readEvent()); + int compare = keyComparator.compare(found, target); + if (compare > 0) { + highBlock = middle - 1; + } else if (compare < 0) { + lowBlock = middle; + } else { + matchingBlock = middle; + break; + } + } + matchingBlock = lowBlock; + + // now sequential scan to event + seekToBlock(matchingBlock); + int currentPosition = 0; + int compare = -1; + byte[] event = null; + while (compare < 0) { + currentPosition = currentBlock.position(); + event = readEvent(); + compare = keyComparator.compare(keyExtractor.apply(event), target); + } + currentBlock.position(currentPosition); + return event; + } + + public long getChannelPosition() throws IOException { + return channelPosition; } /** @@ -153,22 +209,30 @@ public class RecordIOReader { * @throws IOException */ public byte[] readEvent() throws IOException { - if (consumeToStartOfEvent() == false) { + try { + if (channel.isOpen() == false || consumeToStartOfEvent() == false) { + return null; + } + RecordHeader header = RecordHeader.get(currentBlock); + int cumReadSize = 0; + int bufferSize = header.getTotalEventSize().orElseGet(header::getSize); + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + getRecord(buffer, header); + cumReadSize += header.getSize(); + while (cumReadSize < bufferSize) { + maybeRollToNextBlock(); + RecordHeader nextHeader = RecordHeader.get(currentBlock); + getRecord(buffer, nextHeader); + cumReadSize += nextHeader.getSize(); + } + return buffer.array(); + } catch (ClosedByInterruptException e) { return null; + } finally { + if (channel.isOpen()) { + channelPosition = channel.position(); + } } - RecordHeader header = RecordHeader.get(currentBlock); - int cumReadSize = 0; - int bufferSize = header.getTotalEventSize().orElseGet(header::getSize); - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - getRecord(buffer, header); - cumReadSize += header.getSize(); - while (cumReadSize < bufferSize) { - maybeRollToNextBlock(); - RecordHeader nextHeader = RecordHeader.get(currentBlock); - getRecord(buffer, nextHeader); - cumReadSize += nextHeader.getSize(); - } - return buffer.array(); } public void close() throws IOException { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReadManagerTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReadManagerTest.java index e5421ea52..5f349247a 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReadManagerTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReadManagerTest.java @@ -24,9 +24,14 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.logstash.DLQEntry; +import org.logstash.Event; +import org.logstash.Timestamp; import org.logstash.ackedqueue.StringElement; +import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -39,6 +44,10 @@ public class DeadLetterQueueReadManagerTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private static String segmentFileName(int i) { + return String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, i); + } + @Before public void setUp() throws Exception { dir = temporaryFolder.newFolder().toPath(); @@ -49,7 +58,7 @@ public class DeadLetterQueueReadManagerTest { RecordIOWriter writer = null; for (int i = 0; i < 5; i++) { - Path segmentPath = dir.resolve(String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, i)); + Path segmentPath = dir.resolve(segmentFileName(i)); writer = new RecordIOWriter(segmentPath); for (int j = 0; j < 10; j++) { writer.writeEvent((new StringElement("" + (i * 10 + j))).serialize()); @@ -82,7 +91,7 @@ public class DeadLetterQueueReadManagerTest { writer.close(); - Path segmentPath = dir.resolve(String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, 5)); + Path segmentPath = dir.resolve(segmentFileName(5)); writer = new RecordIOWriter(segmentPath); for (int j = 0; j < 10; j++) { @@ -102,4 +111,27 @@ public class DeadLetterQueueReadManagerTest { manager.close(); } + + @Test + public void testSeek() throws Exception { + DeadLetterQueueWriteManager writeManager = new DeadLetterQueueWriteManager(dir, 10000000, 10000000); + Event event = new Event(Collections.emptyMap()); + Timestamp target = null; + long currentEpoch = System.currentTimeMillis(); + for (int i = 0; i < 1000; i++){ + DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); + writeManager.writeEntry(entry); + if (i == 543) { + target = entry.getEntryTime(); + } + + } + writeManager.close(); + + DeadLetterQueueReadManager readManager = new DeadLetterQueueReadManager(dir); + readManager.seekToNextEvent(target); + DLQEntry entry = readManager.pollEntry(100); + assertThat(entry.getEntryTime().toIso8601(), equalTo(target.toIso8601())); + assertThat(entry.getReason(), equalTo("543")); + } } \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/io/RecordHeaderIOWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/RecordHeaderIOWriterTest.java index 1bda688c7..e44224ac7 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/RecordHeaderIOWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/RecordHeaderIOWriterTest.java @@ -6,8 +6,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.logstash.ackedqueue.StringElement; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.Arrays; +import java.util.Comparator; +import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -166,4 +169,22 @@ public class RecordHeaderIOWriterTest { writer.close(); reader.close(); } + + @Test + public void testFind() throws Exception { + + RecordIOWriter writer = new RecordIOWriter(file); + RecordIOReader reader = new RecordIOReader(file); + ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]); + for (int i = 0; i < 20000; i++) { + intBuffer.rewind(); + intBuffer.putInt(i); + writer.writeEvent(intBuffer.array()); + } + + Function toInt = (b) -> ByteBuffer.wrap(b).getInt(); + reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2)); + int nextVal = (int) toInt.apply(reader.readEvent()); + assertThat(nextVal, equalTo(34)); + } } \ No newline at end of file