separate server/client modes to separate methods; use ShutdownSignal to identify termination

This commit is contained in:
Jordan Sissel 2013-06-26 18:13:22 -07:00
parent 50079f1bba
commit 4dc1207483

View file

@ -59,8 +59,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
def register
require "socket"
require "timeout"
require "openssl"
if @ssl_enable
require "openssl"
@ssl_context = OpenSSL::SSL::SSLContext.new
@ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
@ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase)
@ -96,16 +96,16 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
private
def handle_socket(socket, event_source, output_queue)
begin
loop do
while true
buf = nil
# NOTE(petef): the timeout only hits after the line is read
# or socket dies
# TODO(sissel): Why do we have a timeout here? What's the point?
if @data_timeout == -1
buf = readline(socket)
buf = readline(socket).chomp
else
Timeout::timeout(@data_timeout) do
buf = readline(socket)
buf = readline(socket).chomp
end
end
@codec.decode(buf) do |event|
@ -143,69 +143,84 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
public
def run(output_queue)
if server?
@thread = Thread.current
@client_threads = []
loop do
# Start a new thread for each connection.
begin
@client_threads << Thread.start(@server_socket.accept) do |s|
# TODO(sissel): put this block in its own method.
run_server(output_queue)
else
run_client(output_queue)
end
end # def run
# monkeypatch a 'peer' method onto the socket.
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
begin
handle_socket(s, "tcp://#{s.peer}/", output_queue)
rescue Interrupted
s.close rescue nil
end
end # Thread.start
def run_server(output_queue)
@thread = Thread.current
@client_threads = []
loop do
# Start a new thread for each connection.
begin
@client_threads << Thread.start(@server_socket.accept) do |s|
# TODO(sissel): put this block in its own method.
# monkeypatch a 'peer' method onto the socket.
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
begin
handle_socket(s, "tcp://#{s.peer}/", output_queue)
rescue Interrupted
s.close rescue nil
end
end # Thread.start
rescue OpenSSL::SSL::SSLError => ssle
# NOTE(mrichar1): This doesn't return a useful error message for some reason
@logger.error("SSL Error", :exception => ssle,
:backtrace => ssle.backtrace)
rescue IOError, LogStash::ShutdownSignal
if @interrupted
# Intended shutdown, get out of the loop
@server_socket.close
@client_threads.each do |thread|
thread.raise(LogStash::ShutdownSignal)
end
break
else
# Else it was a genuine IOError caused by something else, so propagate it up..
raise
end
end
end # loop
rescue LogStash::ShutdownSignal
# nothing to do
ensure
@server_socket.close
end # def run_server
def run_client(output_queue)
@thread = Thread.current
while true
client_socket = TCPSocket.new(@host, @port)
if @ssl_enable
client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
# NOTE(mrichar1): This doesn't return a useful error message for some reason
@logger.error("SSL Error", :exception => ssle,
:backtrace => ssle.backtrace)
rescue IOError, Interrupted
if @interrupted
# Intended shutdown, get out of the loop
@server_socket.close
@client_threads.each do |thread|
thread.raise(IOError.new)
end
break
else
# Else it was a genuine IOError caused by something else, so propagate it up..
raise
end
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
next
end
end # loop
else
loop do
client_socket = TCPSocket.new(@host, @port)
if @ssl_enable
client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle,
:backtrace => ssle.backtrace)
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
next
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
handle_socket(client_socket, "tcp://#{client_socket.peer}/server", output_queue)
end # loop
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
handle_socket(client_socket, "tcp://#{client_socket.peer}/server", output_queue)
end # loop
ensure
client_socket.close
end # def run
public
def teardown
if server?
@interrupted = true
@thread.raise(Interrupted.new)
@thread.raise(LogStash::ShutdownSignal)
end
end # def teardown
end # class LogStash::Inputs::Tcp