diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb
index 9e04121fc..fa5c3736c 100644
--- a/x-pack/lib/config_management/elasticsearch_source.rb
+++ b/x-pack/lib/config_management/elasticsearch_source.rb
@@ -18,7 +18,6 @@ module LogStash
       class RemoteConfigError < LogStash::Error; end
 
-      PIPELINE_INDEX = ".logstash"
       # exclude basic
       VALID_LICENSES = %w(trial standard gold platinum enterprise)
       FEATURE_INTERNAL = 'management'
@@ -50,6 +49,21 @@ module LogStash
         false
       end
 
+      # decide whether to use the system indices api (7.10+) or the legacy api (< 7.10) based on the elasticsearch server version
+      def get_pipeline_fetcher
+        response = client.get("/")
+
+        if response["error"]
+          raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
+        end
+
+        logger.debug("Reading configuration from Elasticsearch version {}", response["version"]["number"])
+        version_number = response["version"]["number"].split(".")
+        first = version_number[0].to_i
+        second = version_number[1].to_i
+        (first >= 8 || (first == 7 && second >= 10)) ? SystemIndicesFetcher.new : LegacyHiddenIndicesFetcher.new
+      end
+
       def pipeline_configs
         logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids)
 
@@ -63,33 +77,21 @@ module LogStash
           end
         end
 
-        response = fetch_config(pipeline_ids)
+        fetcher = get_pipeline_fetcher
+        fetcher.fetch_config(pipeline_ids, client)
 
-        if response["error"]
-          raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
-        end
-
-        if response["docs"].nil?
-          logger.debug("Server returned an unknown or malformed document structure", :response => response)
-          raise RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
-        end
-
-        # Cache pipelines to handle the case where a remote configuration error can render a pipeline unusable
-        # it is not reloadable
-        @cached_pipelines = response["docs"].collect do |document|
-          get_pipeline(document)
+        @cached_pipelines = pipeline_ids.collect do |pid|
+          get_pipeline(pid, fetcher)
         end.compact
       end
 
-      def get_pipeline(response)
-        pipeline_id = response["_id"]
-
-        if response["found"] == false
+      def get_pipeline(pipeline_id, fetcher)
+        unless fetcher.config_exist?(pipeline_id)
           logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id)
           return nil
         end
 
-        config_string = response.fetch("_source", {})["pipeline"]
+        config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"]
 
         raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?
 
@@ -100,7 +102,7 @@ module LogStash
         settings.set("pipeline.id", pipeline_id)
 
         # override global settings with pipeline settings from ES, if any
-        pipeline_settings = response["_source"]["pipeline_settings"]
+        pipeline_settings = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_settings"]
         unless pipeline_settings.nil?
           pipeline_settings.each do |setting, value|
             if SUPPORTED_PIPELINE_SETTINGS.include? setting
@@ -127,15 +129,6 @@ module LogStash
         es.build_client
       end
 
-      def fetch_config(pipeline_ids)
-        request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
-        client.post(config_path, {}, request_body_string)
-      end
-
-      def config_path
-        "#{PIPELINE_INDEX}/_mget"
-      end
-
       def populate_license_state(xpack_info)
         if xpack_info.failed?
           {
@@ -193,5 +186,72 @@ module LogStash
         @client ||= build_client
       end
     end
+
+    module Fetcher
+      def config_exist?(pipeline_id)
+        @response.has_key?(pipeline_id)
+      end
+
+      def fetch_config(pipeline_ids, client) end
+      def get_single_pipeline_setting(pipeline_id) end
+    end
+
+    class SystemIndicesFetcher
+      include LogStash::Util::Loggable, Fetcher
+
+      SYSTEM_INDICES_API_PATH = "_logstash/pipeline"
+
+      def fetch_config(pipeline_ids, client)
+        path_ids = pipeline_ids.join(",")
+        response = client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}")
+
+        if response["error"]
+          raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
+        end
+
+        @response = response
+      end
+
+      def get_single_pipeline_setting(pipeline_id)
+        @response.fetch(pipeline_id, {})
+      end
+    end
+
+    # clean up LegacyHiddenIndicesFetcher https://github.com/elastic/logstash/issues/12291
+    class LegacyHiddenIndicesFetcher
+      include LogStash::Util::Loggable, Fetcher
+
+      PIPELINE_INDEX = ".logstash"
+
+      def fetch_config(pipeline_ids, client)
+        request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
+        response = client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
+
+        if response["error"]
+          raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
+        end
+
+        if response["docs"].nil?
+ logger.debug("Server returned an unknown or malformed document structure", :response => response) + raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure" + end + + @response = format_response(response) + end + + def get_single_pipeline_setting(pipeline_id) + @response.fetch(pipeline_id, {}).fetch("_source", {}) + end + + private + # transform legacy response to be similar to system indices response + def format_response(response) + response["docs"].map { |pipeline| + {pipeline["_id"] => pipeline} if pipeline.fetch("found", false) + }.compact + .reduce({}, :merge) + end + end + end end diff --git a/x-pack/qa/integration/management/multiple_pipelines_spec.rb b/x-pack/qa/integration/management/multiple_pipelines_spec.rb index b069fa89a..680f04976 100644 --- a/x-pack/qa/integration/management/multiple_pipelines_spec.rb +++ b/x-pack/qa/integration/management/multiple_pipelines_spec.rb @@ -17,7 +17,7 @@ describe "Read configuration from elasticsearch" do "hello" => nil } - cleanup_elasticsearch(".logstash*") + cleanup_system_indices(@pipelines.keys) cleanup_elasticsearch(".monitoring-logstash*") @pipelines.each do |pipeline_id, config| diff --git a/x-pack/qa/integration/management/read_configuration_spec.rb b/x-pack/qa/integration/management/read_configuration_spec.rb index 809d70c89..c60537f39 100644 --- a/x-pack/qa/integration/management/read_configuration_spec.rb +++ b/x-pack/qa/integration/management/read_configuration_spec.rb @@ -31,7 +31,7 @@ describe "Read configuration from elasticsearch" do def start_services(elasticsearch_options, logstash_options) @elasticsearch_service = elasticsearch(elasticsearch_options) - cleanup_elasticsearch(".logstash*") + cleanup_system_indices([PIPELINE_ID]) config = "input { generator { count => 100 } tcp { port => 6000 } } output { null {} }" push_elasticsearch_config(PIPELINE_ID, config) diff --git a/x-pack/qa/integration/support/helpers.rb b/x-pack/qa/integration/support/helpers.rb index 8b7e9dee4..bdc75ef73 100644 --- a/x-pack/qa/integration/support/helpers.rb +++ b/x-pack/qa/integration/support/helpers.rb @@ -8,6 +8,7 @@ require "elasticsearch" require "fileutils" require "stud/try" require "open3" +require "time" VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml") VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION") @@ -94,8 +95,19 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password} Elasticsearch::Client.new(options) end +def es_version + response = elasticsearch_client.perform_request(:get, "") + response.body["version"]["number"].gsub(/(\d+\.\d+)\..+/, '\1').to_f +end + def push_elasticsearch_config(pipeline_id, config) - elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config } + if es_version >= 7.10 + elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {}, + { :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" }, + :pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601}) + else + elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config } + end end def cleanup_elasticsearch(index = MONITORING_INDEXES) @@ -103,6 +115,17 @@ def cleanup_elasticsearch(index = MONITORING_INDEXES) elasticsearch_client.indices.refresh end +def cleanup_system_indices(pipeline_ids) + pipeline_ids.each 
do |id| + begin + elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}") + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + puts ".logstash can be empty #{e.message}" + end + end + elasticsearch_client.indices.refresh +end + def logstash_command_append(cmd, argument, value) if cmd !~ /#{Regexp.escape(argument)}/ cmd << " #{argument} #{value}" @@ -136,4 +159,4 @@ def verify_response!(cmd, response) unless response.successful? raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}" end -end +end \ No newline at end of file diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 927d853b2..28d10593a 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -12,6 +12,8 @@ require "monitoring/monitoring" require "stud/temporary" describe LogStash::ConfigManagement::ElasticsearchSource do + let(:system_indices_api) { LogStash::ConfigManagement::SystemIndicesFetcher::SYSTEM_INDICES_API_PATH } + let(:system_indices_url_regex) { Regexp.new("^#{system_indices_api}") } let(:elasticsearch_url) { ["https://localhost:9898"] } let(:elasticsearch_username) { "elastictest" } let(:elasticsearch_password) { "testchangeme" } @@ -41,7 +43,6 @@ describe LogStash::ConfigManagement::ElasticsearchSource do }" } - let(:valid_xpack_response) { { "license" => { @@ -83,7 +84,6 @@ describe LogStash::ConfigManagement::ElasticsearchSource do }") } - let(:settings) do { "xpack.management.enabled" => true, @@ -94,6 +94,38 @@ describe LogStash::ConfigManagement::ElasticsearchSource do } end + let(:es_version_response) { es_version_8_response } + let(:es_version_8_response) { generate_es_version_response("8.0.0-SNAPSHOT") } + let(:es_version_7_9_response) { generate_es_version_response("7.9.1") } + + let(:elasticsearch_7_9_err_response) { + {"error"=> + {"root_cause"=> + [{"type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}], + "type"=>"parse_exception", + "reason"=>"request body or source parameter is required"}, + "status"=>400} + } + + let(:elasticsearch_8_err_response) { + {"error"=> + {"root_cause"=> + [{"type"=>"index_not_found_exception", + "reason"=>"no such index [.logstash]", + "resource.type"=>"index_expression", + "resource.id"=>".logstash", + "index_uuid"=>"_na_", + "index"=>".logstash"}], + "type"=>"index_not_found_exception", + "reason"=>"no such index [.logstash]", + "resource.type"=>"index_expression", + "resource.id"=>".logstash", + "index_uuid"=>"_na_", + "index"=>".logstash"}, + "status"=>404} + } + before do extension.additionals_settings(system_settings) apply_settings(settings, system_settings) @@ -162,22 +194,85 @@ describe LogStash::ConfigManagement::ElasticsearchSource do end end - describe "#config_path" do - before do - # we are testing the arguments here, not the behavior of the elasticsearch output - allow_any_instance_of(described_class).to receive(:build_client).and_return(nil) - end + describe LogStash::ConfigManagement::SystemIndicesFetcher do + subject { described_class.new } - let(:pipeline_id) { "foobar" } - let(:settings) do - { - "xpack.management.pipeline.id" => pipeline_id, - "xpack.management.elasticsearch.password" => "testpassword" + describe "system indices api" do + let(:mock_client) { double("http_client") } + let(:config) { "input { generator { count => 100 } tcp { port => 6005 } } output { }}" } + 
let(:pipeline_id) { "super_generator" } + let(:elasticsearch_response) { {"#{pipeline_id}"=> {"pipeline"=> "#{config}"}} } + + it "#fetch_config" do + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/#{pipeline_id}").and_return(elasticsearch_response) + expect(subject.fetch_config([pipeline_id], mock_client)).to eq(elasticsearch_response) + expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline"=> "#{config}"}) + end + + it "#fetch_config should raise error" do + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(elasticsearch_8_err_response) + expect{ subject.fetch_config(["apache", "nginx"], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + end + end + + describe LogStash::ConfigManagement::LegacyHiddenIndicesFetcher do + subject { described_class.new } + + describe "legacy api" do + let(:mock_client) { double("http_client") } + let(:config) { "input { generator { count => 100 } tcp { port => 6005 } } output { }}" } + let(:another_config) { "input { generator { count => 100 } tcp { port => 6006 } } output { }}" } + let(:pipeline_id) { "super_generator" } + let(:another_pipeline_id) { "another_generator" } + let(:elasticsearch_response) { + {"docs"=> + [{"_index"=>".logstash", + "_id"=>"#{pipeline_id}", + "_version"=>2, + "_seq_no"=>2, + "_primary_term"=>1, + "found"=>true, + "_source"=> + {"pipeline"=> "#{config}"}}, + {"_index"=>".logstash", + "_id"=>"#{another_pipeline_id}", + "_version"=>2, + "_seq_no"=>3, + "_primary_term"=>1, + "found"=>true, + "_source"=> + {"pipeline"=> "#{another_config}"}}, + {"_index"=>".logstash", "_id"=>"not_exists", "found"=>false}]} } - end - it "generates the path to get the configuration" do - expect(subject.config_path).to eq("#{described_class::PIPELINE_INDEX}/_mget") + let(:formatted_es_response) { + {"super_generator"=>{"_index"=>".logstash", "_id"=>"super_generator", "_version"=>2, "_seq_no"=>2, "_primary_term"=>1, "found"=>true, "_source"=>{"pipeline"=>"input { generator { count => 100 } tcp { port => 6005 } } output { }}"}}} + } + + it "#fetch_config" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(elasticsearch_response) + expect(subject.fetch_config([pipeline_id, another_pipeline_id], mock_client).size).to eq(2) + expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) + expect(subject.get_single_pipeline_setting(another_pipeline_id)).to eq({"pipeline" => "#{another_config}"}) + end + + it "#fetch_config should raise error" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(elasticsearch_7_9_err_response) + expect{ subject.fetch_config([pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + + it "#fetch_config should raise error when response is empty" do + expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(LogStash::Json.load("{}")) + expect{ subject.fetch_config([pipeline_id, another_pipeline_id], mock_client) }.to 
raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + + it "#format_response should return pipelines" do + result = subject.send(:format_response, elasticsearch_response) + expect(result.size).to eq(2) + expect(result.has_key?(pipeline_id)).to be_truthy + expect(result.has_key?(another_pipeline_id)).to be_truthy + end end end @@ -216,267 +311,312 @@ describe LogStash::ConfigManagement::ElasticsearchSource do let(:pipeline_id) { "apache" } let(:mock_client) { double("http_client") } let(:settings) { super.merge({ "xpack.management.pipeline.id" => pipeline_id }) } - let(:es_path) { "#{described_class::PIPELINE_INDEX}/_mget" } + let(:config) { "input { generator {} } filter { mutate {} } output { }" } + let(:elasticsearch_response) { elasticsearch_8_response } + let(:elasticsearch_8_response) { + "{\"#{pipeline_id}\":{ + \"username\":\"log.stash\", + \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", + \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"}, + \"pipeline\":\"#{config}\", + \"pipeline_settings\":{\"pipeline.batch.delay\":\"50\", \"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}" } + + let(:elasticsearch_7_9_response) { + "{ \"docs\":[{ + \"_index\":\".logstash\", + \"_type\":\"pipelines\", + \"_id\":\"#{pipeline_id}\", + \"_version\":8, + \"found\":true, + \"_source\":{ + \"id\":\"apache\", + \"description\":\"Process apache logs\", + \"modified_timestamp\":\"2017-02-28T23:02:17.023Z\", + \"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"}, + \"pipeline\":\"#{config}\", + \"pipeline_settings\":{\"pipeline.workers\":\"99\", \"pipeline.output.workers\":\"99\", \"nonsensical.invalid.setting\":\"-9999\"}}}]}" } + let(:es_path) { ".logstash/_mget" } let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } before do - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with(system_indices_url_regex).and_return(LogStash::Json.load(elasticsearch_response)) + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client) - end - context "with one `pipeline_id` configured" do - context "when successfully fetching a remote configuration" do + describe "system indices [8] and legacy api [7.9]" do + [8, 7.9].each { |es_version| + let(:elasticsearch_response) { (es_version >= 8)? elasticsearch_8_response: elasticsearch_7_9_response } before :each do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow(mock_client).to receive(:get).with("/").and_return(es_version >= 8? 
es_version_response: es_version_7_9_response) end - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:whitelisted_pipeline_setting_name) {"pipeline.workers"} - let(:whitelisted_pipeline_setting_value) {"99"} - let(:non_whitelisted_pipeline_setting_name) {"pipeline.output.workers"} - let(:non_whitelisted_pipeline_setting_value) {"99"} - let(:invalid_pipeline_setting) {"nonsensical.invalid.setting"} - let(:elasticsearch_response) { - %{ - { "docs": [{ - "_index":".logstash", - "_type":"pipelines", - "_id":"#{pipeline_id}", - "_version":8, - "found":true, - "_source": { - "id":"apache", - "description":"Process apache logs", - "modified_timestamp":"2017-02-28T23:02:17.023Z", - "pipeline_metadata":{ - "version":5, - "type":"logstash_pipeline", - "username":"elastic" - }, - "pipeline":"#{config}", - "pipeline_settings": { - "#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value}, - "#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value}, - "#{invalid_pipeline_setting}":-9999 - } + context "with one `pipeline_id` configured [#{es_version}]" do + context "when successfully fetching a remote configuration" do + before :each do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + end + + let(:config) { "input { generator {} } filter { mutate {} } output { }" } + + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs + + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end + + it "ignores non-whitelisted and invalid settings" do + pipeline_config = subject.pipeline_configs + settings_hash = pipeline_config[0].settings.to_hash + + expect(settings_hash["pipeline.workers"]).to eq(99) + expect(settings_hash["pipeline.output.workers"]).not_to eq(99) + expect(settings_hash["nonsensical.invalid.setting"]).to be_falsey + end + end + + context "when the license has expired [#{es_version}]" do + let(:license_status) { 'expired'} + let(:license_expiry_date) { Time.now - (60 * 60 * 24)} + + before :each do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end + + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs + + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end + end + + context "when the license server is not available [#{es_version}]" do + before :each do + allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here") + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end + + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end + + context "when the xpack is not installed [#{es_version}]" do + before :each do + expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response) + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end + + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end + + describe "security 
enabled/disabled in Elasticsearch [#{es_version}]" do + let(:xpack_response) do + { + "license"=> { + "status"=> license_status, + "uid"=> "9a48c67c-ce2c-4169-97bf-37d324b8ab80", + "type"=> license_type, + "expiry_date_in_millis"=> license_expiry_in_millis + }, + "features" => { + "security" => { + "description" => "Security for the Elastic Stack", + "available" => true, + "enabled" => security_enabled + } + } } - }] - } - } - } + end - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs + before :each do + allow(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response) + allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) + end - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) - end + context "when security is disabled in Elasticsearch [#{es_version}]" do + let(:security_enabled) { false } + it 'should raise an error' do + expect { subject.pipeline_configs }.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end - it "ignores non-whitelisted and invalid settings" do - pipeline_config = subject.pipeline_configs - settings_hash = pipeline_config[0].settings.to_hash + context "when security is enabled in Elasticsearch [#{es_version}]" do + let(:security_enabled) { true } + it 'should not raise an error' do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + expect { subject.pipeline_configs }.not_to raise_error + end + end + end - expect(settings_hash[whitelisted_pipeline_setting_name]).to eq(whitelisted_pipeline_setting_value.to_i) - expect(settings_hash[non_whitelisted_pipeline_setting_name]).not_to eq(non_whitelisted_pipeline_setting_value.to_i) - expect(settings_hash[invalid_pipeline_setting]).to be_falsey - end - end - context 'when the license has expired' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } - let(:license_status) { 'expired'} - let(:license_expiry_date) { Time.now - (60 * 60 * 24)} + context "With an invalid basic license, it should raise an error [#{es_version}]" do + let(:license_type) { 'basic' } - before :each do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - end + it 'should raise an error' do + expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) + end + end - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs + # config management can be used with any license type except basic + (::LogStash::LicenseChecker::LICENSE_TYPES - ["basic"]).each do |license_type| + context "With a valid #{license_type} license, it should return a pipeline [#{es_version}]" do + before do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) - end - end + let(:license_type) { license_type } - context 'when 
the license server is not available' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs - before :each do - allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here") - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) - end - - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) - end - end - - context 'when the xpack is not installed' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } - - before :each do - expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response) - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) - end - - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) - end - end - - describe 'security enabled/disabled in Elasticsearch' do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } - - let(:xpack_response) do - { - "license"=> { - "status"=> license_status, - "uid"=> "9a48c67c-ce2c-4169-97bf-37d324b8ab80", - "type"=> license_type, - "expiry_date_in_millis"=> license_expiry_in_millis - }, - "features" => { - "security" => { - "description" => "Security for the Elastic Stack", - "available" => true, - "enabled" => security_enabled - } - } - } - end - - before :each do - allow(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response) - allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:build_client).and_return(mock_license_client) - end - - context 'when security is disabled in Elasticsearch' do - let(:security_enabled) { false } - it 'should raise an error' do - expect { subject.pipeline_configs }.to raise_error(LogStash::LicenseChecker::LicenseError) + expect(pipeline_config.first.config_string).to match(config) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + end + end end end - context 'when security is enabled in Elasticsearch' do - let(:security_enabled) { true } - it 'should not raise an error' do + context 
"with multiples `pipeline_id` configured [#{es_version}]" do + before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - expect { subject.pipeline_configs }.not_to raise_error + end + + context "when successfully fetching multiple remote configuration" do + let(:pipelines) do + { + "apache" => config_apache, + "firewall" => config_firewall + } + end + let(:pipeline_id) { pipelines.keys } + + let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" } + let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" } + let(:elasticsearch_response) do + content = "{" + content << pipelines.collect do |pipeline_id, config| + "\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}" + end.join(",") + content << "}" + content + end + + let(:elasticsearch_7_9_response) do + content = "{ \"docs\":[" + content << pipelines.collect do |pipeline_id, config| + "{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}" + end.join(",") + content << "]}" + content + end + let(:request_body_string) { LogStash::Json.dump({ "docs" => pipeline_id.collect { |pipeline_id| { "_id" => pipeline_id } } }) } + + it "returns a valid pipeline config" do + pipeline_config = subject.pipeline_configs + + expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values) + expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym)) + end end end - end - - context "With an invalid basic license, it should raise an error" do - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } - let(:license_type) { 'basic' } - - it 'should raise an error' do - expect{subject.pipeline_configs}.to raise_error(LogStash::LicenseChecker::LicenseError) - end - end - - # config management can be used with any license type except basic - (::LogStash::LicenseChecker::LICENSE_TYPES - ["basic"]).each do |license_type| - context "With a valid #{license_type} license, it should return a pipeline" do + context "when the configuration is not found [#{es_version}]" do + let(:elasticsearch_8_response) { "{}" } + let(:elasticsearch_7_9_response) { "{ \"docs\": [{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"donotexist\",\"found\":false}]}" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - let(:config) { "input { generator {} } filter { mutate {} } output { }" } - let(:elasticsearch_response) { "{ 
\"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" } - let(:license_type) { license_type } - - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs - - expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) + it "returns no pipeline config" do + expect(subject.pipeline_configs).to be_empty end end - end + + context "when any error returned from elasticsearch [#{es_version}]" do + let(:elasticsearch_8_response){"{\"error\" : \"no handler found for uri [/_logstash/pipelines?pretty] and method [GET]\"}" } + let(:elasticsearch_7_9_response) { '{ "error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"}],"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"},"status":400}' } + + before do + expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) + end + + it "raises a `RemoteConfigError`" do + expect { subject.pipeline_configs }.to raise_error LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError + end + end + + } end - context "with multiples `pipeline_id` configured" do - - before do - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response)) - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - end - - context "when successfully fetching multiple remote configuration" do - let(:pipelines) do - { - "apache" => config_apache, - "firewall" => config_firewall - } - end - - let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" } - let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" } - let(:elasticsearch_response) do - content = "{ \"docs\":[" - content << pipelines.collect do |pipeline_id, config| - "{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}" - end.join(",") - content << "]}" - content - end - - it "returns a valid pipeline config" do - pipeline_config = subject.pipeline_configs - - expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values) - expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym)) - end - end - end - - context "when the configuration is not found" do - let(:elasticsearch_response) { "{ \"docs\": [{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"donotexist\",\"found\":false}]}" } - + describe "create pipeline fetcher by es version" do before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - it "returns no pipeline config" do - expect(subject.pipeline_configs).to be_empty + it "should give SystemIndicesFetcher in [8]" do + allow(mock_client).to 
receive(:get).with("/").and_return(es_version_response) + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher + end + + it "should give SystemIndicesFetcher in [7.10]" do + allow(mock_client).to receive(:get).with("/").and_return(generate_es_version_response("7.10.0-SNAPSHOT")) + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::SystemIndicesFetcher + end + + it "should give LegacyHiddenIndicesFetcher in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) + expect(subject.get_pipeline_fetcher).to be_an_instance_of LogStash::ConfigManagement::LegacyHiddenIndicesFetcher end end - context "when any error returned from elasticsearch" do - let(:elasticsearch_response){'{ "error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"}],"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"},"status":400}' } - - before do - expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) - end - - it "raises a `RemoteConfigError`" do - expect { subject.pipeline_configs }.to raise_error /illegal_argument_exception/ - end - end - - context "when exception occur" do + describe "when exception occur" do let(:elasticsearch_response) { "" } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) end - it "raises the exception upstream" do + it "raises the exception upstream in [8]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + allow(mock_client).to receive(:get).with(system_indices_url_regex).and_raise("Something bad") + expect { subject.pipeline_configs }.to raise_error /Something bad/ + end + + + it "raises the exception upstream in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad") expect { subject.pipeline_configs }.to raise_error /Something bad/ end end + + end + + def generate_es_version_response(version) + {"name"=>"MacBook-Pro", + "cluster_name"=>"elasticsearch", + "cluster_uuid"=>"YgpKq8VkTJuGTSb9aidlIA", + "version"=> + {"number"=>"#{version}", + "build_flavor"=>"default", + "build_type"=>"tar", + "build_hash"=>"26eb422dc55236a1c5625e8a73e5d866e54610a2", + "build_date"=>"2020-09-24T09:37:06.459350Z", + "build_snapshot"=>true, + "lucene_version"=>"8.7.0", + "minimum_wire_compatibility_version"=>"7.10.0", + "minimum_index_compatibility_version"=>"7.0.0"}, + "tagline"=>"You Know, for Search"} end end