mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Merge pull request #657 from logstash/tcp-output-client-close-fix
Tcp output client remote-has-closed fix
This commit is contained in:
commit
3ac1244a18
1 changed files with 16 additions and 6 deletions
|
@ -2,7 +2,6 @@ require "logstash/outputs/base"
|
|||
require "logstash/namespace"
|
||||
require "thread"
|
||||
|
||||
|
||||
# Write events over a TCP socket.
|
||||
#
|
||||
# Each event json is separated by a newline.
|
||||
|
@ -68,6 +67,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def register
|
||||
require "stud/try"
|
||||
if server?
|
||||
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
|
||||
@server_socket = TCPServer.new(@host, @port)
|
||||
|
@ -91,16 +91,24 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
@client_threads.reject! {|t| !t.alive? }
|
||||
end
|
||||
else
|
||||
@client_socket = nil
|
||||
client_socket = nil
|
||||
@codec.on_event do |payload|
|
||||
begin
|
||||
connect unless @client_socket
|
||||
@client_socket.write(payload)
|
||||
client_socket = connect unless client_socket
|
||||
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
|
||||
# don't expect any reads, but a readable socket might
|
||||
# mean the remote end closed, so read it and throw it away.
|
||||
# we'll get an EOFError if it happens.
|
||||
client_socket.sysread(16384) if r.any?
|
||||
|
||||
# Now send the payload
|
||||
client_socket.syswrite(payload) if w.any?
|
||||
rescue => e
|
||||
@logger.warn("tcp output exception", :host => @host, :port => @port,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
client_socket.close rescue nil
|
||||
client_socket = nil
|
||||
sleep @reconnect_interval
|
||||
@client_socket = nil
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
@ -109,7 +117,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
|
||||
private
|
||||
def connect
|
||||
@client_socket = TCPSocket.new(@host, @port)
|
||||
Stud::try do
|
||||
return TCPSocket.new(@host, @port)
|
||||
end
|
||||
end # def connect
|
||||
|
||||
private
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue