mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
add wildcard support in xpack pipeline id do the pattern matching with glob add warning msg to wildcard with legacy api check invalid pipeline in bootstrap test cases for invalid checking
296 lines
11 KiB
Ruby
296 lines
11 KiB
Ruby
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
# or more contributor license agreements. Licensed under the Elastic License;
|
|
# you may not use this file except in compliance with the Elastic License.
|
|
|
|
require "logstash/config/source/base"
|
|
require "logstash/config/source_loader"
|
|
require "logstash/outputs/elasticsearch"
|
|
require "logstash/json"
|
|
require 'helpers/elasticsearch_options'
|
|
require "license_checker/licensed"
|
|
|
|
|
|
module LogStash
|
|
module ConfigManagement
|
|
class ElasticsearchSource < LogStash::Config::Source::Base
|
|
include LogStash::Util::Loggable, LogStash::LicenseChecker::Licensed,
|
|
LogStash::Helpers::ElasticsearchOptions
|
|
|
|
class RemoteConfigError < LogStash::Error; end
|
|
|
|
# exclude basic
|
|
VALID_LICENSES = %w(trial standard gold platinum enterprise)
|
|
FEATURE_INTERNAL = 'management'
|
|
FEATURE_EXTERNAL = 'logstash'
|
|
SUPPORTED_PIPELINE_SETTINGS = %w(
|
|
pipeline.workers
|
|
pipeline.batch.size
|
|
pipeline.batch.delay
|
|
queue.type
|
|
queue.max_bytes
|
|
queue.checkpoint.writes
|
|
)
|
|
|
|
def initialize(settings)
|
|
super(settings)
|
|
|
|
if enabled?
|
|
@es_options = es_options_from_settings('management', settings)
|
|
setup_license_checker(FEATURE_INTERNAL)
|
|
license_check(true)
|
|
end
|
|
end
|
|
|
|
def match?
|
|
@settings.get("xpack.management.enabled")
|
|
end
|
|
|
|
def config_conflict?
|
|
false
|
|
end
|
|
|
|
# decide using system indices api (7.10+) or legacy api (< 7.10) base on elasticsearch server version
|
|
def get_pipeline_fetcher
|
|
response = client.get("/")
|
|
|
|
if response["error"]
|
|
raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
|
|
end
|
|
|
|
logger.debug("Reading configuration from Elasticsearch version {}", response["version"]["number"])
|
|
version_number = response["version"]["number"].split(".")
|
|
first = version_number[0].to_i
|
|
second = version_number[1].to_i
|
|
(first >= 8 || (first == 7 && second >= 10))? SystemIndicesFetcher.new: LegacyHiddenIndicesFetcher.new
|
|
end
|
|
|
|
def pipeline_configs
|
|
logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids)
|
|
|
|
begin
|
|
license_check(true)
|
|
rescue LogStash::LicenseChecker::LicenseError => e
|
|
if @cached_pipelines.nil?
|
|
raise e
|
|
else
|
|
return @cached_pipelines
|
|
end
|
|
end
|
|
|
|
fetcher = get_pipeline_fetcher
|
|
fetcher.fetch_config(pipeline_ids, client)
|
|
|
|
@cached_pipelines = fetcher.get_pipeline_ids.collect do |pid|
|
|
get_pipeline(pid, fetcher)
|
|
end.compact
|
|
end
|
|
|
|
def get_pipeline(pipeline_id, fetcher)
|
|
config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"]
|
|
|
|
raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?
|
|
|
|
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string)
|
|
|
|
# We don't support multiple pipelines, so use the global settings from the logstash.yml file
|
|
settings = @settings.clone
|
|
settings.set("pipeline.id", pipeline_id)
|
|
|
|
# override global settings with pipeline settings from ES, if any
|
|
pipeline_settings = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_settings"]
|
|
unless pipeline_settings.nil?
|
|
pipeline_settings.each do |setting, value|
|
|
if SUPPORTED_PIPELINE_SETTINGS.include? setting
|
|
settings.set(setting, value) if value
|
|
else
|
|
logger.warn("Ignoring unsupported or unknown pipeline settings '#{setting}'")
|
|
end
|
|
end
|
|
end
|
|
|
|
Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, pipeline_id.to_sym, [config_part], settings)
|
|
end
|
|
|
|
# This is a bit of a hack until we refactor the ElasticSearch plugins
|
|
# and extract correctly the http client, right now I am using the plugins
|
|
# to deal with the certificates and the other SSL options
|
|
#
|
|
# But we have to silence the logger from the plugin, to make sure the
|
|
# log originate from the `ElasticsearchSource`
|
|
def build_client
|
|
es = LogStash::Outputs::ElasticSearch.new(@es_options)
|
|
new_logger = logger
|
|
es.instance_eval { @logger = new_logger }
|
|
es.build_client
|
|
end
|
|
|
|
def populate_license_state(xpack_info)
|
|
if xpack_info.failed?
|
|
{
|
|
:state => :error,
|
|
:log_level => :error,
|
|
:log_message => "Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster."
|
|
}
|
|
elsif !xpack_info.installed?
|
|
{
|
|
:state => :error,
|
|
:log_level => :error,
|
|
:log_message => "X-Pack is installed on Logstash but not on Elasticsearch. Please install X-Pack on Elasticsearch to use the monitoring feature. Other features may be available."
|
|
}
|
|
elsif !xpack_info.feature_enabled?("security")
|
|
{
|
|
:state => :error,
|
|
:log_level => :error,
|
|
:log_message => "X-Pack Security needs to be enabled in Elasticsearch. Please set xpack.security.enabled: true in elasticsearch.yml."
|
|
}
|
|
elsif !xpack_info.license_available?
|
|
{
|
|
:state => :error,
|
|
:log_level => :error,
|
|
:log_message => 'Configuration Management is not available: License information is currently unavailable. Please make sure you have added your production elasticsearch connection info in the xpack.monitoring.elasticsearch settings.'
|
|
}
|
|
elsif !xpack_info.license_one_of?(VALID_LICENSES)
|
|
{
|
|
:state => :error,
|
|
:log_level => :error,
|
|
:log_message => "Configuration Management is not available: #{xpack_info.license_type} is not a valid license for this feature."
|
|
}
|
|
elsif !xpack_info.license_active?
|
|
{
|
|
:state => :ok,
|
|
:log_level => :warn,
|
|
:log_message => 'Configuration Management feature requires a valid license. You can continue to monitor Logstash, but please contact your administrator to update your license'
|
|
}
|
|
else
|
|
unless xpack_info.feature_enabled?(FEATURE_EXTERNAL)
|
|
logger.warn('Monitoring installed and enabled in Logstash, but not enabled in Elasticsearch')
|
|
end
|
|
|
|
{ :state => :ok, :log_level => :info, :log_message => 'Configuration Management License OK' }
|
|
end
|
|
end
|
|
|
|
alias_method :enabled?, :match?
|
|
|
|
private
|
|
def pipeline_ids
|
|
@settings.get("xpack.management.pipeline.id")
|
|
end
|
|
|
|
def client
|
|
@client ||= build_client
|
|
end
|
|
end
|
|
|
|
module Fetcher
|
|
include LogStash::Util::Loggable
|
|
|
|
def get_pipeline_ids
|
|
@pipelines.keys
|
|
end
|
|
|
|
def fetch_config(pipeline_ids, client) end
|
|
def get_single_pipeline_setting(pipeline_id) end
|
|
|
|
def log_pipeline_not_found(pipeline_ids)
|
|
logger.debug("Could not find a remote configuration for specific `pipeline_id`", :pipeline_ids => pipeline_ids) if pipeline_ids.any?
|
|
end
|
|
end
|
|
|
|
class SystemIndicesFetcher
|
|
include LogStash::Util::Loggable, Fetcher
|
|
|
|
SYSTEM_INDICES_API_PATH = "_logstash/pipeline"
|
|
|
|
def fetch_config(pipeline_ids, client)
|
|
response = client.get("#{SYSTEM_INDICES_API_PATH}/")
|
|
|
|
if response["error"]
|
|
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
|
|
end
|
|
|
|
@pipelines = get_wildcard_pipelines(pipeline_ids, response)
|
|
end
|
|
|
|
def get_single_pipeline_setting(pipeline_id)
|
|
@pipelines.fetch(pipeline_id, {})
|
|
end
|
|
|
|
private
|
|
# get pipelines if pipeline_ids match wildcard patterns
|
|
# split user pipeline id setting into wildcard and non wildcard pattern
|
|
# take the non wildcard pipelines. take the wildcard pipelines by matching with glob pattern
|
|
def get_wildcard_pipelines(pipeline_ids, response)
|
|
wildcard_patterns, fix_pids = pipeline_ids.partition { |pattern| pattern.include?("*")}
|
|
|
|
fix_id_pipelines = fix_pids.map { |id|
|
|
response.has_key?(id) ? {id => response[id]}: {}
|
|
}.reduce({}, :merge)
|
|
fix_id_pipelines.keys.map { |id| response.delete(id)}
|
|
|
|
wildcard_matched_patterns = Set.new
|
|
wildcard_pipelines = response.keys.map { |id|
|
|
found_pattern = wildcard_patterns.any? { |pattern|
|
|
matched = ::File::fnmatch?(pattern, id)
|
|
wildcard_matched_patterns << pattern if matched
|
|
matched
|
|
}
|
|
found_pattern ? {id => response[id]}: {}
|
|
}.reduce({}, :merge)
|
|
|
|
log_pipeline_not_found((fix_pids - fix_id_pipelines.keys) + (wildcard_patterns - wildcard_matched_patterns.to_a))
|
|
|
|
fix_id_pipelines.merge(wildcard_pipelines)
|
|
end
|
|
end
|
|
|
|
# clean up LegacyHiddenIndicesFetcher https://github.com/elastic/logstash/issues/12291
|
|
class LegacyHiddenIndicesFetcher
|
|
include LogStash::Util::Loggable, Fetcher
|
|
|
|
PIPELINE_INDEX = ".logstash"
|
|
|
|
def fetch_config(pipeline_ids, client)
|
|
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
|
|
response = client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
|
|
|
|
if response["error"]
|
|
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
|
|
end
|
|
|
|
if response["docs"].nil?
|
|
logger.debug("Server returned an unknown or malformed document structure", :response => response)
|
|
raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
|
|
end
|
|
|
|
@pipelines = format_response(response)
|
|
|
|
log_wildcard_unsupported(pipeline_ids)
|
|
log_pipeline_not_found(pipeline_ids - @pipelines.keys)
|
|
|
|
@pipelines
|
|
end
|
|
|
|
def get_single_pipeline_setting(pipeline_id)
|
|
@pipelines.fetch(pipeline_id, {}).fetch("_source", {})
|
|
end
|
|
|
|
private
|
|
# transform legacy response to be similar to system indices response
|
|
def format_response(response)
|
|
response["docs"].map { |pipeline|
|
|
{pipeline["_id"] => pipeline} if pipeline.fetch("found", false)
|
|
}.compact
|
|
.reduce({}, :merge)
|
|
end
|
|
|
|
def log_wildcard_unsupported(pipeline_ids)
|
|
has_wildcard = pipeline_ids.any? { |id| id.include?("*") }
|
|
if has_wildcard
|
|
logger.warn("wildcard '*' in xpack.management.pipeline.id is not supported in Elasticsearch version < 7.10")
|
|
end
|
|
end
|
|
end
|
|
|
|
end
|
|
end
|