mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
DLQ: Fix reading while writing boundary issues
Fixes an issue where if the DLQ is actively being read and written to, the consumeBlock method would write data to the position where the data was last read from, rather than written to, leading to plugin crashes. Fixes #8024
This commit is contained in:
parent
5b71afcdb9
commit
9f64fccef9
2 changed files with 31 additions and 4 deletions
|
@ -137,10 +137,15 @@ public final class RecordIOReader implements Closeable {
|
|||
// already read enough, no need to read more
|
||||
return;
|
||||
}
|
||||
int originalPosition = currentBlock.position();
|
||||
int read = channel.read(currentBlock);
|
||||
currentBlockSizeReadFromChannel += (read > 0) ? read : 0;
|
||||
currentBlock.position(originalPosition);
|
||||
int processedPosition = currentBlock.position();
|
||||
try {
|
||||
// Move to last written to position
|
||||
currentBlock.position(currentBlockSizeReadFromChannel);
|
||||
channel.read(currentBlock);
|
||||
currentBlockSizeReadFromChannel = currentBlock.position();
|
||||
} finally {
|
||||
currentBlock.position(processedPosition);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -99,6 +99,28 @@ public class RecordIOWriterTest {
|
|||
reader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWhileWriteAcrossBoundary() throws Exception {
|
||||
char[] tooBig = fillArray( BLOCK_SIZE/4);
|
||||
StringElement input = new StringElement(new String(tooBig));
|
||||
byte[] inputSerialized = input.serialize();
|
||||
try(RecordIOWriter writer = new RecordIOWriter(file);
|
||||
RecordIOReader reader = new RecordIOReader(file)){
|
||||
|
||||
for (int j = 0; j < 2; j++) {
|
||||
writer.writeEvent(inputSerialized);
|
||||
}
|
||||
assertThat(reader.readEvent(), equalTo(inputSerialized));
|
||||
for (int j = 0; j < 2; j++) {
|
||||
writer.writeEvent(inputSerialized);
|
||||
}
|
||||
for (int j = 0; j < 3; j++) {
|
||||
assertThat(reader.readEvent(), equalTo(inputSerialized));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private char[] fillArray(final int fillSize) {
|
||||
char[] blockSize= new char[fillSize];
|
||||
Arrays.fill(blockSize, 'e');
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue