mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
c7ca11e2ec
commit
6fb0f7db21
5 changed files with 48 additions and 34 deletions
|
@ -29,7 +29,9 @@ module LogStash
|
|||
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
|
||||
when "memory"
|
||||
# memory is the legacy and default setting
|
||||
LogStash::Util::WrappedSynchronousQueue.new
|
||||
LogStash::Util::WrappedSynchronousQueue.new(
|
||||
settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2
|
||||
)
|
||||
else
|
||||
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'"
|
||||
end
|
||||
|
|
|
@ -2,13 +2,16 @@
|
|||
|
||||
module LogStash; module Util
|
||||
class WrappedSynchronousQueue
|
||||
java_import java.util.concurrent.ArrayBlockingQueue
|
||||
java_import java.util.concurrent.SynchronousQueue
|
||||
java_import java.util.concurrent.TimeUnit
|
||||
|
||||
def initialize
|
||||
@queue = SynchronousQueue.new
|
||||
def initialize (size)
|
||||
@queue = ArrayBlockingQueue.new(size)
|
||||
end
|
||||
|
||||
attr_reader :queue
|
||||
|
||||
# Push an object to the queue if the queue is full
|
||||
# it will block until the object can be added to the queue.
|
||||
#
|
||||
|
@ -58,7 +61,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def empty?
|
||||
true # synchronous queue is alway empty
|
||||
@queue.queue.isEmpty
|
||||
end
|
||||
|
||||
def set_batch_dimensions(batch_size, wait_for)
|
||||
|
@ -170,11 +173,11 @@ module LogStash; module Util
|
|||
@size = size
|
||||
@wait = wait
|
||||
|
||||
@originals = Hash.new
|
||||
|
||||
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
|
||||
# @cancelled = Hash.new
|
||||
|
||||
#Sizing HashSet to size/load_factor to ensure no rehashing
|
||||
@originals = java.util.HashSet.new(size * 4 / 3 + 1, 0.75)
|
||||
@generated = Hash.new
|
||||
@iterating_temp = Hash.new
|
||||
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
|
||||
|
@ -182,16 +185,18 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def read_next
|
||||
@size.times do |t|
|
||||
event = @queue.poll(@wait)
|
||||
return if event.nil? # queue poll timed out
|
||||
|
||||
@originals[event] = true
|
||||
read_size = @queue.queue.drainTo(@originals, @size)
|
||||
if read_size < @size
|
||||
(@size - read_size).times do |_|
|
||||
e = @queue.poll(@wait)
|
||||
return if e.nil?
|
||||
@originals.add(e)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def merge(event)
|
||||
return if event.nil? || @originals.key?(event)
|
||||
return if event.nil? || @originals.contains(event)
|
||||
# take care not to cause @generated to change during iteration
|
||||
# @iterating_temp is merged after the iteration
|
||||
if iterating?
|
||||
|
@ -214,9 +219,10 @@ module LogStash; module Util
|
|||
|
||||
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
|
||||
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
|
||||
@originals.each do |e, _|
|
||||
@originals.each do |e|
|
||||
blk.call(e) unless e.cancelled?
|
||||
end
|
||||
|
||||
@generated.each do |e, _|
|
||||
blk.call(e) unless e.cancelled?
|
||||
end
|
||||
|
|
|
@ -101,7 +101,7 @@ describe LogStash::Instrument::WrappedWriteClient do
|
|||
end
|
||||
|
||||
context "WrappedSynchronousQueue" do
|
||||
let(:queue) { LogStash::Util::WrappedSynchronousQueue.new }
|
||||
let(:queue) { LogStash::Util::WrappedSynchronousQueue.new(1024) }
|
||||
|
||||
before do
|
||||
read_client.set_events_metric(metric.namespace([:stats, :events]))
|
||||
|
|
|
@ -15,7 +15,9 @@ describe LogStash::QueueFactory do
|
|||
LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024),
|
||||
LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024),
|
||||
LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000),
|
||||
LogStash::Setting::String.new("pipeline.id", pipeline_id)
|
||||
LogStash::Setting::String.new("pipeline.id", pipeline_id),
|
||||
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
|
||||
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
|
||||
]
|
||||
end
|
||||
|
||||
|
@ -72,9 +74,10 @@ describe LogStash::QueueFactory do
|
|||
context "when `queue.type` is `memory`" do
|
||||
before do
|
||||
settings.set("queue.type", "memory")
|
||||
settings.set("pipeline.batch.size", 1024)
|
||||
end
|
||||
|
||||
it "returns a `WrappedAckedQueue`" do
|
||||
it "returns a `WrappedSynchronousQueue`" do
|
||||
queue = subject.create(settings)
|
||||
expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
|
||||
queue.close
|
||||
|
|
|
@ -5,6 +5,8 @@ require "logstash/instrument/collector"
|
|||
|
||||
describe LogStash::Util::WrappedSynchronousQueue do
|
||||
|
||||
subject {LogStash::Util::WrappedSynchronousQueue.new(5)}
|
||||
|
||||
describe "queue clients" do
|
||||
context "when requesting a write client" do
|
||||
it "returns a client" do
|
||||
|
@ -18,15 +20,9 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
|||
end
|
||||
end
|
||||
|
||||
class DummyQueue < Array
|
||||
def take() shift(); end
|
||||
def poll(*) shift(); end
|
||||
end
|
||||
|
||||
describe "WriteClient | ReadClient" do
|
||||
let(:queue) { DummyQueue.new }
|
||||
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)}
|
||||
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)}
|
||||
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(subject)}
|
||||
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(subject)}
|
||||
|
||||
context "when reading from the queue" do
|
||||
let(:collector) { LogStash::Instrument::Collector.new }
|
||||
|
@ -95,25 +91,32 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
|||
|
||||
it "appends batches to the queue" do
|
||||
batch = write_client.get_new_batch
|
||||
5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))}
|
||||
messages = []
|
||||
5.times do |i|
|
||||
message = "value-#{i}"
|
||||
batch.push(LogStash::Event.new({"message" => message}))
|
||||
messages << message
|
||||
end
|
||||
write_client.push_batch(batch)
|
||||
read_batch = read_client.read_batch
|
||||
expect(read_batch.size).to eq(5)
|
||||
i = 0
|
||||
read_batch.each do |data|
|
||||
expect(data.get("message")).to eq("value-#{i}")
|
||||
message = data.get("message")
|
||||
expect(messages).to include(message)
|
||||
messages.delete(message)
|
||||
# read_batch.cancel("value-#{i}") if i > 2 # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
|
||||
data.cancel if i > 2
|
||||
read_batch.merge(LogStash::Event.new({"message" => "generated-#{i}"})) if i > 2
|
||||
i += 1
|
||||
if message.match /value-[3-4]/
|
||||
data.cancel
|
||||
read_batch.merge(LogStash::Event.new({ "message" => message.gsub(/value/, 'generated') }))
|
||||
end
|
||||
end
|
||||
# expect(read_batch.cancelled_size).to eq(2) # disabled for https://github.com/elastic/logstash/issues/6055
|
||||
i = 0
|
||||
received = []
|
||||
read_batch.each do |data|
|
||||
expect(data.get("message")).to eq("value-#{i}") if i < 3
|
||||
expect(data.get("message")).to eq("generated-#{i}") if i > 2
|
||||
i += 1
|
||||
end
|
||||
received << data.get("message")
|
||||
end
|
||||
(0..2).each {|i| expect(received).to include("value-#{i}")}
|
||||
(3..4).each {|i| expect(received).to include("generated-#{i}")}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue