add queue drain option support

wip queue drain option

metrics on empty batches

reenabled spec

cosmetic fixes

stats collection, mutex, specs, empty batch handling

start_metrics
This commit is contained in:
Colin Surprenant 2016-12-15 10:03:08 -05:00
parent 4bd0ea99a0
commit 154ce65c3a
10 changed files with 161 additions and 66 deletions

View file

@ -7,6 +7,7 @@ import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@ -43,7 +44,6 @@ public class JrubyAckedQueueExtLibrary implements Library {
// as a simplified first prototyping implementation, the Settings class is not exposed and the queue elements
// are assumed to be logstash Event.
@JRubyClass(name = "AckedQueue", parent = "Object")
public static class RubyAckedQueue extends RubyObject {
private Queue queue;
@ -171,6 +171,11 @@ public class JrubyAckedQueueExtLibrary implements Library {
return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b);
}
@JRubyMethod(name = "is_fully_acked?")
public IRubyObject ruby_is_fully_acked(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}
@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)
@ -183,6 +188,5 @@ public class JrubyAckedQueueExtLibrary implements Library {
return context.nil;
}
}
}

View file

@ -7,6 +7,7 @@ import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@ -166,6 +167,11 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library {
return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b);
}
@JRubyMethod(name = "is_fully_acked?")
public IRubyObject ruby_is_fully_acked(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}
@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)

View file

@ -41,6 +41,7 @@ module LogStash
Setting::PortRange.new("http.port", 9600..9700),
Setting::String.new("http.environment", "production"),
Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
Setting::Boolean.new("queue.drain", false),
Setting::Bytes.new("queue.page_capacity", "250mb"),
Setting::Bytes.new("queue.max_bytes", "1024mb"),
Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited

View file

@ -154,6 +154,7 @@ module LogStash; class Pipeline < BasePipeline
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = @settings.get_value("queue.drain")
@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)
@ -326,26 +327,32 @@ module LogStash; class Pipeline < BasePipeline
# Main body of what a worker thread does
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batch_size, batch_delay)
running = true
shutdown_requested = false
@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
while running
batch = @filter_queue_client.take_batch
while true
signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop
running = !signal.shutdown?
shutdown_requested |= signal.shutdown? # latch on shutdown signal
batch = @filter_queue_client.read_batch # metrics are started in read_batch
@events_consumed.increment(batch.size)
filter_batch(batch)
if signal.flush? || signal.shutdown?
flush_filters_to_batch(batch, :final => signal.shutdown?)
end
flush_filters_to_batch(batch, :final => false) if signal.flush?
output_batch(batch)
@filter_queue_client.close_batch(batch)
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
break if shutdown_requested && !draining_queue?
end
# we are shutting down, queue is drained if it was required, now perform a final flush.
# for this we need to create a new empty batch to contain the final flushed events
batch = @filter_queue_client.new_batch
@filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
flush_filters_to_batch(batch, :final => true)
output_batch(batch)
@filter_queue_client.close_batch(batch)
end
def filter_batch(batch)
@ -604,4 +611,10 @@ module LogStash; class Pipeline < BasePipeline
:flushing => @flushing
}
end
end end
private
def draining_queue?
@drain_queue ? !@filter_queue_client.empty? : false
end
end; end

View file

@ -125,6 +125,10 @@ module LogStash; module Util
@queue.close
end
def empty?
@mutex.synchronize { @queue.is_fully_acked? }
end
def set_batch_dimensions(batch_size, wait_for)
@batch_size = batch_size
@wait_for = wait_for
@ -157,16 +161,29 @@ module LogStash; module Util
@inflight_batches.fetch(Thread.current, [])
end
def take_batch
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, @batch_size, @wait_for)
end
def read_batch
if @queue.closed?
raise QueueClosedError.new("Attempt to take a batch from a closed AckedQueue")
end
batch = new_batch
@mutex.synchronize { batch.read_next }
start_metrics(batch)
batch
end
def start_metrics(batch)
@mutex.synchronize do
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
# there seems to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)
start_clock
batch
end
end
@ -177,21 +194,30 @@ module LogStash; module Util
def close_batch(batch)
@mutex.synchronize do
batch.close
# there seems to be concurrency issues with metrics, keep it in the mutex
@inflight_batches.delete(Thread.current)
stop_clock
stop_clock(batch)
end
end
def start_clock
@inflight_clocks[Thread.current] = [
@event_metric.time(:duration_in_millis),
@pipeline_metric.time(:duration_in_millis)
@event_metric.time(:duration_in_millis),
@pipeline_metric.time(:duration_in_millis)
]
end
def stop_clock
@inflight_clocks[Thread.current].each(&:stop)
@inflight_clocks.delete(Thread.current)
def stop_clock(batch)
unless @inflight_clocks[Thread.current].nil?
if batch.size > 0
# onl/y stop (which also records) the metrics if the batch is non-empty.
# start_clock is now called at empty batch creation and an empty batch could
# stay empty all the way down to the close_batch call.
@inflight_clocks[Thread.current].each(&:stop)
end
@inflight_clocks.delete(Thread.current)
end
end
def add_starting_metrics(batch)
@ -213,6 +239,10 @@ module LogStash; module Util
class ReadBatch
def initialize(queue, size, wait)
@queue = queue
@size = size
@wait = wait
@originals = Hash.new
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
@ -221,7 +251,13 @@ module LogStash; module Util
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
take_originals_from_queue(queue, size, wait) # this sets a reference to @acked_batch
@acked_batch = nil
end
def read_next
@acked_batch = @queue.read_batch(@size, @wait)
return if @acked_batch.nil?
@acked_batch.get_elements.each { |e| @originals[e] = true }
end
def close
@ -301,14 +337,6 @@ module LogStash; module Util
@generated.update(@iterating_temp)
@iterating_temp.clear
end
def take_originals_from_queue(queue, size, wait)
@acked_batch = queue.read_batch(size, wait)
return if @acked_batch.nil?
@acked_batch.get_elements.each do |e|
@originals[e] = true
end
end
end
class WriteClient

View file

@ -72,6 +72,10 @@ module LogStash; module Util
# noop, compat with acked queue read client
end
def empty?
true # synchronous queue is alway empty
end
def set_batch_dimensions(batch_size, wait_for)
@batch_size = batch_size
@wait_for = wait_for
@ -104,18 +108,25 @@ module LogStash; module Util
@inflight_batches.fetch(Thread.current, [])
end
def take_batch
@mutex.synchronize do
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
set_current_thread_inflight_batch(batch)
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, @batch_size, @wait_for)
end
# We dont actually have any events to work on so lets
# not bother with recording metrics for them
if batch.size > 0
add_starting_metrics(batch)
start_clock
end
batch
def read_batch
batch = new_batch
@mutex.synchronize { batch.read_next }
start_metrics(batch)
batch
end
def start_metrics(batch)
@mutex.synchronize do
# there seems to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)
start_clock
end
end
@ -125,8 +136,9 @@ module LogStash; module Util
def close_batch(batch)
@mutex.synchronize do
# there seems to be concurrency issues with metrics, keep it in the mutex
@inflight_batches.delete(Thread.current)
stop_clock
stop_clock(batch)
end
end
@ -137,9 +149,14 @@ module LogStash; module Util
]
end
def stop_clock
def stop_clock(batch)
unless @inflight_clocks[Thread.current].nil?
@inflight_clocks[Thread.current].each(&:stop)
if batch.size > 0
# only stop (which also records) the metrics if the batch is non-empty.
# start_clock is now called at empty batch creation and an empty batch could
# stay empty all the way down to the close_batch call.
@inflight_clocks[Thread.current].each(&:stop)
end
@inflight_clocks.delete(Thread.current)
end
end
@ -162,6 +179,10 @@ module LogStash; module Util
class ReadBatch
def initialize(queue, size, wait)
@queue = queue
@size = size
@wait = wait
@originals = Hash.new
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
@ -170,7 +191,16 @@ module LogStash; module Util
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
take_originals_from_queue(queue, size, wait)
@acked_batch = nil
end
def read_next
@size.times do |t|
event = @queue.poll(@wait)
return if event.nil? # queue poll timed out
@originals[event] = true
end
end
def merge(event)
@ -235,15 +265,6 @@ module LogStash; module Util
@generated.update(@iterating_temp)
@iterating_temp.clear
end
def take_originals_from_queue(queue, size, wait)
size.times do |t|
event = queue.poll(wait)
return if event.nil? # queue poll timed out
@originals[event] = true
end
end
end
class WriteClient

View file

@ -82,6 +82,13 @@ describe LogStash::Pipeline do
let(:queue_type) { "persisted" } # "memory" "memory_acked"
let(:times) { [] }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
before :each do
FileUtils.mkdir_p(this_queue_folder)
@ -97,19 +104,22 @@ describe LogStash::Pipeline do
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
pipeline_settings_obj.set("queue.page_capacity", page_capacity)
pipeline_settings_obj.set("queue.max_bytes", max_bytes)
Thread.new do
# make sure we have received all the generated events
while counting_output.event_count < number_of_events do
sleep 1
end
subject.shutdown
end
times.push(Time.now.to_f)
subject.run
pipeline_thread
sleep(0.1) until subject.ready?
# make sure we have received all the generated events
while counting_output.event_count < number_of_events do
sleep(0.5)
end
times.unshift(Time.now.to_f - times.first)
end
after :each do
subject.shutdown
pipeline_thread.join
# Dir.rm_rf(this_queue_folder)
end

View file

@ -3,6 +3,7 @@ require "spec_helper"
require "logstash/inputs/generator"
require "logstash/filters/multiline"
require_relative "../support/mocks_classes"
require_relative "../logstash/pipeline_reporter_spec" # for DummyOutput class
class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"

View file

@ -60,7 +60,7 @@ describe LogStash::Util::WrappedSynchronousQueue do
context "when the queue is empty" do
it "doesnt record the `duration_in_millis`" do
batch = read_client.take_batch
batch = read_client.read_batch
read_client.close_batch(batch)
store = collector.snapshot_metric.metric_store
@ -95,7 +95,8 @@ describe LogStash::Util::WrappedSynchronousQueue do
batch = write_client.get_new_batch
5.times {|i| batch.push("value-#{i}")}
write_client.push_batch(batch)
read_batch = read_client.take_batch
read_batch = read_client.read_batch
sleep(0.1) # simulate some work for the `duration_in_millis`
# TODO: this interaction should be cleaned in an upcoming PR,
# This is what the current pipeline does.
@ -126,7 +127,7 @@ describe LogStash::Util::WrappedSynchronousQueue do
batch = write_client.get_new_batch
5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))}
write_client.push_batch(batch)
read_batch = read_client.take_batch
read_batch = read_client.read_batch
expect(read_batch.size).to eq(5)
i = 0
read_batch.each do |data|

View file

@ -371,6 +371,16 @@ public class Queue implements Closeable {
}
}
// @return true if the queue is fully acked, which implies that it is fully read which works as an "empty" state.
public boolean isFullyAcked() {
lock.lock();
try {
return this.tailPages.isEmpty() ? this.headPage.isFullyAcked() : false;
} finally {
lock.unlock();
}
}
// @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing)
public void ensurePersistedUpto(long seqNum) throws IOException{
lock.lock();