mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
MINOR: Remove unused take and timed offer from queue wrappers
Fixes #7782
This commit is contained in:
parent
d9436dd760
commit
bff61d59fe
3 changed files with 0 additions and 55 deletions
|
@ -57,24 +57,6 @@ module LogStash; module Util
|
||||||
end
|
end
|
||||||
alias_method(:<<, :push)
|
alias_method(:<<, :push)
|
||||||
|
|
||||||
# TODO - fix doc for this noop method
|
|
||||||
# Offer an object to the queue, wait for the specified amount of time.
|
|
||||||
# If adding to the queue was successful it will return true, false otherwise.
|
|
||||||
#
|
|
||||||
# @param [Object] Object to add to the queue
|
|
||||||
# @param [Integer] Time in milliseconds to wait before giving up
|
|
||||||
# @return [Boolean] True if adding was successful if not it return false
|
|
||||||
def offer(obj, timeout_ms)
|
|
||||||
raise NotImplementedError.new("The offer method is not implemented. There is no non blocking write operation yet.")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Blocking
|
|
||||||
def take
|
|
||||||
check_closed("read a batch")
|
|
||||||
# TODO - determine better arbitrary timeout millis
|
|
||||||
@queue.read_batch(1, 200).get_elements.first
|
|
||||||
end
|
|
||||||
|
|
||||||
# Block for X millis
|
# Block for X millis
|
||||||
def poll(millis)
|
def poll(millis)
|
||||||
check_closed("read")
|
check_closed("read")
|
||||||
|
|
|
@ -18,21 +18,6 @@ module LogStash; module Util
|
||||||
end
|
end
|
||||||
alias_method(:<<, :push)
|
alias_method(:<<, :push)
|
||||||
|
|
||||||
# Offer an object to the queue, wait for the specified amount of time.
|
|
||||||
# If adding to the queue was successful it wil return true, false otherwise.
|
|
||||||
#
|
|
||||||
# @param [Object] Object to add to the queue
|
|
||||||
# @param [Integer] Time in milliseconds to wait before giving up
|
|
||||||
# @return [Boolean] True if adding was successful if not it return false
|
|
||||||
def offer(obj, timeout_ms)
|
|
||||||
@queue.offer(obj, timeout_ms, TimeUnit::MILLISECONDS)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Blocking
|
|
||||||
def take
|
|
||||||
@queue.take
|
|
||||||
end
|
|
||||||
|
|
||||||
# Block for X millis
|
# Block for X millis
|
||||||
def poll(millis)
|
def poll(millis)
|
||||||
@queue.poll(millis, TimeUnit::MILLISECONDS)
|
@queue.poll(millis, TimeUnit::MILLISECONDS)
|
||||||
|
|
|
@ -4,28 +4,6 @@ require "logstash/util/wrapped_synchronous_queue"
|
||||||
require "logstash/instrument/collector"
|
require "logstash/instrument/collector"
|
||||||
|
|
||||||
describe LogStash::Util::WrappedSynchronousQueue do
|
describe LogStash::Util::WrappedSynchronousQueue do
|
||||||
context "#offer" do
|
|
||||||
context "queue is blocked" do
|
|
||||||
it "fails and give feedback" do
|
|
||||||
expect(subject.offer("Bonjour", 2)).to be_falsey
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "queue is not blocked" do
|
|
||||||
before do
|
|
||||||
@consumer = Thread.new { loop { subject.take } }
|
|
||||||
sleep(0.1)
|
|
||||||
end
|
|
||||||
|
|
||||||
after do
|
|
||||||
@consumer.kill
|
|
||||||
end
|
|
||||||
|
|
||||||
it "inserts successfully" do
|
|
||||||
expect(subject.offer("Bonjour", 20)).to be_truthy
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "queue clients" do
|
describe "queue clients" do
|
||||||
context "when requesting a write client" do
|
context "when requesting a write client" do
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue