mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- facter out common program activities to LogStash::Program
This commit is contained in:
parent
8b32b04aea
commit
1ac4ed62a6
2 changed files with 158 additions and 92 deletions
133
bin/logstashd
133
bin/logstashd
|
@ -7,6 +7,7 @@ require 'lib/net/servers/indexer'
|
|||
require 'lib/net/servers/parser'
|
||||
require 'logger'
|
||||
require 'optparse'
|
||||
require 'lib/program'
|
||||
|
||||
$progname = $0.split(File::SEPARATOR).last
|
||||
$version = "0.3"
|
||||
|
@ -15,120 +16,68 @@ $logger.level = Logger::INFO
|
|||
$logger.progname = $progname
|
||||
$logger.datetime_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
def main(args)
|
||||
Thread::abort_on_exception = true
|
||||
|
||||
options = parse_options(args)
|
||||
children = {}
|
||||
|
||||
if options[:logfile]
|
||||
logfd = File.open(options[:logfile], "a")
|
||||
$stdout.reopen(logfd)
|
||||
$stderr.reopen(logfd)
|
||||
else
|
||||
# Require a logfile for daemonization
|
||||
if options[:daemonize]
|
||||
$stderr.puts "Daemonizing requires you specify a logfile (--logfile), " \
|
||||
"none was given"
|
||||
return 1
|
||||
end
|
||||
end
|
||||
|
||||
if options[:daemonize]
|
||||
fork and exit(0)
|
||||
|
||||
# Copied mostly from Daemons.daemonize, but since the ruby 1.8 'daemons'
|
||||
# and gem 'daemons' have api variances, let's do it ourselves since nobody
|
||||
# agrees.
|
||||
|
||||
trap("SIGHUP", "IGNORE")
|
||||
ObjectSpace.each_object(IO) do |io|
|
||||
# closing STDIN is ok, but keep STDOUT and STDERR
|
||||
# close everything else
|
||||
next if [STDOUT, STDERR].include?(io)
|
||||
begin
|
||||
unless io.closed?
|
||||
io.close
|
||||
end
|
||||
rescue ::Exception
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
pidfile = nil
|
||||
if options[:pidfile]
|
||||
pidfile = File.open(options[:pidfile], IO::RDWR | IO::CREAT)
|
||||
gotlock = pidfile.flock(File::LOCK_EX | File::LOCK_NB)
|
||||
if !gotlock
|
||||
owner = pidfile.read()
|
||||
if owner.length == 0
|
||||
owner = "unknown"
|
||||
end
|
||||
$stderr.puts "Failed to get lock on #{options[:pidfile]}; owned by #{owner}"
|
||||
exit(1)
|
||||
end
|
||||
pidfile.truncate(0)
|
||||
pidfile.puts $$
|
||||
pidfile.flush
|
||||
end
|
||||
|
||||
if options[:indexer]
|
||||
class LogStash::Daemon < LogStash::Program
|
||||
def start_indexer
|
||||
pid = fork do
|
||||
$0 = "logstashd (indexer)"
|
||||
indexer = LogStash::Net::Servers::Indexer.new(options[:config],
|
||||
$0 = "logstashd (indexer)"
|
||||
indexer = LogStash::Net::Servers::Indexer.new(@options[:config],
|
||||
$logger)
|
||||
indexer.run
|
||||
exit(0)
|
||||
end
|
||||
$logger.info "starting indexer (pid #{pid})"
|
||||
children[pid] = :indexer
|
||||
@children[pid] = :indexer
|
||||
end
|
||||
|
||||
if options[:parsers] > 0
|
||||
1.upto(options[:parsers]) do |i|
|
||||
def start_parsers
|
||||
1.upto(@options[:parsers]) do |i|
|
||||
pid = fork do
|
||||
$0 = "logstashd (parser)"
|
||||
parser = LogStash::Net::Servers::Parser.new(options[:config],
|
||||
parser = LogStash::Net::Servers::Parser.new(@options[:config],
|
||||
$logger)
|
||||
parser.run
|
||||
exit(0)
|
||||
end
|
||||
$logger.info "starting parser #{i}/#{options[:parsers]} (pid #{pid})"
|
||||
children[pid] = :parser
|
||||
$logger.info "starting parser #{i}/#{@options[:parsers]} (pid #{pid})"
|
||||
@children[pid] = :parser
|
||||
end
|
||||
end
|
||||
|
||||
$0 = "logstashd (supervisor)"
|
||||
dying = false
|
||||
Signal.trap("INT") do
|
||||
Process.kill("TERM", $$)
|
||||
end
|
||||
def run
|
||||
@children = {}
|
||||
super
|
||||
|
||||
Signal.trap("TERM") do
|
||||
dying = true
|
||||
$logger.warn "received SIGTERM, shutting down"
|
||||
children.keys.each { |pid| Process.kill("TERM", pid) rescue nil }
|
||||
Process.waitall
|
||||
if pidfile
|
||||
pidfile.close
|
||||
File.unlink(options[:pidfile])
|
||||
start_indexer if @options[:indexer]
|
||||
start_parsers if @options[:parsers] > 0
|
||||
|
||||
$0 = "logstashd (supervisor)"
|
||||
|
||||
termination_handler do
|
||||
@children.keys.each { |pid| Process.kill("TERM", pid) rescue nil }
|
||||
end
|
||||
exit(5)
|
||||
end
|
||||
|
||||
# We do this lame loop instead of "Process.waitall" because of a bug
|
||||
# in ruby 1.8.5 related to handling SIGTERM.
|
||||
while children.keys.length > 0
|
||||
pid = Process.waitpid(-1, Process::WNOHANG)
|
||||
if pid and !dying
|
||||
$logger.fatal "pid #{pid} died unexpectedly (#{children[pid]}), " \
|
||||
"initiating shutdown"
|
||||
Process.kill("TERM", $$)
|
||||
puts "Children: #{@children.inspect}"
|
||||
|
||||
# We do this lame loop instead of "Process.waitall" because of a bug
|
||||
# in ruby 1.8.5 related to handling SIGTERM.
|
||||
|
||||
while @children.keys.length > 0
|
||||
pid = Process.waitpid(-1, 0)
|
||||
if pid and !dying?
|
||||
$logger.fatal "pid #{pid} died unexpectedly (#{@children[pid]}), " \
|
||||
"initiating shutdown"
|
||||
Process.kill("TERM", $$)
|
||||
end
|
||||
end
|
||||
sleep(5)
|
||||
end
|
||||
|
||||
return 0
|
||||
return 0
|
||||
end # def run
|
||||
end # class LogStash::Daemon
|
||||
|
||||
def main(args)
|
||||
options = parse_options(args)
|
||||
logstashd = LogStash::Daemon.new(options)
|
||||
return logstashd.run
|
||||
end
|
||||
|
||||
def parse_options(args)
|
||||
|
|
117
lib/program.rb
Normal file
117
lib/program.rb
Normal file
|
@ -0,0 +1,117 @@
|
|||
require 'rubygems'
|
||||
#require 'lib/util'
|
||||
|
||||
module LogStash
|
||||
|
||||
class Program
|
||||
class PidFileLockFailed < StandardError
|
||||
end # class LogStash::Program::PidFileLockFailed
|
||||
|
||||
def initialize(options)
|
||||
@pidfile = options[:pidfile]
|
||||
@logfile = options[:logfile]
|
||||
@daemonize = options[:daemonize]
|
||||
@options = options
|
||||
@dying = false
|
||||
end
|
||||
|
||||
def run
|
||||
Thread::abort_on_exception = true
|
||||
redirect_io
|
||||
daemonize if @daemonize
|
||||
grab_pidfile if @pidfile
|
||||
termination_handler do
|
||||
puts "Default termination signal handler being invoked."
|
||||
end
|
||||
yield @options if block_given?
|
||||
end # def run
|
||||
|
||||
def termination_handler(&block)
|
||||
puts "Block: #{block.inspect}"
|
||||
puts "Block: #{block.inspect}"
|
||||
puts "Block: #{block.inspect}"
|
||||
puts "Block: #{block.inspect}"
|
||||
puts "Block: #{block.inspect}"
|
||||
|
||||
@termination_callback = block
|
||||
Signal.trap("INT") do
|
||||
Process.kill("TERM", $$)
|
||||
end
|
||||
|
||||
Signal.trap("TERM") do
|
||||
dying
|
||||
$logger.warn "received SIGTERM, shutting down"
|
||||
@termination_handler.call if @termination_handler
|
||||
Process.waitall
|
||||
if @pidfile_fd
|
||||
@pidfile_fd.close
|
||||
@pidfile_fd.delete
|
||||
end
|
||||
exit(5)
|
||||
end
|
||||
end # def register_signals
|
||||
|
||||
def redirect_io
|
||||
if @logfile != nil
|
||||
logfd = File.open(@logfile, "a")
|
||||
logfd.sync = true
|
||||
$stdout.reopen(logfd)
|
||||
$stderr.reopen(logfd)
|
||||
else
|
||||
# Require a logfile for daemonization
|
||||
if @daemonize
|
||||
$stderr.puts "Daemonizing requires you specify a logfile"
|
||||
return 1
|
||||
end
|
||||
end
|
||||
end # def redirect_io
|
||||
|
||||
def grab_pidfile
|
||||
if @pidfile
|
||||
pidfile = File.open(@pidfile, IO::RDWR | IO::CREAT)
|
||||
gotlock = pidfile.flock(File::LOCK_EX | File::LOCK_NB)
|
||||
if !gotlock
|
||||
owner = pidfile.read()
|
||||
if owner.length == 0
|
||||
owner = "unknown"
|
||||
end
|
||||
$stderr.puts "Failed to get lock on #{@pidfile}; owned by #{owner}"
|
||||
raise LogStash::Program::PidFileLockFailed(@pidfile)
|
||||
end
|
||||
pidfile.truncate(0)
|
||||
pidfile.puts $$
|
||||
pidfile.flush
|
||||
@pidfile_fd = pidfile
|
||||
end
|
||||
end # def grab_pidfile
|
||||
|
||||
def daemonize
|
||||
fork and exit(0)
|
||||
|
||||
# Copied mostly from Daemons.daemonize, but since the ruby 1.8 'daemons'
|
||||
# and gem 'daemons' have api variances, let's do it ourselves since nobody
|
||||
# agrees.
|
||||
trap("SIGHUP", "IGNORE")
|
||||
Process.setsid
|
||||
ObjectSpace.each_object(IO) do |io|
|
||||
# closing STDIN is ok, but keep STDOUT and STDERR
|
||||
# close everything else
|
||||
next if [$stdout, $stdout].include?(io)
|
||||
begin
|
||||
unless io.closed?
|
||||
io.close
|
||||
end
|
||||
rescue ::Exception
|
||||
end
|
||||
end
|
||||
end # def daemonize
|
||||
|
||||
def dying
|
||||
@dying = true
|
||||
end
|
||||
|
||||
def dying?
|
||||
return @dying
|
||||
end
|
||||
end # class LogStash::Program
|
||||
end # class LogStash
|
Loading…
Add table
Add a link
Reference in a new issue