mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
c307b18ff0
commit
10007948c8
4 changed files with 9 additions and 29 deletions
|
@ -1,18 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "spec_helper"
|
||||
require "logstash/util/wrapped_acked_queue"
|
||||
require "logstash/instrument/collector"
|
||||
|
||||
describe LogStash::Util::WrappedAckedQueue do
|
||||
let(:path) {Stud::Temporary.directory}
|
||||
let(:queue_capacity) {1024 ** 2}
|
||||
let(:queue) do
|
||||
described_class.create_file_based(path, queue_capacity / 2, 0, 1024, 1024, 1024, queue_capacity)
|
||||
end
|
||||
|
||||
context "ReadClient #empty?" do
|
||||
it "returns true for an empty queue" do
|
||||
expect(queue.read_client.empty?).to be_truthy
|
||||
end
|
||||
end
|
||||
end
|
|
@ -76,8 +76,7 @@ public abstract class Page implements Closeable {
|
|||
// TODO: it should be something similar to this when we use a proper bitset class like ES
|
||||
// this.ackedSeqNum.firstUnackedBit >= this.elementCount;
|
||||
// TODO: for now use a naive & inefficient mechanism with a simple Bitset
|
||||
return this.elementCount == 0 ||
|
||||
this.elementCount > 0 && this.ackedSeqNums.cardinality() >= this.elementCount;
|
||||
return this.elementCount > 0 && this.ackedSeqNums.cardinality() >= this.elementCount;
|
||||
}
|
||||
|
||||
public long unreadCount() {
|
||||
|
|
|
@ -23,7 +23,7 @@ public class HeadPageTest {
|
|||
try(final HeadPage p = new HeadPage(0, q, pageIO)) {
|
||||
assertThat(p.getPageNum(), is(equalTo(0)));
|
||||
assertThat(p.isFullyRead(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(true));
|
||||
assertThat(p.isFullyAcked(), is(false));
|
||||
assertThat(p.hasSpace(10), is(true));
|
||||
assertThat(p.hasSpace(100), is(false));
|
||||
}
|
||||
|
|
|
@ -81,16 +81,15 @@ public class QueueTest {
|
|||
public void singleWriteMultiRead() throws IOException {
|
||||
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
|
||||
q.open();
|
||||
assertThat(q.isFullyAcked(), is(true));
|
||||
|
||||
Queueable element = new StringElement("foobarbaz");
|
||||
q.write(element);
|
||||
try (Batch b = q.nonBlockReadBatch(2)) {
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0).toString(), is(element.toString()));
|
||||
assertThat(q.nonBlockReadBatch(2), nullValue());
|
||||
assertThat(q.isFullyAcked(), is(false));
|
||||
}
|
||||
assertThat(q.isFullyAcked(), is(true));
|
||||
|
||||
Batch b = q.nonBlockReadBatch(2);
|
||||
|
||||
assertThat(b.getElements().size(), is(1));
|
||||
assertThat(b.getElements().get(0).toString(), is(element.toString()));
|
||||
assertThat(q.nonBlockReadBatch(2), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue