diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 2342a07c8..da24f1ecd 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -2,7 +2,6 @@ require "logstash/outputs/base" require "logstash/namespace" require "thread" - # Write events over a TCP socket. # # Each event json is separated by a newline. @@ -68,6 +67,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base public def register + require "stud/try" if server? @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") @server_socket = TCPServer.new(@host, @port) @@ -91,16 +91,24 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base @client_threads.reject! {|t| !t.alive? } end else - @client_socket = nil + client_socket = nil @codec.on_event do |payload| begin - connect unless @client_socket - @client_socket.write(payload) + client_socket = connect unless client_socket + r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + client_socket.sysread(16384) if r.any? + + # Now send the payload + client_socket.syswrite(payload) if w.any? rescue => e @logger.warn("tcp output exception", :host => @host, :port => @port, :exception => e, :backtrace => e.backtrace) + client_socket.close rescue nil + client_socket = nil sleep @reconnect_interval - @client_socket = nil retry end end @@ -109,7 +117,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base private def connect - @client_socket = TCPSocket.new(@host, @port) + Stud::try do + return TCPSocket.new(@host, @port) + end end # def connect private