mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
b595b54362
commit
b3c4f343f6
2 changed files with 15 additions and 14 deletions
|
@ -1,12 +1,11 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
||||
public abstract class Page implements Closeable {
|
||||
protected final int pageNum;
|
||||
|
@ -54,16 +53,18 @@ public abstract class Page implements Closeable {
|
|||
this.pageIO.activate();
|
||||
|
||||
SequencedList<byte[]> serialized = this.pageIO.read(this.firstUnreadSeqNum, limit);
|
||||
List<Queueable> deserialized = serialized.getElements().stream().map(e -> this.queue.deserialize(e)).collect(Collectors.toList());
|
||||
|
||||
List<byte[]> elements = serialized.getElements();
|
||||
final int count = elements.size();
|
||||
List<Queueable> deserialized = new ArrayList<>(count);
|
||||
for (final byte[] element : elements) {
|
||||
deserialized.add(this.queue.deserialize(element));
|
||||
}
|
||||
assert serialized.getSeqNums().get(0) == this.firstUnreadSeqNum :
|
||||
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", this.firstUnreadSeqNum, serialized.getSeqNums().get(0));
|
||||
|
||||
Batch batch = new Batch(deserialized, serialized.getSeqNums(), this.queue);
|
||||
this.firstUnreadSeqNum += count;
|
||||
|
||||
this.firstUnreadSeqNum += deserialized.size();
|
||||
|
||||
return batch;
|
||||
return new Batch(deserialized, serialized.getSeqNums(), this.queue);
|
||||
}
|
||||
|
||||
public boolean isFullyRead() {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package org.logstash.ackedqueue.io.wip;
|
||||
|
||||
import java.util.Collections;
|
||||
import org.logstash.ackedqueue.Checkpoint;
|
||||
import org.logstash.ackedqueue.SequencedList;
|
||||
import org.logstash.common.io.BufferedChecksumStreamInput;
|
||||
|
@ -142,7 +143,7 @@ public class MemoryPageIOStream implements PageIO {
|
|||
@Override
|
||||
public SequencedList<byte[]> read(long seqNum, int limit) throws IOException {
|
||||
if (elementCount == 0) {
|
||||
return new SequencedList<>(new ArrayList<>(), new ArrayList<>());
|
||||
return new SequencedList<>(Collections.emptyList(), Collections.emptyList());
|
||||
}
|
||||
setReadPoint(seqNum);
|
||||
return read(limit);
|
||||
|
@ -215,7 +216,7 @@ public class MemoryPageIOStream implements PageIO {
|
|||
return details;
|
||||
}
|
||||
|
||||
private void setReadPoint(long seqNum) throws IOException {
|
||||
private void setReadPoint(long seqNum) {
|
||||
int readPosition = offsetMap.get(calcRelativeSeqNum(seqNum));
|
||||
streamedInput.movePosition(readPosition);
|
||||
}
|
||||
|
@ -254,10 +255,9 @@ public class MemoryPageIOStream implements PageIO {
|
|||
}
|
||||
|
||||
private SequencedList<byte[]> read(int limit) throws IOException {
|
||||
List<byte[]> elements = new ArrayList<>();
|
||||
List<Long> seqNums = new ArrayList<>();
|
||||
|
||||
int upto = available(limit);
|
||||
List<byte[]> elements = new ArrayList<>(upto);
|
||||
List<Long> seqNums = new ArrayList<>(upto);
|
||||
for (int i = 0; i < upto; i++) {
|
||||
long seqNum = readSeqNum();
|
||||
byte[] data = readData();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue