mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- refactor agent to be a real daemon
- refactor configs (all inherit from BaseConfig) - move beef of agent code to lib/net/clients/ - use optionparser's ".order" instead of ".parse"
This commit is contained in:
parent
26082d9309
commit
24169ef165
6 changed files with 201 additions and 82 deletions
150
bin/agent.rb
150
bin/agent.rb
|
@ -1,65 +1,109 @@
|
|||
#!/usr/bin/env ruby
|
||||
|
||||
require 'rubygems'
|
||||
require 'lib/net/client'
|
||||
require 'lib/net/messages/indexevent'
|
||||
require 'lib/net/messages/quit'
|
||||
require 'lib/file/tail/since'
|
||||
require 'stomp'
|
||||
require 'socket'
|
||||
$: << File.join(File.dirname(__FILE__), "..")
|
||||
|
||||
class Agent < LogStash::Net::MessageClient
|
||||
def initialize(config)
|
||||
host, port = config["server"].split(":")
|
||||
host ||= "localhost"
|
||||
port ||= 61613
|
||||
super(username="", password="", host=host, port=port)
|
||||
@hostname = Socket.gethostname
|
||||
@config = config
|
||||
@msgs = []
|
||||
end # def initialize
|
||||
require 'lib/net/clients/agent'
|
||||
require 'logger'
|
||||
require 'optparse'
|
||||
|
||||
def start_log_watcher
|
||||
@config["sources"].each do |file, logtype|
|
||||
Thread.new do
|
||||
File::Tail::Since.new(file).tail do |line|
|
||||
index(logtype, line.chomp)
|
||||
$progname = $0.split(File::SEPARATOR).last
|
||||
$version = "0.3"
|
||||
$logger = Logger.new(STDOUT)
|
||||
$logger.level = Logger::INFO
|
||||
$logger.progname = $progname
|
||||
$logger.datetime_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
def main(args)
|
||||
Thread::abort_on_exception = true
|
||||
|
||||
options = parse_options(args)
|
||||
|
||||
if options[:logfile]
|
||||
logfd = File.open(options[:logfile], "a")
|
||||
$stdout.reopen(logfd)
|
||||
$stderr.reopen(logfd)
|
||||
else
|
||||
# Require a logfile for daemonization
|
||||
if options[:daemonize]
|
||||
$stderr.puts "Daemonizing requires you specify a logfile (--logfile), " \
|
||||
"none was given"
|
||||
return 1
|
||||
end
|
||||
end
|
||||
|
||||
if options[:daemonize]
|
||||
fork and exit(0)
|
||||
|
||||
# Copied mostly from Daemons.daemonize, but since the ruby 1.8 'daemons'
|
||||
# and gem 'daemons' have api variances, let's do it ourselves since nobody
|
||||
# agrees.
|
||||
|
||||
trap("SIGHUP", "IGNORE")
|
||||
ObjectSpace.each_object(IO) do |io|
|
||||
# closing STDIN is ok, but keep STDOUT and STDERR
|
||||
# close everything else
|
||||
next if [STDOUT, STDERR].include?(io)
|
||||
begin
|
||||
unless io.closed?
|
||||
io.close
|
||||
end
|
||||
rescue ::Exception
|
||||
end
|
||||
end
|
||||
end # def start_log_watcher
|
||||
|
||||
def index(type, string)
|
||||
ier = LogStash::Net::Messages::IndexEventRequest.new
|
||||
ier.log_type = type
|
||||
ier.log_data = string.strip_upper_ascii
|
||||
ier.metadata["source_host"] = @hostname
|
||||
|
||||
puts "Indexing: #{string}"
|
||||
|
||||
sendmsg("logstash", ier)
|
||||
end # def index
|
||||
|
||||
def IndexEventResponseHandler(msg)
|
||||
if msg.code != 0
|
||||
puts msg.inspect
|
||||
end
|
||||
end # def IndexEventResponseHandler
|
||||
|
||||
def run
|
||||
start_log_watcher
|
||||
super
|
||||
end
|
||||
end
|
||||
|
||||
if $0 == __FILE__
|
||||
if ARGV.length != 1
|
||||
$stderr.puts "Usage: #{$0} configfile"
|
||||
exit 1
|
||||
if options[:pidfile]
|
||||
File.open(options[:pidfile], "w+") { |f| f.puts $$ }
|
||||
end
|
||||
Thread::abort_on_exception = true
|
||||
configfile = ARGV[0]
|
||||
config = YAML.load(File.open(configfile).read())
|
||||
agent = Agent.new(config)
|
||||
|
||||
agent = LogStash::Net::Clients::Agent.new(options[:config], $logger)
|
||||
agent.run
|
||||
end
|
||||
|
||||
def parse_options(args)
|
||||
options = {:daemonize => true,
|
||||
:logfile => nil,
|
||||
:pidfile => nil,
|
||||
:config => nil,
|
||||
}
|
||||
|
||||
opts = OptionParser.new do |opts|
|
||||
opts.banner = "Usage: agent.rb [options] configfile"
|
||||
opts.version = $version
|
||||
|
||||
opts.on("-d", "--debug", "Enable debug output") do |x|
|
||||
$logger.level = Logger::DEBUG
|
||||
end
|
||||
|
||||
opts.on("--pidfile FILE", "Path to pidfile") do |x|
|
||||
options[:pidfile] = x
|
||||
end
|
||||
|
||||
opts.on("-f", "--foreground", "Do not daemonize") do |x|
|
||||
options[:daemonize] = false
|
||||
end
|
||||
|
||||
opts.on("-l FILE", "--logfile FILE", "File path to put logs") do |x|
|
||||
options[:logfile] = x
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
opts.order!(args)
|
||||
rescue
|
||||
$stderr.puts "#{$progname}: #{$!}"
|
||||
$stderr.puts opts
|
||||
exit(1)
|
||||
end
|
||||
|
||||
if args.length != 1
|
||||
$stderr.puts "#{$progname}: must specify exactly one config file"
|
||||
$stderr.puts opts
|
||||
exit(1)
|
||||
end
|
||||
options[:config] = args.shift
|
||||
|
||||
return options
|
||||
end
|
||||
|
||||
exit main(ARGV)
|
||||
|
|
|
@ -173,14 +173,14 @@ def parse_options(args)
|
|||
end
|
||||
|
||||
begin
|
||||
opts.parse!(args)
|
||||
opts.order!(args)
|
||||
rescue
|
||||
$stderr.puts "#{$progname}: #{$!}"
|
||||
$stderr.puts opts
|
||||
exit(1)
|
||||
end
|
||||
|
||||
if ARGV.length != 1
|
||||
if args.length != 1
|
||||
$stderr.puts "#{$progname}: must specify exactly one config file"
|
||||
$stderr.puts opts
|
||||
exit(1)
|
||||
|
|
15
lib/config/agent.rb
Normal file
15
lib/config/agent.rb
Normal file
|
@ -0,0 +1,15 @@
|
|||
require "lib/config/base"
|
||||
|
||||
module LogStash; module Config
|
||||
class AgentConfig < BaseConfig
|
||||
attr_reader :sources
|
||||
attr_reader :logstash_dir
|
||||
|
||||
def initialize(file)
|
||||
super(file)
|
||||
obj = YAML::load(File.open(file).read())
|
||||
|
||||
@sources = obj["sources"]
|
||||
end # def initialize
|
||||
end # class AgentConfig
|
||||
end; end # module LogStash::Config
|
22
lib/config/base.rb
Normal file
22
lib/config/base.rb
Normal file
|
@ -0,0 +1,22 @@
|
|||
require 'yaml'
|
||||
|
||||
module LogStash; module Config
|
||||
# Base config class. All configs need to know how to get to a broker.
|
||||
class BaseConfig
|
||||
attr_reader :mqhost
|
||||
attr_reader :mqport
|
||||
attr_reader :mquser
|
||||
attr_reader :mqpass
|
||||
attr_reader :mqvhost
|
||||
|
||||
def initialize(file)
|
||||
obj = YAML::load(File.open(file).read())
|
||||
|
||||
@mqhost = obj["mqhost"] || "localhost"
|
||||
@mqport = obj["mqport"] || 5672
|
||||
@mquser = obj["mquser"] || "guest"
|
||||
@mqpass = obj["mqpass"] || "guest"
|
||||
@mqvhost = obj["mqvhost"] || "/"
|
||||
end # def initialize
|
||||
end # class BaseConfig
|
||||
end; end # module LogStash::Config
|
|
@ -1,50 +1,39 @@
|
|||
require 'yaml'
|
||||
require 'lib/config/base'
|
||||
require 'lib/logs'
|
||||
require 'lib/log/json'
|
||||
require 'lib/log/text'
|
||||
|
||||
module LogStash::Config
|
||||
class IndexerConfig
|
||||
module LogStash; module Config
|
||||
class IndexerConfig < BaseConfig
|
||||
attr_reader :logs
|
||||
attr_reader :logstash_dir
|
||||
attr_reader :mqhost
|
||||
attr_reader :mqport
|
||||
attr_reader :mquser
|
||||
attr_reader :mqpass
|
||||
attr_reader :mqvhost
|
||||
|
||||
def initialize(file)
|
||||
super(file)
|
||||
obj = YAML::load(File.open(file).read())
|
||||
|
||||
@mqhost = obj["mqhost"] || "localhost"
|
||||
@mqport = obj["mqport"] || 5672
|
||||
@mquser = obj["mquser"] || "guest"
|
||||
@mqpass = obj["mqpass"] || "guest"
|
||||
@mqvhost = obj["mqvhost"] || "/"
|
||||
@logstash_dir = obj["logstash_dir"]
|
||||
@logs = LogStash::Logs.new
|
||||
|
||||
obj["log-types"].each do |log_type, data|
|
||||
log = nil
|
||||
#puts ":: #{log_type}"
|
||||
log_config = {:type => log_type,
|
||||
:date_key => data["date"]["key"],
|
||||
:date_format => data["date"]["format"],
|
||||
:logstash_dir => @logstash_dir,
|
||||
}
|
||||
|
||||
case data["type"]
|
||||
when "text"
|
||||
|
||||
log = LogStash::Log::TextLog.new(:type => log_type,
|
||||
:grok_patterns => data["patterns"],
|
||||
:date_key => data["date"]["key"],
|
||||
:date_format => data["date"]["format"],
|
||||
:logstash_dir => @logstash_dir)
|
||||
log_config[:grok_patterns] = data["patterns"]
|
||||
log = LogStash::Log::TextLog.new(log_config)
|
||||
when "json"
|
||||
log = LogStash::Log::JsonLog.new(:type => log_type,
|
||||
:line_format => data["display_format"],
|
||||
:date_key => data["date"]["key"],
|
||||
:date_format => data["date"]["format"],
|
||||
:logstash_dir => @logstash_dir)
|
||||
log_config[:line_format] = data["display_format"]
|
||||
log = LogStash::Log::JsonLog.new(log_config)
|
||||
end
|
||||
|
||||
@logs.register(log)
|
||||
end
|
||||
end
|
||||
end # def initialize
|
||||
end # class IndexerConfig
|
||||
end # module LogStash::Config
|
||||
end; end # module LogStash::Config
|
||||
|
|
49
lib/net/clients/agent.rb
Executable file
49
lib/net/clients/agent.rb
Executable file
|
@ -0,0 +1,49 @@
|
|||
require 'lib/config/agent'
|
||||
require 'lib/net/client'
|
||||
require 'lib/net/messages/indexevent'
|
||||
require 'lib/file/tail/since'
|
||||
require 'socket'
|
||||
|
||||
module LogStash; module Net; module Clients
|
||||
class Agent < LogStash::Net::MessageClient
|
||||
def initialize(configfile, logger)
|
||||
@config = LogStash::Config::AgentConfig.new(configfile)
|
||||
super(@config, nil)
|
||||
@hostname = Socket.gethostname
|
||||
@msgs = []
|
||||
@logger = logger
|
||||
end # def initialize
|
||||
|
||||
def start_log_watcher
|
||||
@config.sources.each do |file, logtype|
|
||||
Thread.new do
|
||||
@logger.info "Watching #{file} (type #{logtype})"
|
||||
File::Tail::Since.new(file).tail do |line|
|
||||
index(logtype, line.chomp)
|
||||
end
|
||||
end
|
||||
end
|
||||
end # def start_log_watcher
|
||||
|
||||
def index(type, string)
|
||||
ier = LogStash::Net::Messages::IndexEventRequest.new
|
||||
ier.log_type = type
|
||||
ier.log_data = string.strip_upper_ascii
|
||||
ier.metadata["source_host"] = @hostname
|
||||
|
||||
@logger.debug "Indexing #{type}: #{string}"
|
||||
sendmsg("logstash", ier)
|
||||
end # def index
|
||||
|
||||
def IndexEventResponseHandler(msg)
|
||||
if msg.code != 0
|
||||
@logger.warn "Error indexing line (code=#{msg.code}): #{msg.error}"
|
||||
end
|
||||
end # def IndexEventResponseHandler
|
||||
|
||||
def run
|
||||
start_log_watcher
|
||||
super
|
||||
end
|
||||
end
|
||||
end; end; end # LogStash::Net::Clients
|
Loading…
Add table
Add a link
Reference in a new issue