diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index af33ab86f..e7ca3eaad 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -3,12 +3,16 @@ require "clamp" # gem 'clamp' require "logstash/environment" require "logstash/errors" require "logstash/config/cpu_core_strategy" +require "stud/trap" +require "logstash/config/loader" require "uri" require "net/http" require "logstash/pipeline" -LogStash::Environment.load_locale! class LogStash::Agent < Clamp::Command + + attr_reader :pipelines + DEFAULT_INPUT = "input { stdin { type => stdin } }" DEFAULT_OUTPUT = "output { stdout { codec => rubydebug } }" @@ -22,18 +26,18 @@ class LogStash::Agent < Clamp::Command :default => "", :attribute_name => :config_string option ["-w", "--pipeline-workers"], "COUNT", - I18n.t("logstash.runner.flag.pipeline-workers"), + I18n.t("logstash.agent.flag.pipeline-workers"), :attribute_name => :pipeline_workers, :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] option ["-b", "--pipeline-batch-size"], "SIZE", - I18n.t("logstash.runner.flag.pipeline-batch-size"), + I18n.t("logstash.agent.flag.pipeline-batch-size"), :attribute_name => :pipeline_batch_size, :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size] option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS", - I18n.t("logstash.runner.flag.pipeline-batch-delay"), + I18n.t("logstash.agent.flag.pipeline-batch-delay"), :attribute_name => :pipeline_batch_delay, :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay] @@ -71,9 +75,25 @@ class LogStash::Agent < Clamp::Command :attribute_name => :unsafe_shutdown, :default => false - def initialize(*args) - super(*args) - @pipeline_settings ||= { :pipeline_id => "base" } + option ["-r", "--[no-]auto-reload"], :flag, + I18n.t("logstash.agent.flag.auto_reload"), + :attribute_name => :auto_reload, :default => false + + option ["--reload-interval"], "RELOAD_INTERVAL", + I18n.t("logstash.agent.flag.reload_interval"), + :attribute_name => :reload_interval, :default => 3, &:to_i + + option ["-n", "--node-name"], "NAME", + I18n.t("logstash.runner.flag.node_name"), + :attribute_name => :node_name, :default => Socket.gethostname + + def initialize(*params) + super(*params) + @logger = Cabin::Channel.get(LogStash) + @pipelines = {} + @pipeline_settings ||= { :pipeline_id => "main" } + @upgrade_mutex = Mutex.new + @config_loader = LogStash::Config::Loader.new(@logger) end def pipeline_workers=(pipeline_workers_value) @@ -103,7 +123,6 @@ class LogStash::Agent < Clamp::Command raise LogStash::ConfigurationError, message end # def warn - # Emit a failure message and abort. def fail(message) raise LogStash::ConfigurationError, message end # def fail @@ -114,7 +133,6 @@ class LogStash::Agent < Clamp::Command require "logstash/pipeline" require "cabin" # gem 'cabin' require "logstash/plugin" - @logger = Cabin::Channel.get(LogStash) LogStash::ShutdownWatcher.unsafe_shutdown = unsafe_shutdown? LogStash::ShutdownWatcher.logger = @logger @@ -140,70 +158,63 @@ class LogStash::Agent < Clamp::Command end # You must specify a config_string or config_path - if @config_string.nil? && @config_path.nil? - fail(help + "\n" + I18n.t("logstash.agent.missing-configuration")) + if config_string.nil? && config_path.nil? + fail(I18n.t("logstash.agent.missing-configuration")) end - @config_string = @config_string.to_s - - if @config_path - # Append the config string. - # This allows users to provide both -f and -e flags. The combination - # is rare, but useful for debugging. - @config_string = @config_string + load_config(@config_path) - else - # include a default stdin input if no inputs given - if @config_string !~ /input *{/ - @config_string += DEFAULT_INPUT - end - # include a default stdout output if no outputs given - if @config_string !~ /output *{/ - @config_string += DEFAULT_OUTPUT - end + if auto_reload? && config_path.nil? + # there's nothing to reload + fail(I18n.t("logstash.agent.reload-without-config-path")) end - - begin - pipeline = LogStash::Pipeline.new(@config_string, @pipeline_settings) - rescue LoadError => e - fail("Configuration problem.") - end - - # Make SIGINT shutdown the pipeline. - sigint_id = Stud::trap("INT") do - - if @interrupted_once - @logger.fatal(I18n.t("logstash.agent.forced_sigint")) - exit - else - @logger.warn(I18n.t("logstash.agent.sigint")) - Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) } - @interrupted_once = true - shutdown(pipeline) - end - end - - # Make SIGTERM shutdown the pipeline. - sigterm_id = Stud::trap("TERM") do - @logger.warn(I18n.t("logstash.agent.sigterm")) - shutdown(pipeline) - end - - Stud::trap("HUP") do - @logger.info(I18n.t("logstash.agent.sighup")) - configure_logging(log_file) - end - - # Stop now if we are only asking for a config test. if config_test? - @logger.terminal "Configuration OK" - return + config_loader = LogStash::Config::Loader.new(@logger) + config_str = config_loader.format_config(config_path, config_string) + config_error = LogStash::Pipeline.config_valid?(config_str) + if config_error == true + @logger.terminal "Configuration OK" + return 0 + else + @logger.fatal I18n.t("logstash.error", :error => config_error) + return 1 + end end + register_pipeline("main", @pipeline_settings.merge({ + :config_string => config_string, + :config_path => config_path + })) + + sigint_id = trap_sigint() + sigterm_id = trap_sigterm() + sighup_id = trap_sighup() + @logger.unsubscribe(stdout_logs) if show_startup_errors - # TODO(sissel): Get pipeline completion status. - pipeline.run + @logger.info("starting agent") + + start_pipelines + + return 1 if clean_state? + + @thread = Thread.current # this var is implicilty used by Stud.stop? + + Stud.stoppable_sleep(reload_interval) # sleep before looping + + if auto_reload? + Stud.interval(reload_interval) { reload_state! } + else + while !Stud.stop? + if clean_state? || running_pipelines? + sleep 0.5 + else + break + end + end + end + + shutdown + return 0 rescue LogStash::ConfigurationError => e @logger.unsubscribe(stdout_logs) if show_startup_errors @@ -220,45 +231,9 @@ class LogStash::Agent < Clamp::Command @log_fd.close if @log_fd Stud::untrap("INT", sigint_id) unless sigint_id.nil? Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil? + Stud::untrap("HUP", sighup_id) unless sighup_id.nil? end # def execute - def shutdown(pipeline) - pipeline.shutdown do - ::LogStash::ShutdownWatcher.start(pipeline) - end - end - - def show_version - show_version_logstash - - if [:info, :debug].include?(verbosity?) || debug? || verbose? - show_version_ruby - show_version_java if LogStash::Environment.jruby? - show_gems if [:debug].include?(verbosity?) || debug? - end - end # def show_version - - def show_version_logstash - require "logstash/version" - puts "logstash #{LOGSTASH_VERSION}" - end # def show_version_logstash - - def show_version_ruby - puts RUBY_DESCRIPTION - end # def show_version_ruby - - def show_version_java - properties = java.lang.System.getProperties - puts "java #{properties["java.version"]} (#{properties["java.vendor"]})" - puts "jvm #{properties["java.vm.name"]} / #{properties["java.vm.version"]}" - end # def show_version_java - - def show_gems - require "rubygems" - Gem::Specification.each do |spec| - puts "gem #{spec.name} #{spec.version}" - end - end # def show_gems # Do any start-time configuration. # @@ -324,63 +299,195 @@ class LogStash::Agent < Clamp::Command end end - def load_config(path) - begin - uri = URI.parse(path) - - case uri.scheme - when nil then - local_config(path) - when /http/ then - fetch_config(uri) - when "file" then - local_config(uri.path) + ## Signal Trapping ## + def trap_sigint + Stud::trap("INT") do + if @interrupted_once + @logger.fatal(I18n.t("logstash.agent.forced_sigint")) + exit else - fail(I18n.t("logstash.agent.configuration.scheme-not-supported", :path => path)) + @logger.warn(I18n.t("logstash.agent.sigint")) + Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) } + @interrupted_once = true + Stud.stop!(@thread) end - rescue URI::InvalidURIError - # fallback for windows. - # if the parsing of the file failed we assume we can reach it locally. - # some relative path on windows arent parsed correctly (.\logstash.conf) - local_config(path) end end - def local_config(path) - path = File.expand_path(path) - path = File.join(path, "*") if File.directory?(path) - - if Dir.glob(path).length == 0 - fail(I18n.t("logstash.agent.configuration.file-not-found", :path => path)) + def trap_sigterm + Stud::trap("TERM") do + @logger.warn(I18n.t("logstash.agent.sigterm")) + Stud.stop!(@thread) end + end - config = "" - encoding_issue_files = [] - Dir.glob(path).sort.each do |file| - next unless File.file?(file) - if file.match(/~$/) - @logger.debug("NOT reading config file because it is a temp file", :file => file) - next + def trap_sighup + Stud::trap("HUP") do + @logger.warn(I18n.t("logstash.agent.sighup")) + reload_state! + end + end + + ## Pipeline CRUD ## + def shutdown(pipeline) + pipeline.shutdown do + ::LogStash::ShutdownWatcher.start(pipeline) + end + end + # + # register_pipeline - adds a pipeline to the agent's state + # @param pipeline_id [String] pipeline string identifier + # @param settings [Hash] settings that will be passed when creating the pipeline. + # keys should be symbols such as :pipeline_workers and :pipeline_batch_delay + def register_pipeline(pipeline_id, settings) + pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id)) + return unless pipeline.is_a?(LogStash::Pipeline) + @pipelines[pipeline_id] = pipeline + end + + def reload_state! + @upgrade_mutex.synchronize do + @pipelines.each do |pipeline_id, _| + begin + reload_pipeline!(pipeline_id) + rescue => e + @logger.error(I18n.t("oops"), :error => e, :backtrace => e.backtrace) + end end - @logger.debug("Reading config file", :file => file) - cfg = File.read(file) - if !cfg.ascii_only? && !cfg.valid_encoding? - encoding_issue_files << file - end - config << cfg + "\n" end - if (encoding_issue_files.any?) - fail("The following config files contains non-ascii characters but are not UTF-8 encoded #{encoding_issue_files}") - end - return config - end # def load_config + end - def fetch_config(uri) + def create_pipeline(settings) begin - Net::HTTP.get(uri) + "\n" - rescue Exception => e - fail(I18n.t("logstash.agent.configuration.fetch-failed", :path => uri.to_s, :message => e.message)) + config = fetch_config(settings) + rescue => e + @logger.error("failed to fetch pipeline configuration", :message => e.message) + return + end + + begin + LogStash::Pipeline.new(config, settings) + rescue => e + @logger.error("fetched an invalid config", :config => config, :reason => e.message) + return end end + def start_pipelines + @pipelines.each { |id, _| start_pipeline(id) } + end + + def shutdown + shutdown_pipelines + end + + def shutdown_pipelines + @pipelines.each { |id, _| stop_pipeline(id) } + end + + def stop_pipeline(id) + pipeline = @pipelines[id] + return unless pipeline + @logger.log("stopping pipeline", :id => id) + pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) } + @pipelines[id].thread.join + end + + def running_pipelines? + @upgrade_mutex.synchronize do + @pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }.any? + end + end + + def running_pipeline?(pipeline_id) + thread = @pipelines[pipeline_id].thread + thread.is_a?(Thread) && thread.alive? + end + + def upgrade_pipeline(pipeline_id, new_pipeline) + stop_pipeline(pipeline_id) + @pipelines[pipeline_id] = new_pipeline + start_pipeline(pipeline_id) + end + + def clean_state? + @pipelines.empty? + end + + # since this method modifies the @pipelines hash it is + # wrapped in @upgrade_mutex in the parent call `reload_state!` + def reload_pipeline!(id) + old_pipeline = @pipelines[id] + new_pipeline = create_pipeline(old_pipeline.original_settings) + return if new_pipeline.nil? + + if old_pipeline.config_str == new_pipeline.config_str + @logger.debug("no configuration change for pipeline", + :pipeline => id, :config => old_pipeline.config_str) + else + @logger.log("fetched new config for pipeline. upgrading..", + :pipeline => id, :config => new_pipeline.config_str) + upgrade_pipeline(id, new_pipeline) + end + end + + def start_pipeline(id) + pipeline = @pipelines[id] + return unless pipeline.is_a?(LogStash::Pipeline) + return if pipeline.ready? + @logger.info("starting pipeline", :id => id) + Thread.new do + LogStash::Util.set_thread_name("pipeline.#{id}") + begin + pipeline.run + rescue => e + @logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace) + end + end + sleep 0.01 until pipeline.ready? + end + + ## Pipeline Aux methods ## + def fetch_config(settings) + @config_loader.format_config(settings[:config_path], settings[:config_string]) + end + + private + def node_uuid + @node_uuid ||= SecureRandom.uuid + end + + ### Version actions ### + def show_version + show_version_logstash + + if [:info, :debug].include?(verbosity?) || debug? || verbose? + show_version_ruby + show_version_java if LogStash::Environment.jruby? + show_gems if [:debug].include?(verbosity?) || debug? + end + end # def show_version + + def show_version_logstash + require "logstash/version" + puts "logstash #{LOGSTASH_VERSION}" + end # def show_version_logstash + + def show_version_ruby + puts RUBY_DESCRIPTION + end # def show_version_ruby + + def show_version_java + properties = java.lang.System.getProperties + puts "java #{properties["java.version"]} (#{properties["java.vendor"]})" + puts "jvm #{properties["java.vm.name"]} / #{properties["java.vm.version"]}" + end # def show_version_java + + def show_gems + require "rubygems" + Gem::Specification.each do |spec| + puts "gem #{spec.name} #{spec.version}" + end + end # def show_gems + end # class LogStash::Agent diff --git a/logstash-core/lib/logstash/config/defaults.rb b/logstash-core/lib/logstash/config/defaults.rb index ac3466f77..c0c18fd7c 100644 --- a/logstash-core/lib/logstash/config/defaults.rb +++ b/logstash-core/lib/logstash/config/defaults.rb @@ -6,6 +6,14 @@ module LogStash module Config module Defaults extend self + def input + "input { stdin { type => stdin } }" + end + + def output + "output { stdout { codec => rubydebug } }" + end + def cpu_cores Concurrent.processor_count end diff --git a/logstash-core/lib/logstash/config/loader.rb b/logstash-core/lib/logstash/config/loader.rb new file mode 100644 index 000000000..37179518e --- /dev/null +++ b/logstash-core/lib/logstash/config/loader.rb @@ -0,0 +1,90 @@ +require "logstash/config/defaults" + +module LogStash; module Config; class Loader + def initialize(logger) + @logger = logger + end + + def format_config(config_path, config_string) + config_string = config_string.to_s + if config_path + # Append the config string. + # This allows users to provide both -f and -e flags. The combination + # is rare, but useful for debugging. + config_string = config_string + load_config(config_path) + else + # include a default stdin input if no inputs given + if config_string !~ /input *{/ + config_string += LogStash::Config::Defaults.input + end + # include a default stdout output if no outputs given + if config_string !~ /output *{/ + config_string += LogStash::Config::Defaults.output + end + end + config_string + end + + def load_config(path) + begin + uri = URI.parse(path) + + case uri.scheme + when nil then + local_config(path) + when /http/ then + fetch_config(uri) + when "file" then + local_config(uri.path) + else + fail(I18n.t("logstash.runner.configuration.scheme-not-supported", :path => path)) + end + rescue URI::InvalidURIError + # fallback for windows. + # if the parsing of the file failed we assume we can reach it locally. + # some relative path on windows arent parsed correctly (.\logstash.conf) + local_config(path) + end + end + + def local_config(path) + path = ::File.expand_path(path) + path = ::File.join(path, "*") if ::File.directory?(path) + + if Dir.glob(path).length == 0 + fail(I18n.t("logstash.runner.configuration.file-not-found", :path => path)) + end + + config = "" + encoding_issue_files = [] + Dir.glob(path).sort.each do |file| + next unless ::File.file?(file) + if file.match(/~$/) + @logger.debug("NOT reading config file because it is a temp file", :config_file => file) + next + end + @logger.debug("Reading config file", :config_file => file) + cfg = ::File.read(file) + if !cfg.ascii_only? && !cfg.valid_encoding? + encoding_issue_files << file + end + config << cfg + "\n" + @logger.debug? && @logger.debug("\nThe following is the content of a file", :config_file => file.to_s) + @logger.debug? && @logger.debug("\n" + cfg + "\n\n") + end + if encoding_issue_files.any? + fail("The following config files contains non-ascii characters but are not UTF-8 encoded #{encoding_issue_files}") + end + @logger.debug? && @logger.debug("\nThe following is the merged configuration") + @logger.debug? && @logger.debug("\n" + config + "\n\n") + return config + end # def load_config + + def fetch_config(uri) + begin + Net::HTTP.get(uri) + "\n" + rescue Exception => e + fail(I18n.t("logstash.runner.configuration.fetch-failed", :path => uri.to_s, :message => e.message)) + end + end +end end end diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index d29206aa2..da8b26cb9 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -17,7 +17,7 @@ require "logstash/pipeline_reporter" require "logstash/output_delegator" module LogStash; class Pipeline - attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger + attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger, :thread, :config_str, :original_settings DEFAULT_SETTINGS = { :default_pipeline_workers => LogStash::Config::CpuCoreStrategy.maximum, @@ -28,9 +28,20 @@ module LogStash; class Pipeline } MAX_INFLIGHT_WARN_THRESHOLD = 10_000 + def self.validate_config(config_str, settings = {}) + begin + # There should be a better way to test this + self.new(config_str, settings) + rescue => e + e.message + end + end + def initialize(config_str, settings = {}) - @pipeline_id = settings[:pipeline_id] || self.object_id + @config_str = config_str + @original_settings = settings @logger = Cabin::Channel.get(LogStash) + @pipeline_id = settings[:pipeline_id] || self.object_id @settings = DEFAULT_SETTINGS.clone settings.each {|setting, value| configure(setting, value) } @reporter = LogStash::PipelineReporter.new(@logger, self) @@ -117,13 +128,12 @@ module LogStash; class Pipeline end def run - LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager") @logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings)) + @thread = Thread.current start_workers - @logger.info("Pipeline started") - @logger.terminal("Logstash startup completed") + @logger.log("Pipeline #{@pipeline_id} started") # Block until all inputs have stopped # Generally this happens if SIGINT is sent and `shutdown` is called from an external thread @@ -138,8 +148,7 @@ module LogStash; class Pipeline shutdown_flusher shutdown_workers - @logger.info("Pipeline shutdown complete.") - @logger.terminal("Logstash shutdown completed") + @logger.log("Pipeline #{@pipeline_id} has been shutdown") # exit code return 0 diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 4831d5330..6d64f0fca 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -9,11 +9,15 @@ LogStash::Environment.load_locale! require "logstash/namespace" require "logstash/program" +require "logstash/config/defaults" class LogStash::Runner include LogStash::Program + attr_reader :agent + def main(args) + require "logstash/util" require "logstash/util/java_version" require "stud/trap" @@ -121,4 +125,5 @@ Available commands: def show_help(command) puts command.help end + end # class LogStash::Runner diff --git a/logstash-core/lib/logstash/shutdown_watcher.rb b/logstash-core/lib/logstash/shutdown_watcher.rb index 82d5aa216..fa0d1f01f 100644 --- a/logstash-core/lib/logstash/shutdown_watcher.rb +++ b/logstash-core/lib/logstash/shutdown_watcher.rb @@ -34,8 +34,8 @@ module LogStash end def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER) - watcher = self.new(pipeline, cycle_period, report_every, abort_threshold) - Thread.new(watcher) { |watcher| watcher.start } + controller = self.new(pipeline, cycle_period, report_every, abort_threshold) + Thread.new(controller) { |controller| controller.start } end def logger @@ -47,6 +47,7 @@ module LogStash cycle_number = 0 stalled_count = 0 Stud.interval(@cycle_period) do + break unless @pipeline.thread.alive? @reports << pipeline_report_snapshot @reports.delete_at(0) if @reports.size > @report_every # expire old report if cycle_number == (@report_every - 1) # it's report time! diff --git a/logstash-core/lib/logstash/special_agent.rb b/logstash-core/lib/logstash/special_agent.rb new file mode 100644 index 000000000..ada28849e --- /dev/null +++ b/logstash-core/lib/logstash/special_agent.rb @@ -0,0 +1,8 @@ +# encoding: utf-8 +require "logstash/agent" + +class LogStash::SpecialAgent < LogStash::Agent + def fetch_config(settings) + Net::HTTP.get(settings["remote.url"]) + end +end diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index d19b54216..ee238c14c 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -59,12 +59,14 @@ en: missing-configuration: >- No configuration file was specified. Perhaps you forgot to provide the '-f yourlogstash.conf' flag? + reload-without-config-path: >- + Configuration reloading also requires passing a configuration path with '-f yourlogstash.conf' error: >- Error: %{error} sigint: >- - SIGINT received. Shutting down the pipeline. + SIGINT received. Shutting down the agent. sigterm: >- - SIGTERM received. Shutting down the pipeline. + SIGTERM received. Shutting down the agent. slow_shutdown: |- Received shutdown signal, but pipeline is still waiting for in-flight events to be processed. Sending another ^C will force quit Logstash, but this may cause @@ -163,6 +165,10 @@ en: pipeline-batch-delay: |+ When creating pipeline batches, how long to wait while polling for the next event. + auto_reload: |+ + Monitor configuration changes and reload + whenever it is changed. + NOTE: use SIGHUP to manually reload the config log: |+ Write logstash internal logs to the given file. Without this flag, logstash will emit diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 54f994d88..17ecb6a1e 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -1,41 +1,169 @@ # encoding: utf-8 require 'spec_helper' +require 'stud/temporary' +require 'stud/task' describe LogStash::Agent do - subject { LogStash::Agent.new('') } - let(:dummy_config) { 'input {}' } - context "when loading the configuration" do - context "when local" do - before { expect(subject).to receive(:local_config).with(path) } + let(:logger) { double("logger") } + let(:agent_args) { [] } + subject { LogStash::Agent.new("", "") } - context "unix" do - let(:path) { './test.conf' } - it 'works with relative path' do - subject.load_config(path) - end + before :each do + [:log, :info, :warn, :error, :fatal, :debug].each do |level| + allow(logger).to receive(level) + end + [:info?, :warn?, :error?, :fatal?, :debug?].each do |level| + allow(logger).to receive(level) + end + allow(logger).to receive(:level=) + allow(logger).to receive(:subscribe) + subject.parse(agent_args) + subject.instance_variable_set("@reload_interval", 0.01) + subject.instance_variable_set("@logger", logger) + end + + describe "register_pipeline" do + let(:pipeline_id) { "main" } + let(:settings) { { + :config_string => "input { } filter { } output { }", + :pipeline_workers => 4 + } } + + it "should delegate settings to new pipeline" do + expect(LogStash::Pipeline).to receive(:new).with(settings[:config_string], hash_including(settings)) + subject.register_pipeline(pipeline_id, settings) + end + end + + describe "#execute" do + let(:sample_config) { "input { generator { count => 100000 } } output { }" } + let(:config_file) { Stud::Temporary.pathname } + + before :each do + File.open(config_file, "w") {|f| f.puts sample_config } + end + + after :each do + File.unlink(config_file) + end + + context "when auto_reload is false" do + let(:agent_args) { [ "--config", config_file] } #reload_interval => 0.01, :config_path => } } + + before :each do + allow(subject).to receive(:sleep) + allow(subject).to receive(:clean_state?).and_return(false) + allow(subject).to receive(:running_pipelines?).and_return(true) end - context "windows" do - let(:path) { '.\test.conf' } - it 'work with relative windows path' do - subject.load_config(path) + context "if state is clean" do + it "should not reload_state!" do + expect(subject).to_not receive(:reload_state!) + t = Thread.new { subject.execute } + sleep 0.1 + Stud.stop!(t) + t.join end end end - context "when remote" do - context 'supported scheme' do - let(:path) { "http://test.local/superconfig.conf" } - - before { expect(Net::HTTP).to receive(:get) { dummy_config } } - it 'works with http' do - expect(subject.load_config(path)).to eq("#{dummy_config}\n") + context "when auto_reload is true" do + let(:agent_args) { [ "--auto-reload", "--config", config_file] } #reload_interval => 0.01, :config_path => } } + #let(:agent_args) { { :logger => logger, :auto_reload => false, :reload_interval => 0.01, :config_path => config_file } } + context "if state is clean" do + it "should periodically reload_state" do + allow(subject).to receive(:clean_state?).and_return(false) + expect(subject).to receive(:reload_state!).at_least(3).times + t = Thread.new { subject.execute } + sleep 0.1 + Stud.stop!(t) + t.join end end end end + describe "#reload_state!" do + let(:pipeline_id) { "main" } + let(:first_pipeline_config) { "input { } filter { } output { }" } + let(:second_pipeline_config) { "input { generator {} } filter { } output { }" } + let(:pipeline_settings) { { + :config_string => first_pipeline_config, + :pipeline_workers => 4 + } } + + before(:each) do + subject.register_pipeline(pipeline_id, pipeline_settings) + end + + context "when fetching a new state" do + it "upgrades the state" do + expect(subject).to receive(:fetch_config).and_return(second_pipeline_config) + expect(subject).to receive(:upgrade_pipeline).with(pipeline_id, kind_of(LogStash::Pipeline)) + subject.send(:reload_state!) + end + end + context "when fetching the same state" do + it "doesn't upgrade the state" do + expect(subject).to receive(:fetch_config).and_return(first_pipeline_config) + expect(subject).to_not receive(:upgrade_pipeline) + subject.send(:reload_state!) + end + end + end + + describe "#upgrade_pipeline" do + let(:pipeline_id) { "main" } + let(:pipeline_config) { "input { } filter { } output { }" } + let(:pipeline_settings) { { + :config_string => pipeline_config, + :pipeline_workers => 4 + } } + let(:new_pipeline_config) { "input { generator {} } output { }" } + + before(:each) do + subject.register_pipeline(pipeline_id, pipeline_settings) + end + + context "when the upgrade fails" do + before :each do + allow(subject).to receive(:fetch_config).and_return(new_pipeline_config) + allow(subject).to receive(:create_pipeline).and_return(nil) + allow(subject).to receive(:stop_pipeline) + end + + it "leaves the state untouched" do + subject.send(:reload_state!) + expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config) + end + + context "and current state is empty" do + it "should not start a pipeline" do + expect(subject).to_not receive(:start_pipeline) + subject.send(:reload_state!) + end + end + end + + context "when the upgrade succeeds" do + let(:new_config) { "input { generator { count => 1 } } output { }" } + before :each do + allow(subject).to receive(:fetch_config).and_return(new_config) + allow(subject).to receive(:stop_pipeline) + end + it "updates the state" do + subject.send(:reload_state!) + expect(subject.pipelines[pipeline_id].config_str).to eq(new_config) + end + it "starts the pipeline" do + expect(subject).to receive(:stop_pipeline) + expect(subject).to receive(:start_pipeline) + subject.send(:reload_state!) + end + end + end + context "--pluginpath" do let(:single_path) { "/some/path" } let(:multiple_paths) { ["/some/path1", "/some/path2"] } @@ -58,5 +186,67 @@ describe LogStash::Agent do subject.configure_plugin_paths(multiple_paths) end end + + describe "#fetch_config" do + let(:file_config) { "input { generator { count => 100 } } output { }" } + let(:cli_config) { "filter { drop { } } " } + let(:tmp_config_path) { Stud::Temporary.pathname } + let(:agent_args) { [ "-e", "filter { drop { } } ", "-f", tmp_config_path ] } + + before :each do + IO.write(tmp_config_path, file_config) + end + + after :each do + File.unlink(tmp_config_path) + end + + it "should join the config string and config path content" do + settings = { :config_path => tmp_config_path, :config_string => cli_config } + fetched_config = subject.send(:fetch_config, settings) + expect(fetched_config.strip).to eq(cli_config + IO.read(tmp_config_path)) + end + end + + context "--pluginpath" do + let(:single_path) { "/some/path" } + let(:multiple_paths) { ["/some/path1", "/some/path2"] } + + it "should fail with single invalid dir path" do + expect(File).to receive(:directory?).and_return(false) + expect(LogStash::Environment).not_to receive(:add_plugin_path) + expect{subject.configure_plugin_paths(single_path)}.to raise_error(LogStash::ConfigurationError) + end + end + + describe "pipeline settings" do + let(:pipeline_string) { "input { stdin {} } output { stdout {} }" } + let(:main_pipeline_settings) { { :pipeline_id => "main" } } + let(:pipeline) { double("pipeline") } + + before(:each) do + task = Stud::Task.new { 1 } + allow(pipeline).to receive(:run).and_return(task) + allow(pipeline).to receive(:shutdown) + end + + context "when :pipeline_workers is not defined by the user" do + it "should not pass the value to the pipeline" do + expect(LogStash::Pipeline).to receive(:new).once.with(pipeline_string, hash_excluding(:pipeline_workers)).and_return(pipeline) + args = ["-e", pipeline_string] + subject.run(args) + end + end + + context "when :pipeline_workers is defined by the user" do + it "should pass the value to the pipeline" do + main_pipeline_settings[:pipeline_workers] = 2 + expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, hash_including(main_pipeline_settings)).and_return(pipeline) + args = ["-w", "2", "-e", pipeline_string] + subject.run(args) + end + end + end + end diff --git a/logstash-core/spec/logstash/config/loader_spec.rb b/logstash-core/spec/logstash/config/loader_spec.rb new file mode 100644 index 000000000..b51272ee1 --- /dev/null +++ b/logstash-core/spec/logstash/config/loader_spec.rb @@ -0,0 +1,36 @@ +# encoding: utf-8 +require "spec_helper" +require "logstash/config/loader" + +describe LogStash::Config::Loader do + subject { described_class.new(Cabin::Channel.get) } + context "when local" do + before { expect(subject).to receive(:local_config).with(path) } + + context "unix" do + let(:path) { './test.conf' } + it 'works with relative path' do + subject.load_config(path) + end + end + + context "windows" do + let(:path) { '.\test.conf' } + it 'work with relative windows path' do + subject.load_config(path) + end + end + end + + context "when remote" do + context 'supported scheme' do + let(:path) { "http://test.local/superconfig.conf" } + let(:dummy_config) { 'input {}' } + + before { expect(Net::HTTP).to receive(:get) { dummy_config } } + it 'works with http' do + expect(subject.load_config(path)).to eq("#{dummy_config}\n") + end + end + end +end diff --git a/logstash-core/spec/logstash/runner_spec.rb b/logstash-core/spec/logstash/runner_spec.rb index cc9415be0..a57556349 100644 --- a/logstash-core/spec/logstash/runner_spec.rb +++ b/logstash-core/spec/logstash/runner_spec.rb @@ -10,6 +10,13 @@ end describe LogStash::Runner do + let(:channel) { Cabin::Channel.new } + + before :each do + allow(Cabin::Channel).to receive(:get).with(LogStash).and_return(channel) + end + + context "argument parsing" do it "should run agent" do expect(Stud::Task).to receive(:new).once.and_return(nil) @@ -38,31 +45,15 @@ describe LogStash::Runner do end end - describe "pipeline settings" do - let(:pipeline_string) { "input { stdin {} } output { stdout {} }" } - let(:base_pipeline_settings) { { :pipeline_id => "base" } } - let(:pipeline) { double("pipeline") } + context "--auto-reload" do + context "when -f is not given" do - before(:each) do - task = Stud::Task.new { 1 } - allow(pipeline).to receive(:run).and_return(task) - end + let(:args) { ["agent", "-r", "-e", "input {} output {}"] } - context "when pipeline workers is not defined by the user" do - it "should not pass the value to the pipeline" do - expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline) - args = ["agent", "-e", pipeline_string] - subject.run(args).wait - end - end - - context "when pipeline workers is defined by the user" do - it "should pass the value to the pipeline" do - base_pipeline_settings[:pipeline_workers] = 2 - expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline) - args = ["agent", "-w", "2", "-e", pipeline_string] - subject.run(args).wait + it "should exit immediately" do + expect(subject.run(args).wait).to eq(1) end end end + end diff --git a/logstash-core/spec/logstash/shutdown_watcher_spec.rb b/logstash-core/spec/logstash/shutdown_watcher_spec.rb index 28dfa2f12..118e126ea 100644 --- a/logstash-core/spec/logstash/shutdown_watcher_spec.rb +++ b/logstash-core/spec/logstash/shutdown_watcher_spec.rb @@ -17,6 +17,7 @@ describe LogStash::ShutdownWatcher do LogStash::ShutdownWatcher.logger = channel allow(pipeline).to receive(:reporter).and_return(reporter) + allow(pipeline).to receive(:thread).and_return(Thread.current) allow(reporter).to receive(:snapshot).and_return(reporter_snapshot) allow(reporter_snapshot).to receive(:o_simple_hash).and_return({})