mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
only start monitoring pipeline after valid license (#10106)
Avoid unecessary logging errors and resource usage by only starting the monitoring pipeline if we can validate the license.
This commit is contained in:
parent
5a7220f946
commit
fe7607abd4
14 changed files with 245 additions and 228 deletions
|
@ -96,3 +96,6 @@ logger.slowlog.level = trace
|
||||||
logger.slowlog.appenderRef.console_slowlog.ref = ${sys:ls.log.format}_console_slowlog
|
logger.slowlog.appenderRef.console_slowlog.ref = ${sys:ls.log.format}_console_slowlog
|
||||||
logger.slowlog.appenderRef.rolling_slowlog.ref = ${sys:ls.log.format}_rolling_slowlog
|
logger.slowlog.appenderRef.rolling_slowlog.ref = ${sys:ls.log.format}_rolling_slowlog
|
||||||
logger.slowlog.additivity = false
|
logger.slowlog.additivity = false
|
||||||
|
|
||||||
|
logger.licensereader.name = logstash.licensechecker.licensereader
|
||||||
|
logger.licensereader.level = error
|
||||||
|
|
|
@ -348,6 +348,8 @@ class LogStash::Runner < Clamp::StrictCommand
|
||||||
# lock path.data before starting the agent
|
# lock path.data before starting the agent
|
||||||
@data_path_lock = FileLockFactory.obtainLock(java.nio.file.Paths.get(setting("path.data")).to_absolute_path, ".lock")
|
@data_path_lock = FileLockFactory.obtainLock(java.nio.file.Paths.get(setting("path.data")).to_absolute_path, ".lock")
|
||||||
|
|
||||||
|
logger.info("Starting Logstash", "logstash.version" => LOGSTASH_VERSION)
|
||||||
|
|
||||||
@dispatcher.fire(:before_agent)
|
@dispatcher.fire(:before_agent)
|
||||||
@agent = create_agent(@settings, @source_loader)
|
@agent = create_agent(@settings, @source_loader)
|
||||||
@dispatcher.fire(:after_agent)
|
@dispatcher.fire(:after_agent)
|
||||||
|
@ -357,8 +359,6 @@ class LogStash::Runner < Clamp::StrictCommand
|
||||||
sigint_id = trap_sigint()
|
sigint_id = trap_sigint()
|
||||||
sigterm_id = trap_sigterm()
|
sigterm_id = trap_sigterm()
|
||||||
|
|
||||||
logger.info("Starting Logstash", "logstash.version" => LOGSTASH_VERSION)
|
|
||||||
|
|
||||||
@agent_task = Stud::Task.new { @agent.execute }
|
@agent_task = Stud::Task.new { @agent.execute }
|
||||||
|
|
||||||
# no point in enabling config reloading before the agent starts
|
# no point in enabling config reloading before the agent starts
|
||||||
|
|
|
@ -142,7 +142,13 @@ module LogStash
|
||||||
end
|
end
|
||||||
|
|
||||||
def populate_license_state(xpack_info)
|
def populate_license_state(xpack_info)
|
||||||
if !xpack_info.installed?
|
if xpack_info.failed?
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => "Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster."
|
||||||
|
}
|
||||||
|
elsif !xpack_info.installed?
|
||||||
{
|
{
|
||||||
:state => :error,
|
:state => :error,
|
||||||
:log_level => :error,
|
:log_level => :error,
|
||||||
|
@ -193,4 +199,4 @@ module LogStash
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -39,9 +39,6 @@ module LogStash
|
||||||
def fetch_xpack_info
|
def fetch_xpack_info
|
||||||
xpack_info = @license_reader.fetch_xpack_info
|
xpack_info = @license_reader.fetch_xpack_info
|
||||||
|
|
||||||
# TODO: we should be more lenient when we're having issues
|
|
||||||
xpack_info ||= XPackInfo.xpack_not_installed
|
|
||||||
|
|
||||||
update_xpack_info(xpack_info)
|
update_xpack_info(xpack_info)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ module LogStash
|
||||||
@namespace = "xpack.#{feature}"
|
@namespace = "xpack.#{feature}"
|
||||||
@settings = settings
|
@settings = settings
|
||||||
@es_options = options
|
@es_options = options
|
||||||
|
@es_options.merge!("resurrect_delay" => 30)
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
|
@ -37,8 +38,12 @@ module LogStash
|
||||||
XPackInfo.xpack_not_installed
|
XPackInfo.xpack_not_installed
|
||||||
end
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
logger.error('Unable to retrieve license information from license server', :message => e.message, :class => e.class.name, :backtrace => e.backtrace)
|
if logger.debug?
|
||||||
nil
|
logger.error('Unable to retrieve license information from license server', :message => e.message, :class => e.class.name, :backtrace => e.backtrace)
|
||||||
|
else
|
||||||
|
logger.error('Unable to retrieve license information from license server', :message => e.message)
|
||||||
|
end
|
||||||
|
XPackInfo.failed_to_fetch
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
|
|
|
@ -14,10 +14,11 @@ module LogStash
|
||||||
|
|
||||||
LICENSE_TYPES = :trial, :basic, :standard, :gold, :platinum
|
LICENSE_TYPES = :trial, :basic, :standard, :gold, :platinum
|
||||||
|
|
||||||
def initialize(license, features = nil, installed=true)
|
def initialize(license, features = nil, installed=true, failed = false)
|
||||||
@license = license
|
@license = license
|
||||||
@installed = installed
|
@installed = installed
|
||||||
@features = features
|
@features = features
|
||||||
|
@failed = failed
|
||||||
|
|
||||||
freeze
|
freeze
|
||||||
end
|
end
|
||||||
|
@ -31,6 +32,10 @@ module LogStash
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def failed?
|
||||||
|
@failed
|
||||||
|
end
|
||||||
|
|
||||||
def installed?
|
def installed?
|
||||||
@installed
|
@installed
|
||||||
end
|
end
|
||||||
|
@ -86,7 +91,11 @@ module LogStash
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.xpack_not_installed
|
def self.xpack_not_installed
|
||||||
XPackInfo.new(nil, nil,false)
|
XPackInfo.new(nil, nil, false)
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.failed_to_fetch
|
||||||
|
XPackInfo.new(nil, nil, false, true)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -37,7 +37,13 @@ module LogStash
|
||||||
end
|
end
|
||||||
|
|
||||||
def populate_license_state(xpack_info)
|
def populate_license_state(xpack_info)
|
||||||
if !xpack_info.installed?
|
if xpack_info.failed?
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => "Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster."
|
||||||
|
}
|
||||||
|
elsif !xpack_info.installed?
|
||||||
{
|
{
|
||||||
:state => :error,
|
:state => :error,
|
||||||
:log_level => :error,
|
:log_level => :error,
|
||||||
|
|
|
@ -5,7 +5,6 @@
|
||||||
require "logstash/event"
|
require "logstash/event"
|
||||||
require "logstash/inputs/base"
|
require "logstash/inputs/base"
|
||||||
require "logstash/instrument/collector"
|
require "logstash/instrument/collector"
|
||||||
require 'license_checker/licensed'
|
|
||||||
require 'helpers/elasticsearch_options'
|
require 'helpers/elasticsearch_options'
|
||||||
require "concurrent"
|
require "concurrent"
|
||||||
require "thread"
|
require "thread"
|
||||||
|
@ -17,17 +16,12 @@ module LogStash module Inputs
|
||||||
# This input further transform it into a `Logstash::Event`, which can be consumed by the shipper and
|
# This input further transform it into a `Logstash::Event`, which can be consumed by the shipper and
|
||||||
# shipped to Elasticsearch
|
# shipped to Elasticsearch
|
||||||
class Metrics < LogStash::Inputs::Base
|
class Metrics < LogStash::Inputs::Base
|
||||||
include LogStash::LicenseChecker::Licensed, LogStash::Helpers::ElasticsearchOptions
|
|
||||||
|
|
||||||
require "monitoring/inputs/metrics/state_event_factory"
|
require "monitoring/inputs/metrics/state_event_factory"
|
||||||
require "monitoring/inputs/metrics/stats_event_factory"
|
require "monitoring/inputs/metrics/stats_event_factory"
|
||||||
|
|
||||||
@pipelines_mutex = Mutex.new
|
@pipelines_mutex = Mutex.new
|
||||||
@pipelines = {}
|
@pipelines = {}
|
||||||
|
|
||||||
VALID_LICENSES = %w(basic trial standard gold platinum)
|
|
||||||
FEATURE = 'monitoring'
|
|
||||||
|
|
||||||
require "monitoring/inputs/timer_task_logger"
|
require "monitoring/inputs/timer_task_logger"
|
||||||
|
|
||||||
attr_reader :queue, :agent
|
attr_reader :queue, :agent
|
||||||
|
@ -52,16 +46,12 @@ module LogStash module Inputs
|
||||||
@agent = nil
|
@agent = nil
|
||||||
@settings = LogStash::SETTINGS.clone
|
@settings = LogStash::SETTINGS.clone
|
||||||
@last_updated_pipeline_hashes = []
|
@last_updated_pipeline_hashes = []
|
||||||
@es_options = es_options_from_settings_or_modules(FEATURE, @settings)
|
@agent = execution_context.agent if execution_context
|
||||||
setup_license_checker(FEATURE)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def pipeline_started(agent, pipeline)
|
def pipeline_started(agent, pipeline)
|
||||||
@agent = agent
|
@agent = agent
|
||||||
|
update_pipeline_state(pipeline)
|
||||||
with_license_check do
|
|
||||||
update_pipeline_state(pipeline)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def configure_snapshot_poller
|
def configure_snapshot_poller
|
||||||
|
@ -104,10 +94,8 @@ module LogStash module Inputs
|
||||||
end
|
end
|
||||||
|
|
||||||
def update(snapshot)
|
def update(snapshot)
|
||||||
with_license_check do
|
update_stats(snapshot)
|
||||||
update_stats(snapshot)
|
update_states
|
||||||
update_states
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_stats(snapshot)
|
def update_stats(snapshot)
|
||||||
|
@ -166,40 +154,6 @@ module LogStash module Inputs
|
||||||
queue << event
|
queue << event
|
||||||
end
|
end
|
||||||
|
|
||||||
def populate_license_state(xpack_info)
|
|
||||||
if !xpack_info.installed?
|
|
||||||
{
|
|
||||||
:state => :error,
|
|
||||||
:log_level => :error,
|
|
||||||
:log_message => "X-Pack is installed on Logstash but not on Elasticsearch. Please install X-Pack on Elasticsearch to use the monitoring feature. Other features may be available."
|
|
||||||
}
|
|
||||||
elsif !xpack_info.license_available?
|
|
||||||
{
|
|
||||||
:state => :error,
|
|
||||||
:log_level => :error,
|
|
||||||
:log_message => 'Monitoring is not available: License information is currently unavailable. Please make sure you have added your production elasticsearch connection info in the xpack.monitoring.elasticsearch settings.'
|
|
||||||
}
|
|
||||||
elsif !xpack_info.license_one_of?(VALID_LICENSES)
|
|
||||||
{
|
|
||||||
:state => :error,
|
|
||||||
:log_level => :error,
|
|
||||||
:log_message => "Monitoring is not available: #{xpack_info.license_type} is not a valid license for this feature."
|
|
||||||
}
|
|
||||||
elsif !xpack_info.license_active?
|
|
||||||
{
|
|
||||||
:state => :ok,
|
|
||||||
:log_level => :warn,
|
|
||||||
:log_message => 'Monitoring requires a valid license. You can continue to monitor Logstash, but please contact your administrator to update your license'
|
|
||||||
}
|
|
||||||
else
|
|
||||||
unless xpack_info.feature_enabled?(FEATURE)
|
|
||||||
logger.warn('Monitoring installed and enabled in Logstash, but not enabled in Elasticsearch')
|
|
||||||
end
|
|
||||||
|
|
||||||
{ :state => :ok, :log_level => :info, :log_message => 'Monitoring License OK' }
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
def remove_reserved_fields(event)
|
def remove_reserved_fields(event)
|
||||||
event.remove("@timestamp")
|
event.remove("@timestamp")
|
||||||
|
|
|
@ -3,20 +3,91 @@
|
||||||
# you may not use this file except in compliance with the Elastic License.
|
# you may not use this file except in compliance with the Elastic License.
|
||||||
|
|
||||||
require "logstash/config/source/base"
|
require "logstash/config/source/base"
|
||||||
|
require 'license_checker/licensed'
|
||||||
|
require 'helpers/elasticsearch_options'
|
||||||
|
|
||||||
module LogStash module Monitoring
|
module LogStash module Monitoring
|
||||||
class InternalPipelineSource < LogStash::Config::Source::Base
|
class InternalPipelineSource < LogStash::Config::Source::Base
|
||||||
def initialize(pipeline_config)
|
include LogStash::LicenseChecker::Licensed
|
||||||
|
include LogStash::Helpers::ElasticsearchOptions
|
||||||
|
include LogStash::Util::Loggable
|
||||||
|
VALID_LICENSES = %w(basic trial standard gold platinum)
|
||||||
|
FEATURE = 'monitoring'
|
||||||
|
|
||||||
|
def initialize(pipeline_config, agent)
|
||||||
super(pipeline_config.settings)
|
super(pipeline_config.settings)
|
||||||
@pipeline_config = pipeline_config
|
@pipeline_config = pipeline_config
|
||||||
|
@settings = LogStash::SETTINGS.clone
|
||||||
|
@agent = agent
|
||||||
|
@es_options = es_options_from_settings_or_modules(FEATURE, @settings)
|
||||||
|
setup_license_checker(FEATURE)
|
||||||
end
|
end
|
||||||
|
|
||||||
def pipeline_configs
|
def pipeline_configs
|
||||||
return @pipeline_config
|
@pipeline_config
|
||||||
end
|
end
|
||||||
|
|
||||||
def match?
|
def match?
|
||||||
true
|
valid_basic_license?
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def update_license_state(xpack_info)
|
||||||
|
return if valid_basic_license?
|
||||||
|
super(xpack_info) if xpack_info
|
||||||
|
if valid_basic_license?
|
||||||
|
logger.info("Validated license for monitoring. Enabling monitoring pipeline.")
|
||||||
|
enable_monitoring()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
def valid_basic_license?
|
||||||
|
@license_state ? license_check : false
|
||||||
|
end
|
||||||
|
|
||||||
|
def enable_monitoring
|
||||||
|
@agent.converge_state_and_update
|
||||||
|
end
|
||||||
|
|
||||||
|
def populate_license_state(xpack_info)
|
||||||
|
if xpack_info.failed?
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => "Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster."
|
||||||
|
}
|
||||||
|
elsif !xpack_info.installed?
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => "X-Pack is installed on Logstash but not on Elasticsearch. Please install X-Pack on Elasticsearch to use the monitoring feature. Other features may be available."
|
||||||
|
}
|
||||||
|
elsif !xpack_info.license_available?
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => 'Monitoring is not available: License information is currently unavailable. Please make sure you have added your production elasticsearch connection info in the xpack.monitoring.elasticsearch settings.'
|
||||||
|
}
|
||||||
|
elsif !xpack_info.license_one_of?(VALID_LICENSES)
|
||||||
|
{
|
||||||
|
:state => :error,
|
||||||
|
:log_level => :error,
|
||||||
|
:log_message => "Monitoring is not available: #{xpack_info.license_type} is not a valid license for this feature."
|
||||||
|
}
|
||||||
|
elsif !xpack_info.license_active?
|
||||||
|
{
|
||||||
|
:state => :ok,
|
||||||
|
:log_level => :warn,
|
||||||
|
:log_message => 'Monitoring requires a valid license. You can continue to monitor Logstash, but please contact your administrator to update your license'
|
||||||
|
}
|
||||||
|
else
|
||||||
|
unless xpack_info.feature_enabled?(FEATURE)
|
||||||
|
logger.warn('Monitoring installed and enabled in Logstash, but not enabled in Elasticsearch')
|
||||||
|
end
|
||||||
|
|
||||||
|
{ :state => :ok, :log_level => :info, :log_message => 'Monitoring License OK' }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end end
|
end end
|
||||||
|
|
|
@ -95,7 +95,7 @@ module LogStash
|
||||||
|
|
||||||
logger.trace("registering the metrics pipeline")
|
logger.trace("registering the metrics pipeline")
|
||||||
LogStash::SETTINGS.set("node.uuid", runner.agent.id)
|
LogStash::SETTINGS.set("node.uuid", runner.agent.id)
|
||||||
internal_pipeline_source = LogStash::Monitoring::InternalPipelineSource.new(setup_metrics_pipeline)
|
internal_pipeline_source = LogStash::Monitoring::InternalPipelineSource.new(setup_metrics_pipeline, runner.agent)
|
||||||
runner.source_loader.add_source(internal_pipeline_source)
|
runner.source_loader.add_source(internal_pipeline_source)
|
||||||
rescue => e
|
rescue => e
|
||||||
logger.error("Failed to set up the metrics pipeline", :message => e.message, :backtrace => e.backtrace)
|
logger.error("Failed to set up the metrics pipeline", :message => e.message, :backtrace => e.backtrace)
|
||||||
|
|
|
@ -64,8 +64,8 @@ describe LogStash::LicenseChecker::LicenseReader do
|
||||||
before(:each) do
|
before(:each) do
|
||||||
expect(mock_client).to receive(:get).with('_xpack').and_raise(Puma::ConnectionError)
|
expect(mock_client).to receive(:get).with('_xpack').and_raise(Puma::ConnectionError)
|
||||||
end
|
end
|
||||||
it 'returns nil' do
|
it 'returns failed to fetch' do
|
||||||
expect(subject.fetch_xpack_info).to be_nil
|
expect(subject.fetch_xpack_info.failed?).to be_truthy
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
context 'when client raises a 5XX' do
|
context 'when client raises a 5XX' do
|
||||||
|
@ -74,7 +74,7 @@ describe LogStash::LicenseChecker::LicenseReader do
|
||||||
expect(mock_client).to receive(:get).with('_xpack').and_raise(exception_500)
|
expect(mock_client).to receive(:get).with('_xpack').and_raise(exception_500)
|
||||||
end
|
end
|
||||||
it 'returns nil' do
|
it 'returns nil' do
|
||||||
expect(subject.fetch_xpack_info).to be_nil
|
expect(subject.fetch_xpack_info.failed?).to be_truthy
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
context 'when client raises a 404' do
|
context 'when client raises a 404' do
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
require "modules/module_license_checker"
|
require "modules/module_license_checker"
|
||||||
require "logstash/modules/settings_merger"
|
require "logstash/modules/settings_merger"
|
||||||
|
require 'license_checker/x_pack_info'
|
||||||
|
|
||||||
describe LogStash::LicenseChecker::ModuleLicenseChecker do
|
describe LogStash::LicenseChecker::ModuleLicenseChecker do
|
||||||
|
|
||||||
|
@ -15,7 +16,8 @@ describe LogStash::LicenseChecker::ModuleLicenseChecker do
|
||||||
|
|
||||||
before(:each) {
|
before(:each) {
|
||||||
expect(subject).to receive(:license_reader).and_return(mock_reader)
|
expect(subject).to receive(:license_reader).and_return(mock_reader)
|
||||||
expect(mock_reader).to receive(:fetch_xpack_info).and_return(nil)
|
expect(mock_reader).to receive(:fetch_xpack_info).and_return(LogStash::LicenseChecker::XPackInfo.failed_to_fetch)
|
||||||
|
|
||||||
}
|
}
|
||||||
let(:mock_reader) {double("reader")}
|
let(:mock_reader) {double("reader")}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,6 @@ require "rspec/wait"
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
require "json"
|
require "json"
|
||||||
require "json-schema"
|
require "json-schema"
|
||||||
require 'license_checker/x_pack_info'
|
|
||||||
require 'monitoring/monitoring'
|
require 'monitoring/monitoring'
|
||||||
|
|
||||||
describe LogStash::Inputs::Metrics do
|
describe LogStash::Inputs::Metrics do
|
||||||
|
@ -121,15 +120,8 @@ describe LogStash::Inputs::Metrics do
|
||||||
agent.shutdown
|
agent.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:license_state_ok) do
|
|
||||||
{:state => :ok, :log_level => :info, :log_message => 'Monitoring License OK'}
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'after the pipeline is setup' do
|
context 'after the pipeline is setup' do
|
||||||
before do
|
before do
|
||||||
allow(subject).to receive(:setup_license_checker)
|
|
||||||
allow(subject).to receive(:es_options_from_settings_or_modules).and_return(es_options)
|
|
||||||
allow(subject).to receive(:get_current_license_state).and_return(license_state_ok)
|
|
||||||
allow(subject).to receive(:exec_timer_task)
|
allow(subject).to receive(:exec_timer_task)
|
||||||
allow(subject).to receive(:sleep_till_stop)
|
allow(subject).to receive(:sleep_till_stop)
|
||||||
setup_pipeline
|
setup_pipeline
|
||||||
|
@ -142,9 +134,6 @@ describe LogStash::Inputs::Metrics do
|
||||||
describe "#update" do
|
describe "#update" do
|
||||||
before :each do
|
before :each do
|
||||||
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
||||||
allow(subject).to receive(:setup_license_checker)
|
|
||||||
allow(subject).to receive(:es_options_from_settings_or_modules).and_return(es_options)
|
|
||||||
allow(subject).to receive(:get_current_license_state).and_return(license_state_ok)
|
|
||||||
allow(subject).to receive(:exec_timer_task)
|
allow(subject).to receive(:exec_timer_task)
|
||||||
allow(subject).to receive(:sleep_till_stop)
|
allow(subject).to receive(:sleep_till_stop)
|
||||||
setup_pipeline
|
setup_pipeline
|
||||||
|
@ -175,151 +164,6 @@ describe LogStash::Inputs::Metrics do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'license testing' do
|
|
||||||
let(:elasticsearch_url) { ["https://localhost:9898"] }
|
|
||||||
let(:elasticsearch_username) { "elastictest" }
|
|
||||||
let(:elasticsearch_password) { "testchangeme" }
|
|
||||||
let(:mock_license_client) { double("es_client")}
|
|
||||||
let(:license_subject) { subject { described_class.new(options) }}
|
|
||||||
let(:license_reader) { LogStash::LicenseChecker::LicenseReader.new(system_settings, 'monitoring', es_options)}
|
|
||||||
let(:extension) { LogStash::MonitoringExtension.new }
|
|
||||||
let(:system_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone }
|
|
||||||
let(:license_status) { 'active'}
|
|
||||||
let(:license_type) { 'trial' }
|
|
||||||
let(:license_expiry_date) { Time.now + (60 * 60 * 24)}
|
|
||||||
let(:license_expiry_in_millis) { license_expiry_date.to_i * 1000 }
|
|
||||||
|
|
||||||
let(:xpack_response) {
|
|
||||||
LogStash::Json.load("{
|
|
||||||
\"license\": {
|
|
||||||
\"status\": \"#{license_status}\",
|
|
||||||
\"uid\": \"9a48c67c-ce2c-4169-97bf-37d324b8ab80\",
|
|
||||||
\"type\": \"#{license_type}\",
|
|
||||||
\"expiry_date_in_millis\": #{license_expiry_in_millis}
|
|
||||||
}
|
|
||||||
}")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
let(:no_xpack_response) {
|
|
||||||
LogStash::Json.load("{
|
|
||||||
\"error\": {
|
|
||||||
\"root_cause\": [
|
|
||||||
{
|
|
||||||
\"type\": \"index_not_found_exception\",
|
|
||||||
\"reason\": \"no such index\",
|
|
||||||
\"resource.type\": \"index_or_alias\",
|
|
||||||
\"resource.id\": \"_xpack\",
|
|
||||||
\"index_uuid\": \"_na_\",
|
|
||||||
\"index\": \"_xpack\"
|
|
||||||
}],
|
|
||||||
\"type\": \"index_not_found_exception\",
|
|
||||||
\"reason\": \"no such index\",
|
|
||||||
\"resource.type\": \"index_or_alias\",
|
|
||||||
\"resource.id\": \"_xpack\",
|
|
||||||
\"index_uuid\": \"_na_\",
|
|
||||||
\"index\": \"_xpack\"
|
|
||||||
},
|
|
||||||
\"status\": 404
|
|
||||||
}")
|
|
||||||
}
|
|
||||||
|
|
||||||
let(:no_xpack_response_5_6) {
|
|
||||||
LogStash::Json.load("{
|
|
||||||
\"error\": {
|
|
||||||
\"root_cause\":
|
|
||||||
[{
|
|
||||||
\"type\":\"illegal_argument_exception\",
|
|
||||||
\"reason\": \"No endpoint or operation is available at [_xpack]\"
|
|
||||||
}],
|
|
||||||
\"type\":\"illegal_argument_exception\",
|
|
||||||
\"reason\": \"No endpoint or operation is available at [_xpack]\"
|
|
||||||
},
|
|
||||||
\"status\": 400
|
|
||||||
}")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
let(:settings) do
|
|
||||||
{
|
|
||||||
"xpack.monitoring.enabled" => true,
|
|
||||||
"xpack.monitoring.elasticsearch.url" => elasticsearch_url,
|
|
||||||
"xpack.monitoring.elasticsearch.username" => elasticsearch_username,
|
|
||||||
"xpack.monitoring.elasticsearch.password" => elasticsearch_password,
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
before :each do
|
|
||||||
extension.additionals_settings(system_settings)
|
|
||||||
apply_settings(settings, system_settings)
|
|
||||||
allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
|
|
||||||
allow(subject).to receive(:es_options_from_settings_or_modules).and_return(es_options)
|
|
||||||
allow(subject).to receive(:exec_timer_task)
|
|
||||||
allow(subject).to receive(:sleep_till_stop)
|
|
||||||
|
|
||||||
allow(subject).to receive(:license_reader).and_return(license_reader)
|
|
||||||
allow(license_reader).to receive(:build_client).and_return(mock_license_client)
|
|
||||||
end
|
|
||||||
|
|
||||||
describe 'with licensing' do
|
|
||||||
context 'when xpack has not been installed on es 6' do
|
|
||||||
before :each do
|
|
||||||
expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response)
|
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
|
||||||
end
|
|
||||||
|
|
||||||
it_behaves_like 'events are not added to the queue'
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when xpack has not been installed on 5.6' do
|
|
||||||
before :each do
|
|
||||||
expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response_5_6)
|
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
|
||||||
end
|
|
||||||
|
|
||||||
it_behaves_like 'events are not added to the queue'
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when the license has expired' do
|
|
||||||
let(:license_status) { 'expired'}
|
|
||||||
let(:license_expiry_date) { Time.now - (60 * 60 * 24)}
|
|
||||||
|
|
||||||
before :each do
|
|
||||||
expect(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response)
|
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
|
||||||
end
|
|
||||||
|
|
||||||
it_behaves_like 'events are added to the queue'
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when the license server is not available' do
|
|
||||||
let(:mock_license_client) { double('license_client')}
|
|
||||||
before :each do
|
|
||||||
expect(mock_license_client).to receive(:get).and_raise("An error is here")
|
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
|
||||||
end
|
|
||||||
|
|
||||||
it_behaves_like 'events are not added to the queue'
|
|
||||||
end
|
|
||||||
|
|
||||||
%w(basic standard trial standard gold platinum).sample(1).each do |license_type|
|
|
||||||
context "With a valid #{license_type} license" do
|
|
||||||
let(:license_type) { license_type }
|
|
||||||
before :each do
|
|
||||||
expect(mock_license_client).to receive(:get).with('_xpack').and_return(xpack_response)
|
|
||||||
setup_pipeline
|
|
||||||
subject.update(collector.snapshot_metric)
|
|
||||||
end
|
|
||||||
it_behaves_like 'events are added to the queue'
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
context "unit tests" do
|
context "unit tests" do
|
||||||
|
@ -335,7 +179,6 @@ describe LogStash::Inputs::Metrics do
|
||||||
|
|
||||||
describe "system pipelines" do
|
describe "system pipelines" do
|
||||||
before(:each) do
|
before(:each) do
|
||||||
allow(subject).to receive(:valid_license?).and_return(true)
|
|
||||||
allow(pipeline).to receive(:system?).and_return(true)
|
allow(pipeline).to receive(:system?).and_return(true)
|
||||||
allow(subject).to receive(:emit_event)
|
allow(subject).to receive(:emit_event)
|
||||||
subject.update_pipeline_state(pipeline)
|
subject.update_pipeline_state(pipeline)
|
||||||
|
@ -348,7 +191,6 @@ describe LogStash::Inputs::Metrics do
|
||||||
|
|
||||||
describe "normal pipelines" do
|
describe "normal pipelines" do
|
||||||
before(:each) do
|
before(:each) do
|
||||||
allow(subject).to receive(:valid_license?).and_return(true)
|
|
||||||
allow(pipeline).to receive(:system?).and_return(false)
|
allow(pipeline).to receive(:system?).and_return(false)
|
||||||
allow(subject).to receive(:state_event_for).with(pipeline).and_return(state_event)
|
allow(subject).to receive(:state_event_for).with(pipeline).and_return(state_event)
|
||||||
allow(subject).to receive(:emit_event)
|
allow(subject).to receive(:emit_event)
|
||||||
|
|
122
x-pack/spec/monitoring/internal_pipeline_source_spec.rb
Normal file
122
x-pack/spec/monitoring/internal_pipeline_source_spec.rb
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
# or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
# you may not use this file except in compliance with the Elastic License.
|
||||||
|
|
||||||
|
require "logstash-core"
|
||||||
|
require "logstash/agent"
|
||||||
|
require "logstash/agent"
|
||||||
|
require "monitoring/inputs/metrics"
|
||||||
|
require "logstash/config/pipeline_config"
|
||||||
|
require "logstash/config/source/local"
|
||||||
|
require 'license_checker/x_pack_info'
|
||||||
|
require "rspec/wait"
|
||||||
|
require 'spec_helper'
|
||||||
|
require "json"
|
||||||
|
require "json-schema"
|
||||||
|
require 'license_checker/x_pack_info'
|
||||||
|
require 'monitoring/monitoring'
|
||||||
|
|
||||||
|
|
||||||
|
describe LogStash::Monitoring::InternalPipelineSource do
|
||||||
|
context 'license testing' do
|
||||||
|
let(:xpack_monitoring_interval) { 1 }
|
||||||
|
let(:options) { { "collection_interval" => xpack_monitoring_interval,
|
||||||
|
"collection_timeout_interval" => 600 } }
|
||||||
|
|
||||||
|
subject { described_class.new(pipeline_config, mock_agent) }
|
||||||
|
let(:mock_agent) { double("agent")}
|
||||||
|
let(:mock_license_client) { double("es_client")}
|
||||||
|
let(:license_reader) { LogStash::LicenseChecker::LicenseReader.new(system_settings, 'monitoring', es_options)}
|
||||||
|
let(:system_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone }
|
||||||
|
let(:license_status) { 'active'}
|
||||||
|
let(:license_type) { 'trial' }
|
||||||
|
let(:license_expiry_date) { Time.now + (60 * 60 * 24)}
|
||||||
|
let(:source) { LogStash::Config::Source::Local }
|
||||||
|
let(:pipeline_id) { :main }
|
||||||
|
let(:ordered_config_parts) do
|
||||||
|
[
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, "input { generator1 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/2", 0, 0, "input { generator2 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/3", 0, 0, "input { generator3 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"),
|
||||||
|
org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"),
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:unordered_config_parts) { ordered_config_parts.shuffle }
|
||||||
|
|
||||||
|
let(:pipeline_config) { LogStash::Config::PipelineConfig.new(source, pipeline_id, unordered_config_parts, system_settings) }
|
||||||
|
|
||||||
|
let(:es_options) do
|
||||||
|
{
|
||||||
|
'url' => elasticsearch_url,
|
||||||
|
'user' => elasticsearch_username,
|
||||||
|
'password' => elasticsearch_password
|
||||||
|
}
|
||||||
|
end
|
||||||
|
let(:elasticsearch_url) { ["https://localhost:9898"] }
|
||||||
|
let(:elasticsearch_username) { "elastictest" }
|
||||||
|
let(:elasticsearch_password) { "testchangeme" }
|
||||||
|
|
||||||
|
let(:settings) do
|
||||||
|
{
|
||||||
|
"xpack.monitoring.enabled" => true,
|
||||||
|
"xpack.monitoring.elasticsearch.url" => elasticsearch_url,
|
||||||
|
"xpack.monitoring.elasticsearch.username" => elasticsearch_username,
|
||||||
|
"xpack.monitoring.elasticsearch.password" => elasticsearch_password,
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
before :each do
|
||||||
|
allow(subject).to receive(:es_options_from_settings_or_modules).and_return(es_options)
|
||||||
|
allow(subject).to receive(:license_reader).and_return(license_reader)
|
||||||
|
allow(license_reader).to receive(:build_client).and_return(mock_license_client)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe 'with licensing' do
|
||||||
|
context 'when xpack has not been installed on es 6' do
|
||||||
|
let(:xpack_info) { LogStash::LicenseChecker::XPackInfo.xpack_not_installed }
|
||||||
|
it "does not start the pipeline" do
|
||||||
|
expect(subject).to_not receive(:enable_monitoring)
|
||||||
|
subject.update_license_state(xpack_info)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
context 'when the license has expired' do
|
||||||
|
let(:license) do
|
||||||
|
{ "status" => "inactive", "type" => license_type }
|
||||||
|
end
|
||||||
|
let(:xpack_info) { LogStash::LicenseChecker::XPackInfo.new(license, nil) }
|
||||||
|
it "still starts the pipeline" do
|
||||||
|
expect(subject).to receive(:enable_monitoring)
|
||||||
|
subject.update_license_state(xpack_info)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
context 'when the license server is not available' do
|
||||||
|
let(:xpack_info) { LogStash::LicenseChecker::XPackInfo.new(nil, nil, nil, true) }
|
||||||
|
it "does not start the pipeline" do
|
||||||
|
expect(subject).to_not receive(:enable_monitoring)
|
||||||
|
subject.update_license_state(xpack_info)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
%w(basic standard trial gold platinum).each do |license_type|
|
||||||
|
context "With a valid #{license_type} license" do
|
||||||
|
let(:license_type) { license_type }
|
||||||
|
let(:license) do
|
||||||
|
{ "status" => "active", "type" => license_type }
|
||||||
|
end
|
||||||
|
let(:features) do
|
||||||
|
{ "monitoring" => { "enabled" => true } }
|
||||||
|
end
|
||||||
|
let(:xpack_info) { LogStash::LicenseChecker::XPackInfo.new(license, features) }
|
||||||
|
it "starts the pipeline" do
|
||||||
|
expect(subject).to receive(:enable_monitoring)
|
||||||
|
subject.update_license_state(xpack_info)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue