From 6fb0f7db2116bb65a8d734080cb691c53ceb7646 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 13 Jul 2017 22:29:22 +0200 Subject: [PATCH] #7690 Fix needless waiting by using an ArrayBlockingQueue Fixes #7691 --- logstash-core/lib/logstash/queue_factory.rb | 4 +- .../util/wrapped_synchronous_queue.rb | 30 ++++++++------ .../instrument/wrapped_write_client_spec.rb | 2 +- .../spec/logstash/queue_factory_spec.rb | 7 +++- .../util/wrapped_synchronous_queue_spec.rb | 39 ++++++++++--------- 5 files changed, 48 insertions(+), 34 deletions(-) diff --git a/logstash-core/lib/logstash/queue_factory.rb b/logstash-core/lib/logstash/queue_factory.rb index 70b215557..007d9f604 100644 --- a/logstash-core/lib/logstash/queue_factory.rb +++ b/logstash-core/lib/logstash/queue_factory.rb @@ -29,7 +29,9 @@ module LogStash LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes) when "memory" # memory is the legacy and default setting - LogStash::Util::WrappedSynchronousQueue.new + LogStash::Util::WrappedSynchronousQueue.new( + settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2 + ) else raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory_acked', 'memory', 'persisted'" end diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb index d217e80bb..676d47e69 100644 --- a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -2,13 +2,16 @@ module LogStash; module Util class WrappedSynchronousQueue + java_import java.util.concurrent.ArrayBlockingQueue java_import java.util.concurrent.SynchronousQueue java_import java.util.concurrent.TimeUnit - def initialize - @queue = SynchronousQueue.new + def initialize (size) + @queue = ArrayBlockingQueue.new(size) end + attr_reader :queue + # Push an object to the queue if the queue is full # it will block until the object can be added to the queue. # @@ -58,7 +61,7 @@ module LogStash; module Util end def empty? - true # synchronous queue is alway empty + @queue.queue.isEmpty end def set_batch_dimensions(batch_size, wait_for) @@ -170,11 +173,11 @@ module LogStash; module Util @size = size @wait = wait - @originals = Hash.new - # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor # @cancelled = Hash.new + #Sizing HashSet to size/load_factor to ensure no rehashing + @originals = java.util.HashSet.new(size * 4 / 3 + 1, 0.75) @generated = Hash.new @iterating_temp = Hash.new @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads @@ -182,16 +185,18 @@ module LogStash; module Util end def read_next - @size.times do |t| - event = @queue.poll(@wait) - return if event.nil? # queue poll timed out - - @originals[event] = true + read_size = @queue.queue.drainTo(@originals, @size) + if read_size < @size + (@size - read_size).times do |_| + e = @queue.poll(@wait) + return if e.nil? + @originals.add(e) + end end end def merge(event) - return if event.nil? || @originals.key?(event) + return if event.nil? || @originals.contains(event) # take care not to cause @generated to change during iteration # @iterating_temp is merged after the iteration if iterating? @@ -214,9 +219,10 @@ module LogStash; module Util # below the checks for @cancelled.include?(e) have been replaced by e.cancelled? # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor - @originals.each do |e, _| + @originals.each do |e| blk.call(e) unless e.cancelled? end + @generated.each do |e, _| blk.call(e) unless e.cancelled? end diff --git a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb index d01ae4dd2..6cc828d00 100644 --- a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb +++ b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb @@ -101,7 +101,7 @@ describe LogStash::Instrument::WrappedWriteClient do end context "WrappedSynchronousQueue" do - let(:queue) { LogStash::Util::WrappedSynchronousQueue.new } + let(:queue) { LogStash::Util::WrappedSynchronousQueue.new(1024) } before do read_client.set_events_metric(metric.namespace([:stats, :events])) diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index 9182c9c95..1460ec106 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -15,7 +15,9 @@ describe LogStash::QueueFactory do LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024), LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024), LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000), - LogStash::Setting::String.new("pipeline.id", pipeline_id) + LogStash::Setting::String.new("pipeline.id", pipeline_id), + LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125), + LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum) ] end @@ -72,9 +74,10 @@ describe LogStash::QueueFactory do context "when `queue.type` is `memory`" do before do settings.set("queue.type", "memory") + settings.set("pipeline.batch.size", 1024) end - it "returns a `WrappedAckedQueue`" do + it "returns a `WrappedSynchronousQueue`" do queue = subject.create(settings) expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue) queue.close diff --git a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb index 89b3e26d2..7fcf4bec4 100644 --- a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb +++ b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb @@ -5,6 +5,8 @@ require "logstash/instrument/collector" describe LogStash::Util::WrappedSynchronousQueue do + subject {LogStash::Util::WrappedSynchronousQueue.new(5)} + describe "queue clients" do context "when requesting a write client" do it "returns a client" do @@ -18,15 +20,9 @@ describe LogStash::Util::WrappedSynchronousQueue do end end - class DummyQueue < Array - def take() shift(); end - def poll(*) shift(); end - end - describe "WriteClient | ReadClient" do - let(:queue) { DummyQueue.new } - let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)} - let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)} + let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(subject)} + let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(subject)} context "when reading from the queue" do let(:collector) { LogStash::Instrument::Collector.new } @@ -95,25 +91,32 @@ describe LogStash::Util::WrappedSynchronousQueue do it "appends batches to the queue" do batch = write_client.get_new_batch - 5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))} + messages = [] + 5.times do |i| + message = "value-#{i}" + batch.push(LogStash::Event.new({"message" => message})) + messages << message + end write_client.push_batch(batch) read_batch = read_client.read_batch expect(read_batch.size).to eq(5) - i = 0 read_batch.each do |data| - expect(data.get("message")).to eq("value-#{i}") + message = data.get("message") + expect(messages).to include(message) + messages.delete(message) # read_batch.cancel("value-#{i}") if i > 2 # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor - data.cancel if i > 2 - read_batch.merge(LogStash::Event.new({"message" => "generated-#{i}"})) if i > 2 - i += 1 + if message.match /value-[3-4]/ + data.cancel + read_batch.merge(LogStash::Event.new({ "message" => message.gsub(/value/, 'generated') })) + end end # expect(read_batch.cancelled_size).to eq(2) # disabled for https://github.com/elastic/logstash/issues/6055 - i = 0 + received = [] read_batch.each do |data| - expect(data.get("message")).to eq("value-#{i}") if i < 3 - expect(data.get("message")).to eq("generated-#{i}") if i > 2 - i += 1 + received << data.get("message") end + (0..2).each {|i| expect(received).to include("value-#{i}")} + (3..4).each {|i| expect(received).to include("generated-#{i}")} end end end