From dc168a10b55b8a59be6734fca04789f60d8ea153 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 11 May 2018 13:30:42 -0700 Subject: [PATCH] [X-Pack Monitoring] Report pipeline protocol (#9516) * Add getter for pipeline protocol * Refactoring: extract bare string into constant * Report pipeline protocol to Monitoring as part of logstash_state docs * Report all protocols, not just first one * Refactoring: rename constant to be more descriptive * De-dupe protocols list before sending to monitoring * Checking for single protocol * Adding missing require This was being masked and was uncovered when this test suite was run by itself * Raising error if multiple protocols are specified * Adding back comma --- .../lib/logstash/config/pipeline_config.rb | 13 +++++++++++-- .../spec/logstash/config/pipeline_config_spec.rb | 11 ++++++++++- .../lib/config_management/elasticsearch_source.rb | 3 ++- .../inputs/metrics/state_event_factory.rb | 1 + 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/logstash-core/lib/logstash/config/pipeline_config.rb b/logstash-core/lib/logstash/config/pipeline_config.rb index d93b444b4..d8e4a6abd 100644 --- a/logstash-core/lib/logstash/config/pipeline_config.rb +++ b/logstash-core/lib/logstash/config/pipeline_config.rb @@ -5,14 +5,23 @@ module LogStash module Config class PipelineConfig include LogStash::Util::Loggable - attr_reader :source, :pipeline_id, :config_parts, :settings, :read_at + attr_reader :source, :pipeline_id, :config_parts, :protocol, :settings, :read_at def initialize(source, pipeline_id, config_parts, settings) + config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts] + unique_protocols = config_parts_array + .map { |config_part| config_part.protocol.to_s } + .uniq + + if unique_protocols.length > 1 + raise(ArgumentError, "There should be exactly 1 unique protocol in config_parts. Found #{unique_protocols.length.to_s}.") + end + @source = source @pipeline_id = pipeline_id # We can't use Array() since config_parts may be a java object! - config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts] @config_parts = config_parts_array.sort_by { |config_part| [config_part.protocol.to_s, config_part.id] } + @protocol = unique_protocols[0] @settings = settings @read_at = Time.now end diff --git a/logstash-core/spec/logstash/config/pipeline_config_spec.rb b/logstash-core/spec/logstash/config/pipeline_config_spec.rb index d7ed1c565..5ccefab47 100644 --- a/logstash-core/spec/logstash/config/pipeline_config_spec.rb +++ b/logstash-core/spec/logstash/config/pipeline_config_spec.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/config/pipeline_config" require "logstash/config/source/local" +require_relative "../../support/helpers" describe LogStash::Config::PipelineConfig do let(:source) { LogStash::Config::Source::Local } @@ -13,7 +14,6 @@ describe LogStash::Config::PipelineConfig do org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"), org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"), org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"), - org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"), ] end @@ -72,4 +72,13 @@ describe LogStash::Config::PipelineConfig do end end end + + it "returns the pipeline's protocol" do + expect(subject.protocol).to eq((ordered_config_parts.uniq { | config_part | config_part.protocol })[0].protocol) + end + + it "raises an ArgumentError when multiple protocols are supplied" do + unordered_config_parts << org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator0 }") + expect { described_class.new(source, pipeline_id, unordered_config_parts, settings) }.to raise_error ArgumentError, /.+Found 2\./ + end end diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 9d738e322..951320290 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -32,6 +32,7 @@ module LogStash queue.max_bytes queue.checkpoint.writes ) + CENTRALLY_MANAGED_PIPELINE_PROTOCOL = "x-pack-config-management" def initialize(settings) super(settings) @@ -98,7 +99,7 @@ module LogStash raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty? - config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string) + config_part = org.logstash.common.SourceWithMetadata.new(CENTRALLY_MANAGED_PIPELINE_PROTOCOL, pipeline_id.to_s, config_string) # We don't support multiple pipelines, so use the global settings from the logstash.yml file settings = @settings.clone diff --git a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb index d8abf2336..933cde95f 100644 --- a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb +++ b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb @@ -27,6 +27,7 @@ module LogStash; module Inputs; class Metrics; "id" => pipeline.pipeline_id, "hash" => pipeline.lir.unique_hash, "ephemeral_id" => pipeline.ephemeral_id, + "protocol" => pipeline.pipeline_config.protocol, "workers" => pipeline.settings.get("pipeline.workers"), "batch_size" => pipeline.settings.get("pipeline.batch.size"), "representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)