[7.x backport] Add addresses to p2pipeline input and output plugin metrics (#12413)

Clean backport of #12394

This commit adds context to the pipeline to pipeline input and output
plugins by adding a string containing the `address` field to the input
plugin, and an array containing the `send_to` field to the output plugin.
This helps gain a picture of how pipeline to pipeline enabled configurations
are communicating with each other, without having to refer back to the pipeline
definition
This commit is contained in:
Rob Bavey 2020-11-04 16:09:22 -05:00 committed by GitHub
parent 958ffa71f0
commit 92f304a132
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 1 deletions

View file

@ -33,6 +33,9 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Input
if !listen_successful
raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
end
# add address to the plugin stats
metric.gauge(:address, address)
end
def run(queue)

View file

@ -30,6 +30,8 @@ module ::LogStash; module Plugins; module Builtin; module Pipeline; class Output
def register
@pipeline_bus = execution_context.agent.pipeline_bus
# add list of pipelines to send to the plugin metrics
metric.gauge(:send_to, send_to)
pipeline_bus.registerSender(self, @send_to)
end

View file

@ -31,6 +31,11 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
let(:input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(input_options) }
let(:output) { ::LogStash::Plugins::Builtin::Pipeline::Output.new(output_options) }
let(:inputs) { [input] }
let(:metric) {
LogStash::Instrument::NamespacedMetric.new(
LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter]
)
}
let(:event) { ::LogStash::Event.new("foo" => "bar") }
@ -39,8 +44,10 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
allow(agent).to receive(:pipeline_bus).and_return(pipeline_bus)
inputs.each do |i|
allow(i).to receive(:execution_context).and_return(execution_context)
i.metric = metric
end
allow(output).to receive(:execution_context).and_return(execution_context)
output.metric = metric
end
def wait_input_running(input_plugin)
@ -92,6 +99,20 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
event.set("baz", "bot")
expect(subject.to_hash_with_metadata).not_to match(event.to_hash_with_metadata)
end
it 'should add `address` to the plugin metrics' do
event_metrics = input.metric.collector.snapshot_metric.metric_store.get_with_path(
"filter"
)[:filter]
expect(event_metrics[:address].value).to eq(address)
end
it 'should add `send_to` to the plugin metrics' do
event_metrics = output.metric.collector.snapshot_metric.metric_store.get_with_path(
"filter"
)[:filter]
expect(event_metrics[:send_to].value).to eq([address])
end
end
after(:each) do
@ -140,9 +161,10 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
describe "one output to multiple inputs" do
describe "with all plugins up" do
let(:other_address) { "other" }
let(:send_addresses) { [address, other_address]}
let(:other_input_options) { { "address" => other_address } }
let(:other_input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(other_input_options) }
let(:output_options) { { "send_to" => [address, other_address] } }
let(:output_options) { { "send_to" => send_addresses } }
let(:inputs) { [input, other_input] }
let(:queues) { [Queue.new, Queue.new] }
let(:inputs_queues) { Hash[inputs.zip(queues)] }
@ -162,6 +184,13 @@ describe ::LogStash::Plugins::Builtin::Pipeline do
end
end
it 'should add multiple `send_to` addresses to the plugin metrics' do
event_metrics = output.metric.collector.snapshot_metric.metric_store.get_with_path(
"filter"
)[:filter]
expect(event_metrics[:send_to].value).to eq(send_addresses)
end
describe "sending a message" do
before(:each) do
output.multi_receive([event])