mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
bef5994dd1
commit
a97611abaa
10 changed files with 102 additions and 19 deletions
|
@ -4,16 +4,17 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
|
||||
public class Batch implements Closeable {
|
||||
|
||||
private final List<Queueable> elements;
|
||||
|
||||
private final List<Long> seqNums;
|
||||
private final LongVector seqNums;
|
||||
private final Queue queue;
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
public Batch(List<Queueable> elements, List<Long> seqNums, Queue q) {
|
||||
public Batch(List<Queueable> elements, LongVector seqNums, Queue q) {
|
||||
this.elements = elements;
|
||||
this.seqNums = seqNums;
|
||||
this.queue = q;
|
||||
|
@ -38,7 +39,7 @@ public class Batch implements Closeable {
|
|||
return elements;
|
||||
}
|
||||
|
||||
public List<Long> getSeqNums() { return this.seqNums; }
|
||||
public LongVector getSeqNums() { return this.seqNums; }
|
||||
|
||||
public Queue getQueue() {
|
||||
return queue;
|
||||
|
|
|
@ -5,6 +5,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.List;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
|
||||
public abstract class Page implements Closeable {
|
||||
|
@ -102,8 +103,10 @@ public abstract class Page implements Closeable {
|
|||
//
|
||||
// @param seqNums the list of same-page seqNums to ack
|
||||
// @param checkpointMaxAcks the number of acks that will trigger a page checkpoint
|
||||
public void ack(List<Long> seqNums, int checkpointMaxAcks) throws IOException {
|
||||
for (long seqNum : seqNums) {
|
||||
public void ack(LongVector seqNums, int checkpointMaxAcks) throws IOException {
|
||||
final int count = seqNums.size();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
final long seqNum = seqNums.get(i);
|
||||
// TODO: eventually refactor to use new bit handling class
|
||||
|
||||
assert seqNum >= this.minSeqNum :
|
||||
|
|
|
@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.logstash.FileLockFactory;
|
||||
import org.logstash.LockException;
|
||||
import org.logstash.ackedqueue.io.CheckpointIO;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.ackedqueue.io.PageIO;
|
||||
import org.logstash.ackedqueue.io.PageIOFactory;
|
||||
|
||||
|
@ -623,7 +624,7 @@ public class Queue implements Closeable {
|
|||
// same-page elements. A fully acked page will trigger a checkpoint for that page. Also if a page has more than checkpointMaxAcks
|
||||
// acks since last checkpoint it will also trigger a checkpoint.
|
||||
// @param seqNums the list of same-page sequence numbers to ack
|
||||
public void ack(List<Long> seqNums) throws IOException {
|
||||
public void ack(LongVector seqNums) throws IOException {
|
||||
// as a first implementation we assume that all batches are created from the same page
|
||||
// so we will avoid multi pages acking here for now
|
||||
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import java.util.List;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
|
||||
public class SequencedList<E> {
|
||||
private final List<E> elements;
|
||||
private final List<Long> seqNums;
|
||||
private final LongVector seqNums;
|
||||
|
||||
public SequencedList(List<E> elements, List<Long> seqNums) {
|
||||
public SequencedList(List<E> elements, LongVector seqNums) {
|
||||
this.elements = elements;
|
||||
this.seqNums = seqNums;
|
||||
}
|
||||
|
@ -15,7 +16,7 @@ public class SequencedList<E> {
|
|||
return elements;
|
||||
}
|
||||
|
||||
public List<Long> getSeqNums() {
|
||||
public LongVector getSeqNums() {
|
||||
return seqNums;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +1,25 @@
|
|||
package org.logstash.ackedqueue.ext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyModule;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ObjectAllocator;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.jruby.runtime.load.Library;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ackedqueue.Batch;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.ackedqueue.Queueable;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
import java.io.IOException;
|
||||
|
||||
public final class JrubyAckedBatchExtLibrary implements Library {
|
||||
|
||||
|
@ -62,8 +64,12 @@ public final class JrubyAckedBatchExtLibrary implements Library {
|
|||
if (! (queue instanceof JrubyAckedQueueExtLibrary.RubyAckedQueue)) {
|
||||
context.runtime.newArgumentError("expected queue AckedQueue");
|
||||
}
|
||||
|
||||
this.batch = new Batch((List<Queueable>) events, (List<Long>) seqNums, ((JrubyAckedQueueExtLibrary.RubyAckedQueue)queue).getQueue());
|
||||
final Collection<Long> seqList = (List<Long>) seqNums;
|
||||
final LongVector seqs = new LongVector(seqList.size());
|
||||
for (final long seq : seqList) {
|
||||
seqs.add(seq);
|
||||
}
|
||||
this.batch = new Batch((List<Queueable>) events, seqs, ((JrubyAckedQueueExtLibrary.RubyAckedQueue)queue).getQueue());
|
||||
|
||||
return context.nil;
|
||||
}
|
||||
|
|
|
@ -224,7 +224,7 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
|
|||
String.format("seqNum=%d is > maxSeqNum=%d", seqNum, maxSeqNum());
|
||||
|
||||
List<byte[]> elements = new ArrayList<>();
|
||||
List<Long> seqNums = new ArrayList<>();
|
||||
final LongVector seqNums = new LongVector(limit);
|
||||
|
||||
int offset = this.offsetMap.get((int)(seqNum - this.minSeqNum));
|
||||
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package org.logstash.ackedqueue.io;
|
||||
|
||||
public final class LongVector {
|
||||
|
||||
private int count;
|
||||
|
||||
private long[] data;
|
||||
|
||||
public LongVector(final int size) {
|
||||
data = new long[size];
|
||||
count = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the {@code long} to the underlying {@code long[]}, resizing it if necessary.
|
||||
* @param num Long to store
|
||||
*/
|
||||
public void add(final long num) {
|
||||
if (data.length < count + 1) {
|
||||
final long[] old = data;
|
||||
data = new long[(data.length << 1) + 1];
|
||||
System.arraycopy(old, 0, data, 0, old.length);
|
||||
}
|
||||
data[count++] = num;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get value stored at given index.
|
||||
* @param index Array index (only values smaller than {@link LongVector#count} are valid)
|
||||
* @return Int
|
||||
*/
|
||||
public long get(final int index) {
|
||||
return data[index];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of elements stored in this instance
|
||||
*/
|
||||
public int size() {
|
||||
return count;
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package org.logstash.ackedqueue.io.wip;
|
|||
import java.util.Collections;
|
||||
import org.logstash.ackedqueue.Checkpoint;
|
||||
import org.logstash.ackedqueue.SequencedList;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.common.io.BufferedChecksumStreamInput;
|
||||
import org.logstash.common.io.BufferedChecksumStreamOutput;
|
||||
import org.logstash.common.io.ByteArrayStreamOutput;
|
||||
|
@ -143,7 +144,7 @@ public class MemoryPageIOStream implements PageIO {
|
|||
@Override
|
||||
public SequencedList<byte[]> read(long seqNum, int limit) throws IOException {
|
||||
if (elementCount == 0) {
|
||||
return new SequencedList<>(Collections.emptyList(), Collections.emptyList());
|
||||
return new SequencedList<>(Collections.emptyList(), new LongVector(0));
|
||||
}
|
||||
setReadPoint(seqNum);
|
||||
return read(limit);
|
||||
|
@ -257,7 +258,7 @@ public class MemoryPageIOStream implements PageIO {
|
|||
private SequencedList<byte[]> read(int limit) throws IOException {
|
||||
int upto = available(limit);
|
||||
List<byte[]> elements = new ArrayList<>(upto);
|
||||
List<Long> seqNums = new ArrayList<>(upto);
|
||||
final LongVector seqNums = new LongVector(upto);
|
||||
for (int i = 0; i < upto; i++) {
|
||||
long seqNum = readSeqNum();
|
||||
byte[] data = readData();
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
|
||||
import org.logstash.ackedqueue.io.LongVector;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
@ -637,7 +638,9 @@ public class QueueTest {
|
|||
assertThat(b.getElements().get(0), is(element2));
|
||||
assertThat(b.getElements().get(1), is(element3));
|
||||
|
||||
q.ack(Collections.singletonList(firstSeqNum));
|
||||
final LongVector seqs = new LongVector(1);
|
||||
seqs.add(firstSeqNum);
|
||||
q.ack(seqs);
|
||||
}
|
||||
|
||||
try(Queue q = new Queue(settings)) {
|
||||
|
@ -646,7 +649,10 @@ public class QueueTest {
|
|||
b = q.nonBlockReadBatch(2);
|
||||
assertThat(b.getElements().size(), is(2));
|
||||
|
||||
q.ack(Arrays.asList(secondSeqNum, thirdSeqNum));
|
||||
final LongVector seqs = new LongVector(2);
|
||||
seqs.add(secondSeqNum);
|
||||
seqs.add(thirdSeqNum);
|
||||
q.ack(seqs);
|
||||
|
||||
assertThat(q.getAckedCount(), equalTo(0L));
|
||||
assertThat(q.getUnackedCount(), equalTo(0L));
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package org.logstash.ackedqueue.io;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
public class LongVectorTest {
|
||||
|
||||
@Test
|
||||
public void storesAndResizes() {
|
||||
final int count = 10_000;
|
||||
final LongVector vector = new LongVector(1000);
|
||||
for (long i = 0L; i < count; ++i) {
|
||||
vector.add(i);
|
||||
}
|
||||
assertThat(vector.size(), is(count));
|
||||
for (int i = 0; i < count; ++i) {
|
||||
assertThat((long) i, is(vector.get(i)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue