[X-Pack Monitoring] Report pipeline protocol (#9516)

* Add getter for pipeline protocol

* Refactoring: extract bare string into constant

* Report pipeline protocol to Monitoring as part of logstash_state docs

* Report all protocols, not just first one

* Refactoring: rename constant to be more descriptive

* De-dupe protocols list before sending to monitoring

* Checking for single protocol

* Adding missing require

This missing require was being masked by requires elsewhere and was only uncovered when this test suite was run by itself

* Raising error if multiple protocols are specified

* Adding back comma
This commit is contained in:
Shaunak Kashyap 2018-05-11 13:30:42 -07:00 committed by GitHub
parent 6886a14fb6
commit dc168a10b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 4 deletions

View file

@ -5,14 +5,23 @@ module LogStash module Config
class PipelineConfig
include LogStash::Util::Loggable
attr_reader :source, :pipeline_id, :config_parts, :settings, :read_at
attr_reader :source, :pipeline_id, :config_parts, :protocol, :settings, :read_at
# Builds an immutable description of a pipeline's configuration.
#
# @param source [Class] the config source class that produced the parts
# @param pipeline_id [String, Symbol] identifier of the pipeline
# @param config_parts [Array, Object] one or more SourceWithMetadata parts
#   (may be a single bare part, or a Java collection-backed array)
# @param settings [LogStash::Settings] settings applied to this pipeline
# @raise [ArgumentError] if the parts do not all share one protocol
def initialize(source, pipeline_id, config_parts, settings)
  # We can't use Array() since config_parts may be a java object!
  config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts]
  # All of a pipeline's parts must come from the same source protocol
  # (e.g. "file" vs "string"); mixing protocols is a configuration error.
  unique_protocols = config_parts_array
    .map { |config_part| config_part.protocol.to_s }
    .uniq
  if unique_protocols.length > 1
    raise(ArgumentError, "There should be exactly 1 unique protocol in config_parts. Found #{unique_protocols.length}.")
  end
  @source = source
  @pipeline_id = pipeline_id
  # Sort by (protocol, id) so part ordering is deterministic across runs.
  @config_parts = config_parts_array.sort_by { |config_part| [config_part.protocol.to_s, config_part.id] }
  # nil when config_parts is empty, otherwise the single shared protocol.
  @protocol = unique_protocols[0]
  @settings = settings
  @read_at = Time.now
end

View file

@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/config/pipeline_config"
require "logstash/config/source/local"
require_relative "../../support/helpers"
describe LogStash::Config::PipelineConfig do
let(:source) { LogStash::Config::Source::Local }
@ -13,7 +14,6 @@ describe LogStash::Config::PipelineConfig do
org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"),
org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"),
]
end
@ -72,4 +72,13 @@ describe LogStash::Config::PipelineConfig do
end
end
end
# The pipeline exposes the single protocol shared by all of its config parts.
it "returns the pipeline's protocol" do
  expected_protocol = ordered_config_parts.uniq { |config_part| config_part.protocol }.first.protocol
  expect(subject.protocol).to eq(expected_protocol)
end
# Mixing source protocols within one pipeline is rejected at construction time.
it "raises an ArgumentError when multiple protocols are supplied" do
  unordered_config_parts << org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator0 }")
  expect { described_class.new(source, pipeline_id, unordered_config_parts, settings) }
    .to raise_error(ArgumentError, /.+Found 2\./)
end
end

View file

@ -32,6 +32,7 @@ module LogStash
queue.max_bytes
queue.checkpoint.writes
)
CENTRALLY_MANAGED_PIPELINE_PROTOCOL = "x-pack-config-management"
def initialize(settings)
super(settings)
@ -98,7 +99,7 @@ module LogStash
raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string)
config_part = org.logstash.common.SourceWithMetadata.new(CENTRALLY_MANAGED_PIPELINE_PROTOCOL, pipeline_id.to_s, config_string)
# We don't support multiple pipelines, so use the global settings from the logstash.yml file
settings = @settings.clone

View file

@ -27,6 +27,7 @@ module LogStash; module Inputs; class Metrics;
"id" => pipeline.pipeline_id,
"hash" => pipeline.lir.unique_hash,
"ephemeral_id" => pipeline.ephemeral_id,
"protocol" => pipeline.pipeline_config.protocol,
"workers" => pipeline.settings.get("pipeline.workers"),
"batch_size" => pipeline.settings.get("pipeline.batch.size"),
"representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)