From 24169ef1655426edead76b06ba887e5cce169454 Mon Sep 17 00:00:00 2001 From: Pete Fritchman Date: Tue, 20 Oct 2009 17:40:44 +0000 Subject: [PATCH] - refactor agent to be a real daemon - refactor configs (all inherit from BaseConfig) - move beef of agent code to lib/net/clients/ - use optionparser's ".order" instead of ".parse" --- bin/agent.rb | 150 +++++++++++++++++++++++++-------------- bin/logstashd | 4 +- lib/config/agent.rb | 15 ++++ lib/config/base.rb | 22 ++++++ lib/config/indexer.rb | 43 +++++------ lib/net/clients/agent.rb | 49 +++++++++++++ 6 files changed, 201 insertions(+), 82 deletions(-) create mode 100644 lib/config/agent.rb create mode 100644 lib/config/base.rb create mode 100755 lib/net/clients/agent.rb diff --git a/bin/agent.rb b/bin/agent.rb index b116c2f90..e5ea549d0 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -1,65 +1,109 @@ #!/usr/bin/env ruby -require 'rubygems' -require 'lib/net/client' -require 'lib/net/messages/indexevent' -require 'lib/net/messages/quit' -require 'lib/file/tail/since' -require 'stomp' -require 'socket' +$: << File.join(File.dirname(__FILE__), "..") -class Agent < LogStash::Net::MessageClient - def initialize(config) - host, port = config["server"].split(":") - host ||= "localhost" - port ||= 61613 - super(username="", password="", host=host, port=port) - @hostname = Socket.gethostname - @config = config - @msgs = [] - end # def initialize +require 'lib/net/clients/agent' +require 'logger' +require 'optparse' - def start_log_watcher - @config["sources"].each do |file, logtype| - Thread.new do - File::Tail::Since.new(file).tail do |line| - index(logtype, line.chomp) +$progname = $0.split(File::SEPARATOR).last +$version = "0.3" +$logger = Logger.new(STDOUT) +$logger.level = Logger::INFO +$logger.progname = $progname +$logger.datetime_format = "%Y-%m-%d %H:%M:%S" + +def main(args) + Thread::abort_on_exception = true + + options = parse_options(args) + + if options[:logfile] + logfd = File.open(options[:logfile], "a") + $stdout.reopen(logfd) + $stderr.reopen(logfd) + else + # Require a logfile for daemonization + if options[:daemonize] + $stderr.puts "Daemonizing requires you specify a logfile (--logfile), " \ + "none was given" + return 1 + end + end + + if options[:daemonize] + fork and exit(0) + + # Copied mostly from Daemons.daemonize, but since the ruby 1.8 'daemons' + # and gem 'daemons' have api variances, let's do it ourselves since nobody + # agrees. + + trap("SIGHUP", "IGNORE") + ObjectSpace.each_object(IO) do |io| + # closing STDIN is ok, but keep STDOUT and STDERR + # close everything else + next if [STDOUT, STDERR].include?(io) + begin + unless io.closed? + io.close end + rescue ::Exception end end - end # def start_log_watcher - - def index(type, string) - ier = LogStash::Net::Messages::IndexEventRequest.new - ier.log_type = type - ier.log_data = string.strip_upper_ascii - ier.metadata["source_host"] = @hostname - - puts "Indexing: #{string}" - - sendmsg("logstash", ier) - end # def index - - def IndexEventResponseHandler(msg) - if msg.code != 0 - puts msg.inspect - end - end # def IndexEventResponseHandler - - def run - start_log_watcher - super end -end -if $0 == __FILE__ - if ARGV.length != 1 - $stderr.puts "Usage: #{$0} configfile" - exit 1 + if options[:pidfile] + File.open(options[:pidfile], "w+") { |f| f.puts $$ } end - Thread::abort_on_exception = true - configfile = ARGV[0] - config = YAML.load(File.open(configfile).read()) - agent = Agent.new(config) + + agent = LogStash::Net::Clients::Agent.new(options[:config], $logger) agent.run end + +def parse_options(args) + options = {:daemonize => true, + :logfile => nil, + :pidfile => nil, + :config => nil, + } + + opts = OptionParser.new do |opts| + opts.banner = "Usage: agent.rb [options] configfile" + opts.version = $version + + opts.on("-d", "--debug", "Enable debug output") do |x| + $logger.level = Logger::DEBUG + end + + opts.on("--pidfile FILE", "Path to pidfile") do |x| + options[:pidfile] = x + end + + opts.on("-f", "--foreground", "Do not daemonize") do |x| + options[:daemonize] = false + end + + opts.on("-l FILE", "--logfile FILE", "File path to put logs") do |x| + options[:logfile] = x + end + end + + begin + opts.order!(args) + rescue + $stderr.puts "#{$progname}: #{$!}" + $stderr.puts opts + exit(1) + end + + if args.length != 1 + $stderr.puts "#{$progname}: must specify exactly one config file" + $stderr.puts opts + exit(1) + end + options[:config] = args.shift + + return options +end + +exit main(ARGV) diff --git a/bin/logstashd b/bin/logstashd index c02a49451..c0a598a20 100755 --- a/bin/logstashd +++ b/bin/logstashd @@ -173,14 +173,14 @@ def parse_options(args) end begin - opts.parse!(args) + opts.order!(args) rescue $stderr.puts "#{$progname}: #{$!}" $stderr.puts opts exit(1) end - if ARGV.length != 1 + if args.length != 1 $stderr.puts "#{$progname}: must specify exactly one config file" $stderr.puts opts exit(1) diff --git a/lib/config/agent.rb b/lib/config/agent.rb new file mode 100644 index 000000000..6f69214f3 --- /dev/null +++ b/lib/config/agent.rb @@ -0,0 +1,15 @@ +require "lib/config/base" + +module LogStash; module Config + class AgentConfig < BaseConfig + attr_reader :sources + attr_reader :logstash_dir + + def initialize(file) + super(file) + obj = YAML::load(File.open(file).read()) + + @sources = obj["sources"] + end # def initialize + end # class AgentConfig +end; end # module LogStash::Config diff --git a/lib/config/base.rb b/lib/config/base.rb new file mode 100644 index 000000000..d263ecb1d --- /dev/null +++ b/lib/config/base.rb @@ -0,0 +1,22 @@ +require 'yaml' + +module LogStash; module Config + # Base config class. All configs need to know how to get to a broker. + class BaseConfig + attr_reader :mqhost + attr_reader :mqport + attr_reader :mquser + attr_reader :mqpass + attr_reader :mqvhost + + def initialize(file) + obj = YAML::load(File.open(file).read()) + + @mqhost = obj["mqhost"] || "localhost" + @mqport = obj["mqport"] || 5672 + @mquser = obj["mquser"] || "guest" + @mqpass = obj["mqpass"] || "guest" + @mqvhost = obj["mqvhost"] || "/" + end # def initialize + end # class BaseConfig +end; end # module LogStash::Config diff --git a/lib/config/indexer.rb b/lib/config/indexer.rb index d5053ae94..cae0f86bd 100644 --- a/lib/config/indexer.rb +++ b/lib/config/indexer.rb @@ -1,50 +1,39 @@ -require 'yaml' +require 'lib/config/base' require 'lib/logs' require 'lib/log/json' require 'lib/log/text' -module LogStash::Config - class IndexerConfig +module LogStash; module Config + class IndexerConfig < BaseConfig attr_reader :logs attr_reader :logstash_dir - attr_reader :mqhost - attr_reader :mqport - attr_reader :mquser - attr_reader :mqpass - attr_reader :mqvhost def initialize(file) + super(file) obj = YAML::load(File.open(file).read()) - @mqhost = obj["mqhost"] || "localhost" - @mqport = obj["mqport"] || 5672 - @mquser = obj["mquser"] || "guest" - @mqpass = obj["mqpass"] || "guest" - @mqvhost = obj["mqvhost"] || "/" @logstash_dir = obj["logstash_dir"] @logs = LogStash::Logs.new obj["log-types"].each do |log_type, data| log = nil - #puts ":: #{log_type}" + log_config = {:type => log_type, + :date_key => data["date"]["key"], + :date_format => data["date"]["format"], + :logstash_dir => @logstash_dir, + } + case data["type"] when "text" - - log = LogStash::Log::TextLog.new(:type => log_type, - :grok_patterns => data["patterns"], - :date_key => data["date"]["key"], - :date_format => data["date"]["format"], - :logstash_dir => @logstash_dir) + log_config[:grok_patterns] = data["patterns"] + log = LogStash::Log::TextLog.new(log_config) when "json" - log = LogStash::Log::JsonLog.new(:type => log_type, - :line_format => data["display_format"], - :date_key => data["date"]["key"], - :date_format => data["date"]["format"], - :logstash_dir => @logstash_dir) + log_config[:line_format] = data["display_format"] + log = LogStash::Log::JsonLog.new(log_config) end @logs.register(log) end - end + end # def initialize end # class IndexerConfig -end # module LogStash::Config +end; end # module LogStash::Config diff --git a/lib/net/clients/agent.rb b/lib/net/clients/agent.rb new file mode 100755 index 000000000..3df6a8ec5 --- /dev/null +++ b/lib/net/clients/agent.rb @@ -0,0 +1,49 @@ +require 'lib/config/agent' +require 'lib/net/client' +require 'lib/net/messages/indexevent' +require 'lib/file/tail/since' +require 'socket' + +module LogStash; module Net; module Clients + class Agent < LogStash::Net::MessageClient + def initialize(configfile, logger) + @config = LogStash::Config::AgentConfig.new(configfile) + super(@config, nil) + @hostname = Socket.gethostname + @msgs = [] + @logger = logger + end # def initialize + + def start_log_watcher + @config.sources.each do |file, logtype| + Thread.new do + @logger.info "Watching #{file} (type #{logtype})" + File::Tail::Since.new(file).tail do |line| + index(logtype, line.chomp) + end + end + end + end # def start_log_watcher + + def index(type, string) + ier = LogStash::Net::Messages::IndexEventRequest.new + ier.log_type = type + ier.log_data = string.strip_upper_ascii + ier.metadata["source_host"] = @hostname + + @logger.debug "Indexing #{type}: #{string}" + sendmsg("logstash", ier) + end # def index + + def IndexEventResponseHandler(msg) + if msg.code != 0 + @logger.warn "Error indexing line (code=#{msg.code}): #{msg.error}" + end + end # def IndexEventResponseHandler + + def run + start_log_watcher + super + end + end +end; end; end # LogStash::Net::Clients