logstash/x-pack/lib/config_management/elasticsearch_source.rb
kaisecheng e896cd727d
CPM handle 404 response gracefully with user-friendly log (#17052)
Starting from es-output 12.0.2, a 404 response is treated as an error. Previously, central pipeline management considered 404 as an empty pipeline, not an error.

This commit restores the expected behavior by handling 404 gracefully and logging a user-friendly message.
It also removes the redundant pipeline cache in CPM.

Fixes: #17035
2025-02-17 11:08:19 +00:00

318 lines
13 KiB
Ruby

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
require "logstash/config/source/base"
require "logstash/config/source_loader"
require "logstash/outputs/elasticsearch"
require "logstash/json"
require 'helpers/elasticsearch_options'
require 'helpers/loggable_try'
require "license_checker/licensed"
module LogStash
module ConfigManagement
class ElasticsearchSource < LogStash::Config::Source::Base
include LogStash::Util::Loggable, LogStash::LicenseChecker::Licensed,
LogStash::Helpers::ElasticsearchOptions
# Raised when the remote configuration (or the Elasticsearch version needed
# to fetch it) cannot be obtained, or when the returned document is unusable.
class RemoteConfigError < LogStash::Error
end
# License types accepted for central pipeline management ("basic" is excluded).
VALID_LICENSES = %w(trial standard gold platinum enterprise).freeze
# Feature name handed to the license checker when this source is set up.
FEATURE_INTERNAL = 'management'.freeze
# Feature name looked up in the Elasticsearch X-Pack info.
FEATURE_EXTERNAL = 'logstash'.freeze
# Per-pipeline settings that may be overridden by the document stored in
# Elasticsearch; any other key is ignored with a warning.
SUPPORTED_PIPELINE_SETTINGS = %w(
  pipeline.workers
  pipeline.batch.size
  pipeline.batch.delay
  pipeline.ecs_compatibility
  pipeline.ordered
  queue.type
  queue.max_bytes
  queue.checkpoint.writes
).freeze
# Builds the source. When the source is enabled (see `enabled?`), it also
# resolves the Elasticsearch connection options from the 'management'
# settings, sets up the license checker and runs an initial license check.
#
# @param settings the Logstash settings object handed to the base class
def initialize(settings)
  super(settings)
  return unless enabled?

  @es_options = es_options_from_settings('management', settings)
  setup_license_checker(FEATURE_INTERNAL)
  license_check(true)
end
# Tells whether this config source applies: returns the value of the
# "xpack.management.enabled" setting. Also exposed as `enabled?` via alias.
def match?
@settings.get("xpack.management.enabled")
end
# This source never reports a configuration conflict; always returns false.
def config_conflict?
false
end
# Picks the fetcher implementation based on the Elasticsearch server version:
# the system indices API for 7.10 and later, the legacy hidden-index API for
# anything older.
#
# @param es_version [Hash] version with integer :major and :minor entries
# @return [SystemIndicesFetcher, LegacyHiddenIndicesFetcher] a fresh fetcher
def get_pipeline_fetcher(es_version)
  major = es_version[:major]
  system_indices_available = major >= 8 || (major == 7 && es_version[:minor] >= 10)

  if system_indices_available
    SystemIndicesFetcher.new
  else
    LegacyHiddenIndicesFetcher.new
  end
end
# Fetches the configured pipelines from Elasticsearch and converts each one
# into a pipeline config object.
#
# A 404 from Elasticsearch is treated as "no pipelines" and yields an empty
# array; any other bad response code is re-raised.
#
# @return [Array] one entry per fetched pipeline id, nil entries removed
def pipeline_configs
  logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids)
  license_check(true)

  version = get_es_version
  pipeline_fetcher = get_pipeline_fetcher(version)

  begin
    pipeline_fetcher.fetch_config(version, pipeline_ids, client)
  rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => error
    # es-output 12.0.2 throws 404 as error, but we want to handle it as empty config
    raise error unless error.response_code == 404
    return []
  end

  configs = []
  pipeline_fetcher.get_pipeline_ids.each do |pid|
    config = get_pipeline(pid, pipeline_fetcher)
    configs << config unless config.nil?
  end
  configs
end
# Asks Elasticsearch (GET "/") for its version and returns the major/minor
# parts as integers.
#
# The request is wrapped in a LoggableTry, which is given `10.times` and
# HostUnreachableError (presumably the retry budget and the retryable error).
#
# @return [Hash] `{ major: Integer, minor: Integer }`
# @raise [RemoteConfigError] when the response carries an "error" entry
def get_es_version
  retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch ES version from Central Management')
  response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) do
    client.get("/")
  end

  raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" if response["error"]

  version_string = response["version"]["number"]
  logger.debug("Reading configuration from Elasticsearch version {}", version_string)

  major, minor = version_string.split(".")
  { major: major.to_i, minor: minor.to_i }
end
# Builds the pipeline config object for one pipeline id from the document
# held by the fetcher.
#
# The fetcher is queried once and the resulting document is reused for the
# config string, the metadata and the per-pipeline settings.
#
# @param pipeline_id [String] id of the pipeline to build
# @param fetcher a fetcher whose `fetch_config` has already been called
# @return [Java::OrgLogstashConfigIr::PipelineConfig]
# @raise [RemoteConfigError] when the document has no (or an empty) "pipeline" entry
def get_pipeline(pipeline_id, fetcher)
  pipeline_doc = fetcher.get_single_pipeline_setting(pipeline_id)
  config_string = pipeline_doc["pipeline"]
  pipeline_metadata_str = (pipeline_doc["pipeline_metadata"] || "").to_s

  raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?

  config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string, pipeline_metadata_str)

  # We don't support multiple pipelines, so use the global settings from the logstash.yml file
  settings = @settings.clone
  settings.set("pipeline.id", pipeline_id)

  # override global settings with pipeline settings from ES, if any
  pipeline_settings = pipeline_doc["pipeline_settings"]
  unless pipeline_settings.nil?
    pipeline_settings.each do |setting, value|
      if SUPPORTED_PIPELINE_SETTINGS.include? setting
        # nil/false values leave the global setting untouched
        settings.set(setting, value) if value
      else
        logger.warn("Ignoring unsupported or unknown pipeline settings '#{setting}'")
      end
    end
  end

  Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, pipeline_id.to_sym, [config_part], settings)
end
# Stop-gap until the Elasticsearch plugins are refactored and their HTTP
# client is properly extracted: the output plugin is instantiated only so it
# can deal with certificates and the other SSL options for us.
#
# The plugin's logger is swapped for ours so that log lines originate from
# `ElasticsearchSource` rather than from the plugin.
#
# @return the HTTP client built by the Elasticsearch output plugin
def build_client
  plugin_options = es_options_with_product_origin_header(@es_options)
  es_output = LogStash::Outputs::ElasticSearch.new(plugin_options)
  es_output.instance_variable_set(:@logger, logger)
  es_output.build_client
end
# Translates the X-Pack info reported by Elasticsearch into a license state
# hash with :state, :log_level and :log_message entries.
#
# Checks run in a fixed order and the first failing one decides the result:
# fetch failure, X-Pack not installed, security disabled, license unavailable,
# license type not in VALID_LICENSES, license inactive. An inactive license
# still yields state :ok, at :warn level. When every check passes, a warning
# is logged if the FEATURE_EXTERNAL feature is not enabled in Elasticsearch.
#
# NOTE(review): some messages mention monitoring although this is the
# management source - possibly inherited wording; confirm before changing.
#
# @param xpack_info X-Pack info object queried for status and license details
# @param is_serverless not consulted by this implementation
# @return [Hash] the license state
def populate_license_state(xpack_info, is_serverless)
  failure = lambda do |message|
    { :state => :error, :log_level => :error, :log_message => message }
  end

  return failure.call("Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster.") if xpack_info.failed?
  return failure.call("X-Pack is installed on Logstash but not on Elasticsearch. Please install X-Pack on Elasticsearch to use the monitoring feature. Other features may be available.") unless xpack_info.installed?
  return failure.call("X-Pack Security needs to be enabled in Elasticsearch. Please set xpack.security.enabled: true in elasticsearch.yml.") unless xpack_info.feature_enabled?("security")
  return failure.call('Configuration Management is not available: License information is currently unavailable. Please make sure you have added your production elasticsearch connection info in the xpack.monitoring.elasticsearch settings.') unless xpack_info.license_available?
  return failure.call("Configuration Management is not available: #{xpack_info.license_type} is not a valid license for this feature.") unless xpack_info.license_one_of?(VALID_LICENSES)

  unless xpack_info.license_active?
    return {
      :state => :ok,
      :log_level => :warn,
      :log_message => 'Configuration Management feature requires a valid license. You can continue to monitor Logstash, but please contact your administrator to update your license'
    }
  end

  unless xpack_info.feature_enabled?(FEATURE_EXTERNAL)
    logger.warn('Central Pipeline Management is enabled in Logstash, but not enabled in Elasticsearch')
  end

  { :state => :ok, :log_level => :info, :log_message => 'Configuration Management License OK' }
end
alias_method :enabled?, :match?
private
# Returns the value of the "xpack.management.pipeline.id" setting.
# Presumably a list of ids/patterns (callers join and partition it) - confirm
# against the setting definition.
def pipeline_ids
@settings.get("xpack.management.pipeline.id")
end
# Returns the Elasticsearch client, building it with `build_client` on first
# use and memoizing it in @client.
def client
@client ||= build_client
end
end
module Fetcher
include LogStash::Util::Loggable
# Returns the ids (hash keys) of the pipelines held in @pipelines.
# @pipelines is assigned by the including class's `fetch_config`, which must
# have run before this is called.
def get_pipeline_ids
@pipelines.keys
end
# Interface placeholder: including classes define their own `fetch_config`.
# This default does nothing and returns nil.
def fetch_config(es_version, pipeline_ids, client)
  # intentionally empty
end
# Interface placeholder: including classes define their own
# `get_single_pipeline_setting`. This default does nothing and returns nil.
def get_single_pipeline_setting(pipeline_id)
  # intentionally empty
end
# Emits a debug log listing the pipeline ids for which no remote
# configuration was found. Nothing is logged when the list has no entries.
#
# @param pipeline_ids [Array] ids (or patterns) that could not be resolved
def log_pipeline_not_found(pipeline_ids)
  return unless pipeline_ids.any?

  logger.debug("Could not find a remote configuration for specific `pipeline_id`", :pipeline_ids => pipeline_ids)
end
end
class SystemIndicesFetcher
include LogStash::Util::Loggable, Fetcher
SYSTEM_INDICES_API_PATH = "_logstash/pipeline"
# Fetches pipelines through the `_logstash/pipeline` API and stores them in
# @pipelines.
#
# When the server supports wildcard search (see
# `es_supports_pipeline_wildcard_search?`), the requested ids are sent as the
# `id` query parameter and the response is stored as-is. Otherwise every
# pipeline is requested and the response is filtered locally with
# `get_wildcard_pipelines`.
#
# NOTE(review): relies on ERB::Util being loaded elsewhere; this file does not
# require "erb" itself.
#
# @param es_version [Hash] version with integer :major and :minor entries
# @param pipeline_ids [Array<String>] ids and/or wildcard patterns to fetch
# @param client HTTP client responding to `get(path)`
# @return [Hash] the pipelines keyed by pipeline id
# @raise [ElasticsearchSource::RemoteConfigError] when the response carries an "error" entry
def fetch_config(es_version, pipeline_ids, client)
  es_supports_pipeline_wildcard_search = es_supports_pipeline_wildcard_search?(es_version)

  # the request path does not change between retries, so build it once
  path = es_supports_pipeline_wildcard_search ?
    "#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}" :
    "#{SYSTEM_INDICES_API_PATH}/"

  retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
  response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
    client.get(path)
  }

  if response["error"]
    raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
  end

  @pipelines = es_supports_pipeline_wildcard_search ?
    response :
    get_wildcard_pipelines(pipeline_ids, response)
end
# Tells whether the server version is 8.3 or later, in which case wildcard
# ids are sent to Elasticsearch instead of being matched locally.
#
# @param es_version [Hash] version with integer :major and :minor entries
# @return [Boolean]
def es_supports_pipeline_wildcard_search?(es_version)
  major = es_version[:major]
  return true if major > 8

  major == 8 && es_version[:minor] >= 3
end
# Returns the stored document for one pipeline id, or an empty hash when the
# id is not present in @pipelines.
#
# @param pipeline_id [String]
# @return [Hash]
def get_single_pipeline_setting(pipeline_id)
  @pipelines.fetch(pipeline_id) { {} }
end
private
# Filters a "fetch everything" response down to the pipelines the user asked
# for, for servers that cannot evaluate wildcard ids themselves.
#
# The configured ids are split into exact ids and wildcard patterns (those
# containing "*"). Exact ids are looked up directly; every remaining pipeline
# id is kept when it matches at least one pattern (glob match via
# File.fnmatch?). Ids and patterns that matched nothing are reported through
# `log_pipeline_not_found`.
#
# Every pattern matching an id is recorded as matched, not only the first,
# so a pattern is reported as not found only if it really matched nothing.
# The given response hash is not modified.
#
# @param pipeline_ids [Array<String>] exact ids and/or wildcard patterns
# @param response [Hash] all pipelines returned by Elasticsearch, keyed by id
# @return [Hash] selected pipelines: exact ids first (in configured order),
#   then wildcard matches (in response order)
def get_wildcard_pipelines(pipeline_ids, response)
  wildcard_patterns, fix_pids = pipeline_ids.partition { |pattern| pattern.include?("*") }

  fix_id_pipelines = fix_pids.each_with_object({}) do |id, found|
    found[id] = response[id] if response.key?(id)
  end

  # ids already claimed by an exact match are not offered to the wildcard patterns
  wildcard_candidates = response.reject { |id, _pipeline| fix_id_pipelines.key?(id) }

  matched_patterns = []
  wildcard_pipelines = wildcard_candidates.select do |id, _pipeline|
    hits = wildcard_patterns.select { |pattern| ::File.fnmatch?(pattern, id) }
    matched_patterns.concat(hits)
    !hits.empty?
  end

  log_pipeline_not_found((fix_pids - fix_id_pipelines.keys) + (wildcard_patterns - matched_patterns))

  fix_id_pipelines.merge(wildcard_pipelines)
end
end
# TODO: clean up LegacyHiddenIndicesFetcher, see https://github.com/elastic/logstash/issues/12291
class LegacyHiddenIndicesFetcher
include LogStash::Util::Loggable, Fetcher
PIPELINE_INDEX = ".logstash"
# Fetches the requested pipelines from the `.logstash` index with a single
# `_mget` request and stores the found documents in @pipelines, keyed by id.
#
# Wildcard ids are not resolved here; a warning is logged when any are
# configured, and ids without a found document are reported at debug level.
#
# @param es_version not used by this fetcher
# @param pipeline_ids [Array<String>] ids to request
# @param client HTTP client responding to `post(path, params, body)`
# @return [Hash] the found pipeline documents keyed by id
# @raise [ElasticsearchSource::RemoteConfigError] when the response carries an
#   "error" entry or has no "docs" entry
def fetch_config(es_version, pipeline_ids, client)
  request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
  retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
  response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
    client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
  }

  if response["error"]
    raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
  end

  if response["docs"].nil?
    logger.debug("Server returned an unknown or malformed document structure", :response => response)
    raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
  end

  @pipelines = format_response(response)
  log_wildcard_unsupported(pipeline_ids)
  log_pipeline_not_found(pipeline_ids - @pipelines.keys)
  @pipelines
end
# Returns the "_source" part of the stored document for one pipeline id.
# An empty hash is returned when the id is unknown or the document has no
# "_source" entry.
#
# @param pipeline_id [String]
# @return [Hash]
def get_single_pipeline_setting(pipeline_id)
  document = @pipelines.fetch(pipeline_id, {})
  document.fetch("_source", {})
end
private
# Reshapes a legacy `_mget` response so it resembles the system indices
# response: a hash keyed by pipeline id. Only documents whose "found" entry is
# truthy are kept; each value is the complete document.
#
# @param response [Hash] `_mget` response with a "docs" array
# @return [Hash] found documents keyed by their "_id"
def format_response(response)
  response["docs"].each_with_object({}) do |doc, pipelines_by_id|
    pipelines_by_id[doc["_id"]] = doc if doc.fetch("found", false)
  end
end
# Logs a warning when any configured pipeline id contains "*", because this
# fetcher does not resolve wildcard ids.
#
# @param pipeline_ids [Array<String>] configured pipeline ids
def log_wildcard_unsupported(pipeline_ids)
  return unless pipeline_ids.any? { |id| id.include?("*") }

  logger.warn("wildcard '*' in xpack.management.pipeline.id is not supported in Elasticsearch version < 7.10")
end
end
end
end