mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Make the test for the write WrappedWriteClient more robust
Instead of using sleep we start two threads and we join them to do the expectation. Fixes: #7031 #7032 Fixes #7103
This commit is contained in:
parent
55ad9850c8
commit
8707953d2c
1 changed files with 32 additions and 17 deletions
|
@ -18,39 +18,54 @@ describe LogStash::Instrument::WrappedWriteClient do
|
|||
|
||||
subject { described_class.new(write_client, pipeline, metric, plugin) }
|
||||
|
||||
def threaded_read_client(read_client)
|
||||
Thread.new(read_client) do |_read_client|
|
||||
started_at = Time.now
|
||||
|
||||
batch_size = 0
|
||||
loop {
|
||||
if Time.now - started_at > 60
|
||||
raise "Took too much time to read from the queue"
|
||||
end
|
||||
batch_size = _read_client.read_batch.size
|
||||
|
||||
break if batch_size > 0
|
||||
}
|
||||
expect(batch_size).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
shared_examples "queue tests" do
|
||||
it "pushes single event to the `WriteClient`" do
|
||||
t = Thread.new do
|
||||
subject.push(event)
|
||||
pusher_thread = Thread.new(subject, event) do |_subject, _event|
|
||||
_subject.push(event)
|
||||
end
|
||||
sleep(0.01) while !t.status
|
||||
expect(read_client.read_batch.size).to eq(1)
|
||||
t.kill rescue nil
|
||||
|
||||
reader_thread = threaded_read_client(read_client)
|
||||
|
||||
[pusher_thread, reader_thread].collect(&:join)
|
||||
end
|
||||
|
||||
it "pushes batch to the `WriteClient`" do
|
||||
batch = write_client.get_new_batch
|
||||
batch << event
|
||||
|
||||
t = Thread.new do
|
||||
subject.push_batch(batch)
|
||||
pusher_thread = Thread.new(subject, batch) do |_subject, _batch|
|
||||
subject.push_batch(_batch)
|
||||
end
|
||||
|
||||
sleep(0.01) while !t.status
|
||||
expect(read_client.read_batch.size).to eq(1)
|
||||
t.kill rescue nil
|
||||
reader_thread = threaded_read_client(read_client)
|
||||
[pusher_thread, reader_thread].collect(&:join)
|
||||
end
|
||||
|
||||
context "recorded metrics" do
|
||||
before do
|
||||
t = Thread.new do
|
||||
subject.push(event)
|
||||
pusher_thread = Thread.new(subject, event) do |_subject, _event|
|
||||
_subject.push(event)
|
||||
end
|
||||
sleep(0.01) while !t.status
|
||||
sleep(0.250) # make it block for some time, so duration isn't 0
|
||||
read_client.read_batch.size
|
||||
t.kill rescue nil
|
||||
|
||||
reader_thread = threaded_read_client(read_client)
|
||||
[pusher_thread, reader_thread].collect(&:join)
|
||||
end
|
||||
|
||||
let(:snapshot_store) { collector.snapshot_metric.metric_store }
|
||||
|
@ -97,7 +112,7 @@ describe LogStash::Instrument::WrappedWriteClient do
|
|||
end
|
||||
|
||||
context "AckedMemoryQueue" do
|
||||
let(:queue) { LogStash::Util::WrappedAckedQueue.create_memory_based("", 1024, 10, 1024) }
|
||||
let(:queue) { LogStash::Util::WrappedAckedQueue.create_memory_based("", 1024, 10, 4096) }
|
||||
|
||||
before do
|
||||
read_client.set_events_metric(metric.namespace([:stats, :events]))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue