Fixes #6817
This commit is contained in:
Tal Levy 2017-03-28 16:01:31 -07:00
parent 06655739c3
commit fdab7e41b8
7 changed files with 202 additions and 56 deletions

View file

@ -58,17 +58,12 @@ public class DLQEntry implements Cloneable, Serializable, Queueable {
+ pluginTypeBytes.length
+ pluginIdBytes.length
+ reasonBytes.length
+ (Integer.BYTES * 5));
buffer.putInt(entryTimeInBytes.length);
buffer.put(entryTimeInBytes);
buffer.putInt(eventInBytes.length);
buffer.put(eventInBytes);
buffer.putInt(pluginTypeBytes.length);
buffer.put(pluginTypeBytes);
buffer.putInt(pluginIdBytes.length);
buffer.put(pluginIdBytes);
buffer.putInt(reasonBytes.length);
buffer.put(reasonBytes);
+ (Integer.BYTES * 5)); // magic number represents the five byte[] + lengths
putLengthAndBytes(buffer, entryTimeInBytes);
putLengthAndBytes(buffer, eventInBytes);
putLengthAndBytes(buffer, pluginTypeBytes);
putLengthAndBytes(buffer, pluginIdBytes);
putLengthAndBytes(buffer, reasonBytes);
return buffer.array();
}
@ -77,34 +72,27 @@ public class DLQEntry implements Cloneable, Serializable, Queueable {
buffer.put(bytes);
buffer.position(0);
int entryTimeLength = buffer.getInt();
byte[] entryTimeBytes = new byte[entryTimeLength];
buffer.get(entryTimeBytes);
Timestamp entryTime = new Timestamp(new String(entryTimeBytes));
int eventLength = buffer.getInt();
byte[] eventBytes = new byte[eventLength];
buffer.get(eventBytes);
Event event = Event.deserialize(eventBytes);
int pluginTypeLength = buffer.getInt();
byte[] pluginTypeBytes = new byte[pluginTypeLength];
buffer.get(pluginTypeBytes);
String pluginType = new String(pluginTypeBytes);
int pluginIdLength = buffer.getInt();
byte[] pluginIdBytes = new byte[pluginIdLength];
buffer.get(pluginIdBytes);
String pluginId = new String(pluginIdBytes);
int reasonLength = buffer.getInt();
byte[] reasonBytes = new byte[reasonLength];
buffer.get(reasonBytes);
String reason = new String(reasonBytes);
Timestamp entryTime = new Timestamp(new String(getLengthPrefixedBytes(buffer)));
Event event = Event.deserialize(getLengthPrefixedBytes(buffer));
String pluginType = new String(getLengthPrefixedBytes(buffer));
String pluginId = new String(getLengthPrefixedBytes(buffer));
String reason = new String(getLengthPrefixedBytes(buffer));
return new DLQEntry(event, pluginType, pluginId, reason, entryTime);
}
private static void putLengthAndBytes(ByteBuffer buffer, byte[] bytes) {
buffer.putInt(bytes.length);
buffer.put(bytes);
}
private static byte[] getLengthPrefixedBytes(ByteBuffer buffer) {
int length = buffer.getInt();
byte[] bytes = new byte[length];
buffer.get(bytes);
return bytes;
}
public Event getEvent() {
return event;
}

View file

@ -13,7 +13,7 @@ import java.io.IOException;
import java.util.Date;
@JsonSerialize(using = org.logstash.json.TimestampSerializer.class)
public class Timestamp implements Cloneable, Queueable {
public class Timestamp implements Cloneable, Comparable, Queueable {
// all methods setting the time object must set it in the UTC timezone
private DateTime time;
@ -77,6 +77,11 @@ public class Timestamp implements Cloneable, Queueable {
return (new Duration(JAN_1_1970.toDateTime(DateTimeZone.UTC), this.time).getMillis() % 1000) * 1000;
}
@Override
public int compareTo(Object other) {
return getTime().compareTo(((Timestamp) other).getTime());
}
@Override
public Timestamp clone() throws CloneNotSupportedException {
Timestamp clone = (Timestamp)super.clone();

View file

@ -19,6 +19,7 @@
package org.logstash.common.io;
import org.logstash.DLQEntry;
import org.logstash.Timestamp;
import java.io.IOException;
import java.nio.file.FileSystems;
@ -55,6 +56,24 @@ public class DeadLetterQueueReadManager {
segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
}
public void seekToNextEvent(Timestamp timestamp) throws IOException {
for (Path segment : segments) {
currentReader = new RecordIOReader(segment);
byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> {
try {
return DLQEntry.deserialize(b).getEntryTime();
} catch (IOException e) {
return null;
}
}, Timestamp::compareTo);
if (event != null) {
return;
}
}
currentReader.close();
currentReader = null;
}
private long pollNewSegments(long timeout) throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
WatchKey key = watchService.poll(timeout, TimeUnit.MILLISECONDS);
@ -70,7 +89,11 @@ public class DeadLetterQueueReadManager {
}
public DLQEntry pollEntry(long timeout) throws IOException, InterruptedException {
return DLQEntry.deserialize(pollEntryBytes(timeout));
byte[] bytes = pollEntryBytes(timeout);
if (bytes == null) {
return null;
}
return DLQEntry.deserialize(bytes);
}
byte[] pollEntryBytes() throws IOException, InterruptedException {
@ -98,6 +121,19 @@ public class DeadLetterQueueReadManager {
return event;
}
public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException {
currentReader = new RecordIOReader(segmentPath);
currentReader.seekToOffset(position);
}
public Path getCurrentSegment() {
return currentReader.getPath();
}
public long getCurrentPosition() throws IOException {
return currentReader.getChannelPosition();
}
public void close() throws IOException {
if (currentReader != null) {
currentReader.close();

View file

@ -99,7 +99,7 @@ public class DeadLetterQueueWriteManager {
return Files.list(path).filter((p) -> p.toString().endsWith(".log"));
}
public void writeEntry(DLQEntry event) throws IOException {
public synchronized void writeEntry(DLQEntry event) throws IOException {
byte[] record = event.serialize();
int eventPayloadSize = RECORD_HEADER_SIZE + record.length;
if (currentQueueSize + eventPayloadSize > maxQueueSize) {
@ -112,7 +112,7 @@ public class DeadLetterQueueWriteManager {
currentQueueSize += currentWriter.writeEvent(record);
}
public void close() throws IOException {
public synchronized void close() throws IOException {
this.lock.release();
if (currentWriter != null) {
currentWriter.close();

View file

@ -18,11 +18,17 @@
*/
package org.logstash.common.io;
import org.logstash.Timestamp;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@ -39,6 +45,7 @@ public class RecordIOReader {
private final ByteBuffer currentBlock;
private int currentBlockSizeReadFromChannel;
private final Path path;
private long channelPosition;
public RecordIOReader(Path path) throws IOException {
this.path = path;
@ -51,6 +58,7 @@ public class RecordIOReader {
if (versionBuffer.get() != VERSION) {
throw new RuntimeException("Invalid file. check version");
}
this.channelPosition = this.channel.position();
}
public Path getPath() {
@ -58,9 +66,57 @@ public class RecordIOReader {
}
public void seekToBlock(int bid) throws IOException {
seekToOffset(bid * BLOCK_SIZE + VERSION_SIZE);
}
public void seekToOffset(long channelOffset) throws IOException {
currentBlock.rewind();
currentBlockSizeReadFromChannel = 0;
channel.position(bid * BLOCK_SIZE + VERSION_SIZE);
channel.position(channelOffset);
channelPosition = channel.position();
}
public <T> byte[] seekToNextEventPosition(T target, Function<byte[], T> keyExtractor, Comparator<T> keyComparator) throws IOException {
int matchingBlock;
int lowBlock = 0;
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
if (highBlock == 0) {
return null;
}
while (lowBlock < highBlock) {
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
seekToBlock(middle);
T found = keyExtractor.apply(readEvent());
int compare = keyComparator.compare(found, target);
if (compare > 0) {
highBlock = middle - 1;
} else if (compare < 0) {
lowBlock = middle;
} else {
matchingBlock = middle;
break;
}
}
matchingBlock = lowBlock;
// now sequential scan to event
seekToBlock(matchingBlock);
int currentPosition = 0;
int compare = -1;
byte[] event = null;
while (compare < 0) {
currentPosition = currentBlock.position();
event = readEvent();
compare = keyComparator.compare(keyExtractor.apply(event), target);
}
currentBlock.position(currentPosition);
return event;
}
public long getChannelPosition() throws IOException {
return channelPosition;
}
/**
@ -153,22 +209,30 @@ public class RecordIOReader {
* @throws IOException
*/
public byte[] readEvent() throws IOException {
if (consumeToStartOfEvent() == false) {
try {
if (channel.isOpen() == false || consumeToStartOfEvent() == false) {
return null;
}
RecordHeader header = RecordHeader.get(currentBlock);
int cumReadSize = 0;
int bufferSize = header.getTotalEventSize().orElseGet(header::getSize);
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
getRecord(buffer, header);
cumReadSize += header.getSize();
while (cumReadSize < bufferSize) {
maybeRollToNextBlock();
RecordHeader nextHeader = RecordHeader.get(currentBlock);
getRecord(buffer, nextHeader);
cumReadSize += nextHeader.getSize();
}
return buffer.array();
} catch (ClosedByInterruptException e) {
return null;
} finally {
if (channel.isOpen()) {
channelPosition = channel.position();
}
}
RecordHeader header = RecordHeader.get(currentBlock);
int cumReadSize = 0;
int bufferSize = header.getTotalEventSize().orElseGet(header::getSize);
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
getRecord(buffer, header);
cumReadSize += header.getSize();
while (cumReadSize < bufferSize) {
maybeRollToNextBlock();
RecordHeader nextHeader = RecordHeader.get(currentBlock);
getRecord(buffer, nextHeader);
cumReadSize += nextHeader.getSize();
}
return buffer.array();
}
public void close() throws IOException {

View file

@ -24,9 +24,14 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.DLQEntry;
import org.logstash.Event;
import org.logstash.Timestamp;
import org.logstash.ackedqueue.StringElement;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -39,6 +44,10 @@ public class DeadLetterQueueReadManagerTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static String segmentFileName(int i) {
return String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, i);
}
@Before
public void setUp() throws Exception {
dir = temporaryFolder.newFolder().toPath();
@ -49,7 +58,7 @@ public class DeadLetterQueueReadManagerTest {
RecordIOWriter writer = null;
for (int i = 0; i < 5; i++) {
Path segmentPath = dir.resolve(String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, i));
Path segmentPath = dir.resolve(segmentFileName(i));
writer = new RecordIOWriter(segmentPath);
for (int j = 0; j < 10; j++) {
writer.writeEvent((new StringElement("" + (i * 10 + j))).serialize());
@ -82,7 +91,7 @@ public class DeadLetterQueueReadManagerTest {
writer.close();
Path segmentPath = dir.resolve(String.format(DeadLetterQueueWriteManager.SEGMENT_FILE_PATTERN, 5));
Path segmentPath = dir.resolve(segmentFileName(5));
writer = new RecordIOWriter(segmentPath);
for (int j = 0; j < 10; j++) {
@ -102,4 +111,27 @@ public class DeadLetterQueueReadManagerTest {
manager.close();
}
@Test
public void testSeek() throws Exception {
DeadLetterQueueWriteManager writeManager = new DeadLetterQueueWriteManager(dir, 10000000, 10000000);
Event event = new Event(Collections.emptyMap());
Timestamp target = null;
long currentEpoch = System.currentTimeMillis();
for (int i = 0; i < 1000; i++){
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
writeManager.writeEntry(entry);
if (i == 543) {
target = entry.getEntryTime();
}
}
writeManager.close();
DeadLetterQueueReadManager readManager = new DeadLetterQueueReadManager(dir);
readManager.seekToNextEvent(target);
DLQEntry entry = readManager.pollEntry(100);
assertThat(entry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
assertThat(entry.getReason(), equalTo("543"));
}
}

View file

@ -6,8 +6,11 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.StringElement;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -166,4 +169,22 @@ public class RecordHeaderIOWriterTest {
writer.close();
reader.close();
}
@Test
public void testFind() throws Exception {
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]);
for (int i = 0; i < 20000; i++) {
intBuffer.rewind();
intBuffer.putInt(i);
writer.writeEvent(intBuffer.array());
}
Function<byte[], Object> toInt = (b) -> ByteBuffer.wrap(b).getInt();
reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2));
int nextVal = (int) toInt.apply(reader.readEvent());
assertThat(nextVal, equalTo(34));
}
}