mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
allow teardown in tcp input
This commit is contained in:
parent
9aa7f1fe9b
commit
32d3860a8c
1 changed files with 29 additions and 10 deletions
|
@ -67,10 +67,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
end # loop do
|
||||
rescue => e
|
||||
@logger.debug("Closing connection", :client => socket.peer,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
rescue Timeout::Error
|
||||
@logger.debug("Closing connection after read timeout",
|
||||
:client => socket.peer)
|
||||
:client => socket.peer)
|
||||
end # begin
|
||||
|
||||
begin
|
||||
|
@ -95,15 +95,26 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
if server?
|
||||
loop do
|
||||
# Start a new thread for each connection.
|
||||
Thread.start(@server_socket.accept) do |s|
|
||||
# TODO(sissel): put this block in its own method.
|
||||
begin
|
||||
Thread.start(@server_socket.accept) do |s|
|
||||
# TODO(sissel): put this block in its own method.
|
||||
|
||||
# monkeypatch a 'peer' method onto the socket.
|
||||
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
|
||||
@logger.debug("Accepted connection", :client => s.peer,
|
||||
:server => "#{@host}:#{@port}")
|
||||
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
|
||||
end # Thread.start
|
||||
# monkeypatch a 'peer' method onto the socket.
|
||||
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
|
||||
@logger.debug("Accepted connection", :client => s.peer,
|
||||
:server => "#{@host}:#{@port}")
|
||||
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
|
||||
|
||||
end # Thread.start
|
||||
rescue IOError
|
||||
if @interrupted
|
||||
#Intended shutdown, get out of the loop
|
||||
break
|
||||
else
|
||||
# Else it was a genuine IOError caused by something else, so propagate it up..
|
||||
raise
|
||||
end
|
||||
end
|
||||
end # loop
|
||||
else
|
||||
loop do
|
||||
|
@ -114,4 +125,12 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
end # loop
|
||||
end
|
||||
end # def run
|
||||
|
||||
public
|
||||
def teardown
|
||||
if server?
|
||||
@interrupted = true
|
||||
@server_socket.close
|
||||
end
|
||||
end # def teardown
|
||||
end # class LogStash::Inputs::Tcp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue