diff --git a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb index 25dec93a5..1cc885beb 100644 --- a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb +++ b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb @@ -33,6 +33,9 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input if !listen_successful raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines." end + # add address to the plugin stats + metric.gauge(:address, address) + end def run(queue) diff --git a/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb b/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb index 05835f8e2..07107cb60 100644 --- a/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb +++ b/logstash-core/lib/logstash/plugins/builtin/pipeline/output.rb @@ -30,6 +30,8 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Output def register @pipeline_bus = execution_context.agent.pipeline_bus + # add list of pipelines to send to the plugin metrics + metric.gauge(:send_to, send_to) pipeline_bus.registerSender(self, @send_to) end diff --git a/logstash-core/spec/logstash/plugins/builtin/pipeline_input_output_spec.rb b/logstash-core/spec/logstash/plugins/builtin/pipeline_input_output_spec.rb index f1184842d..5e130b9db 100644 --- a/logstash-core/spec/logstash/plugins/builtin/pipeline_input_output_spec.rb +++ b/logstash-core/spec/logstash/plugins/builtin/pipeline_input_output_spec.rb @@ -31,6 +31,11 @@ describe ::LogStash::Plugins::Builtin::Pipeline do let(:input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(input_options) } let(:output) { ::LogStash::Plugins::Builtin::Pipeline::Output.new(output_options) } let(:inputs) { [input] } + let(:metric) { + LogStash::Instrument::NamespacedMetric.new( + LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter] + ) + } let(:event) { ::LogStash::Event.new("foo" => "bar") } @@ -39,8 +44,10 @@ describe ::LogStash::Plugins::Builtin::Pipeline do allow(agent).to receive(:pipeline_bus).and_return(pipeline_bus) inputs.each do |i| allow(i).to receive(:execution_context).and_return(execution_context) + i.metric = metric end allow(output).to receive(:execution_context).and_return(execution_context) + output.metric = metric end def wait_input_running(input_plugin) @@ -92,6 +99,20 @@ describe ::LogStash::Plugins::Builtin::Pipeline do event.set("baz", "bot") expect(subject.to_hash_with_metadata).not_to match(event.to_hash_with_metadata) end + + it 'should add `address` to the plugin metrics' do + event_metrics = input.metric.collector.snapshot_metric.metric_store.get_with_path( + "filter" + )[:filter] + expect(event_metrics[:address].value).to eq(address) + end + it 'should add `send_to` to the plugin metrics' do + event_metrics = output.metric.collector.snapshot_metric.metric_store.get_with_path( + "filter" + )[:filter] + expect(event_metrics[:send_to].value).to eq([address]) + end + end after(:each) do @@ -140,9 +161,10 @@ describe ::LogStash::Plugins::Builtin::Pipeline do describe "one output to multiple inputs" do describe "with all plugins up" do let(:other_address) { "other" } + let(:send_addresses) { [address, other_address]} let(:other_input_options) { { "address" => other_address } } let(:other_input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(other_input_options) } - let(:output_options) { { "send_to" => [address, other_address] } } + let(:output_options) { { "send_to" => send_addresses } } let(:inputs) { [input, other_input] } let(:queues) { [Queue.new, Queue.new] } let(:inputs_queues) { Hash[inputs.zip(queues)] } @@ -162,6 +184,13 @@ describe ::LogStash::Plugins::Builtin::Pipeline do end end + it 'should add multiple `send_to` addresses to the plugin metrics' do + event_metrics = output.metric.collector.snapshot_metric.metric_store.get_with_path( + "filter" + )[:filter] + expect(event_metrics[:send_to].value).to eq(send_addresses) + end + describe "sending a message" do before(:each) do output.multi_receive([event])