mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
edd8844670
commit
2f7a5327ad
3 changed files with 13 additions and 36 deletions
|
@ -53,11 +53,10 @@ module LogStash; class PipelineReporter
|
|||
|
||||
def to_hash
|
||||
# pipeline.filter_queue_client.inflight_batches is synchronized
|
||||
pipeline.filter_queue_client.inflight_batches do |batch_map|
|
||||
worker_states_snap = worker_states(batch_map) # We only want to run this once
|
||||
inflight_count = worker_states_snap.map {|s| s[:inflight_count] }.reduce(0, :+)
|
||||
|
||||
{
|
||||
batch_map = pipeline.filter_queue_client.inflight_batches
|
||||
worker_states_snap = worker_states(batch_map) # We only want to run this once
|
||||
inflight_count = worker_states_snap.map {|s| s[:inflight_count]}.reduce(0, :+)
|
||||
{
|
||||
:events_filtered => events_filtered,
|
||||
:events_consumed => events_consumed,
|
||||
:inflight_count => inflight_count,
|
||||
|
@ -65,8 +64,7 @@ module LogStash; class PipelineReporter
|
|||
:output_info => output_info,
|
||||
:thread_info => pipeline.plugin_threads_info,
|
||||
:stalling_threads_info => pipeline.stalling_threads_info
|
||||
}
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -103,7 +101,7 @@ module LogStash; class PipelineReporter
|
|||
{
|
||||
:type => output_delegator.config_name,
|
||||
:id => output_delegator.id,
|
||||
:concurrency => output_delegator.concurrency,
|
||||
:concurrency => output_delegator.concurrency,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -96,10 +96,9 @@ module LogStash; module Util
|
|||
|
||||
def initialize(queue, batch_size = 125, wait_for = 50)
|
||||
@queue = queue
|
||||
@mutex = Mutex.new
|
||||
# Note that @inflight_batches as a central mechanism for tracking inflight
|
||||
# batches will fail if we have multiple read clients in the pipeline.
|
||||
@inflight_batches = {}
|
||||
@inflight_batches = Concurrent::Map.new
|
||||
# allow the worker thread to report the execution time of the filter + output
|
||||
@inflight_clocks = Concurrent::Map.new
|
||||
@batch_size = batch_size
|
||||
|
@ -111,12 +110,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def empty?
|
||||
@mutex.lock
|
||||
begin
|
||||
@queue.is_empty?
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
@queue.is_empty?
|
||||
end
|
||||
|
||||
def set_batch_dimensions(batch_size, wait_for)
|
||||
|
@ -141,12 +135,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def inflight_batches
|
||||
@mutex.lock
|
||||
begin
|
||||
yield(@inflight_batches)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
@inflight_batches
|
||||
end
|
||||
|
||||
# create a new empty batch
|
||||
|
@ -168,24 +157,14 @@ module LogStash; module Util
|
|||
|
||||
def start_metrics(batch)
|
||||
thread = Thread.current
|
||||
@mutex.lock
|
||||
begin
|
||||
@inflight_batches[thread] = batch
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
@inflight_batches[thread] = batch
|
||||
@inflight_clocks[thread] = java.lang.System.nano_time
|
||||
end
|
||||
|
||||
def close_batch(batch)
|
||||
thread = Thread.current
|
||||
@mutex.lock
|
||||
begin
|
||||
batch.close
|
||||
@inflight_batches.delete(thread)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
batch.close
|
||||
@inflight_batches.delete(thread)
|
||||
start_time = @inflight_clocks.get_and_set(thread, nil)
|
||||
unless start_time.nil?
|
||||
if batch.size > 0
|
||||
|
|
|
@ -77,7 +77,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def inflight_batches
|
||||
yield(@inflight_batches)
|
||||
@inflight_batches
|
||||
end
|
||||
|
||||
# create a new empty batch
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue