mirror of https://github.com/elastic/logstash.git
fix connection threads tracking leak

better mutex name, loop with while true
event per connection & thread cleanups specs
remove leftover

closes #1509
parent 0d18814d02
commit 7b6ab95124
2 changed files with 138 additions and 40 deletions
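The heart of the change: every per-connection thread is recorded in a list guarded by a mutex, and each thread removes itself from that list when it finishes, so the list can no longer grow without bound. A minimal standalone sketch of that pattern, with generic names rather than the plugin's actual ones:

    require "thread"

    # Spawn a worker and track it; holding the lock across creation and
    # registration guarantees the worker cannot try to unregister itself
    # before it has been added to the list.
    def start_tracked_thread(threads, lock)
      lock.synchronize do
        threads << Thread.new do
          begin
            sleep(rand / 10.0)  # stand-in for handling one connection
          ensure
            lock.synchronize { threads.delete(Thread.current) }
          end
        end
      end
    end

    threads = []
    lock = Mutex.new
    10.times { start_tracked_thread(threads, lock) }
    sleep 0.05 until lock.synchronize { threads.empty? }

This mirrors the shape of the new client_thread/run_server code below: the accept loop appends under the lock, and each thread's ensure block deletes itself under the same lock.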
@@ -59,10 +59,16 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
   public
   def register
-    fix_streaming_codecs
     require "socket"
     require "timeout"
     require "openssl"
 
+    # monkey patch TCPSocket and SSLSocket to include socket peer
+    TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
+    OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}
+
+    fix_streaming_codecs
+
     if @ssl_enable
       @ssl_context = OpenSSL::SSL::SSLContext.new
       @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
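LogStash::Util::SocketPeer is mixed into TCPSocket and SSLSocket so every connection can report its remote endpoint through a single peer call. The real module lives in logstash's util code; a rough, hypothetical equivalent for plain TCP sockets looks like this:

    require "socket"

    # Hypothetical stand-in for LogStash::Util::SocketPeer (the actual
    # implementation may differ): report the remote "ip:port" of a socket.
    module SocketPeerSketch
      def peer
        "#{peeraddr[3]}:#{peeraddr[1]}"
      rescue StandardError
        nil
      end
    end

    # same module_eval/include trick used in register above
    TCPSocket.module_eval { include SocketPeerSketch }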
@@ -86,8 +92,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
     begin
       @server_socket = TCPServer.new(@host, @port)
     rescue Errno::EADDRINUSE
-      @logger.error("Could not start TCP server: Address in use",
-                    :host => @host, :port => @port)
+      @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
       raise
     end
     if @ssl_enable
@@ -100,8 +105,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
   def handle_socket(socket, client_address, output_queue, codec)
     while true
       buf = nil
-      # NOTE(petef): the timeout only hits after the line is read
-      # or socket dies
+      # NOTE(petef): the timeout only hits after the line is read or socket dies
       # TODO(sissel): Why do we have a timeout here? What's the point?
       if @data_timeout == -1
         buf = read(socket)
@@ -116,14 +120,16 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
         decorate(event)
         output_queue << event
       end
-    end # loop do
+    end # loop
   rescue EOFError
-    @logger.debug("Connection closed", :client => socket.peer)
+    @logger.debug? && @logger.debug("Connection closed", :client => socket.peer)
+  rescue Errno::ECONNRESET
+    @logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer)
   rescue => e
-    @logger.debug("An error occurred. Closing connection",
-                  :client => socket.peer, :exception => e, :backtrace => e.backtrace)
+    @logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace)
   ensure
-    socket.close rescue IOError nil
+    socket.close rescue nil
+
     codec.respond_to?(:flush) && codec.flush do |event|
       event["host"] ||= client_address
       event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
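The switch from plain @logger.debug(...) to @logger.debug? && @logger.debug(...) is a cheap guard: when debug logging is disabled, the right-hand side is never evaluated, so no argument hashes or peer strings are built on the hot path. The same idiom with Ruby's stdlib Logger (the plugin uses its own logger, so this is only an illustration):

    require "logger"

    logger = Logger.new($stdout)
    logger.level = Logger::INFO

    peer = "127.0.0.1:52341"

    # At INFO level the short-circuit skips both the string interpolation
    # and the debug call itself.
    logger.debug? && logger.debug("Connection closed, client=#{peer}")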
@@ -132,6 +138,20 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
     end
   end
 
+  private
+  def client_thread(output_queue, socket)
+    Thread.new(output_queue, socket) do |q, s|
+      begin
+        @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
+        handle_socket(s, s.peer, q, @codec.clone)
+      rescue Interrupted
+        s.close rescue nil
+      ensure
+        @client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
+      end
+    end
+  end
+
   private
   def server?
     @mode == "server"
@@ -154,36 +174,29 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
   def run_server(output_queue)
     @thread = Thread.current
     @client_threads = []
-    loop do
-      # Start a new thread for each connection.
+    @client_threads_lock = Mutex.new
+
+    while true
       begin
-        @client_threads << Thread.start(@server_socket.accept) do |s|
-          # TODO(sissel): put this block in its own method.
-
-          # monkeypatch a 'peer' method onto the socket.
-          s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
-          @logger.debug("Accepted connection", :client => s.peer,
-                        :server => "#{@host}:#{@port}")
-          begin
-            handle_socket(s, s.peer, output_queue, @codec.clone)
-          rescue Interrupted
-            s.close rescue nil
-          end
-        end # Thread.start
+        socket = @server_socket.accept
+        # start a new thread for each connection.
+        @client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)}
       rescue OpenSSL::SSL::SSLError => ssle
         # NOTE(mrichar1): This doesn't return a useful error message for some reason
-        @logger.error("SSL Error", :exception => ssle,
-                      :backtrace => ssle.backtrace)
+        @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
       rescue IOError, LogStash::ShutdownSignal
         if @interrupted
-          # Intended shutdown, get out of the loop
-          @server_socket.close
-          @client_threads.each do |thread|
-            thread.raise(LogStash::ShutdownSignal)
+          @server_socket.close rescue nil
+
+          threads = @client_threads_lock.synchronize{@client_threads.dup}
+          threads.each do |thread|
+            thread.raise(LogStash::ShutdownSignal) if thread.alive?
           end
+
+          # intended shutdown, get out of the loop
           break
         else
-          # Else it was a genuine IOError caused by something else, so propagate it up..
+          # it was a genuine IOError, propagate it up
           raise
         end
       end
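On shutdown, the new run_server first duplicates the tracked-thread list under the lock and only then signals each live thread, so iteration never races with threads that are concurrently removing themselves. A rough standalone sketch of that sequence, using a generic exception in place of LogStash::ShutdownSignal:

    class ShutdownSketch < StandardError; end

    threads = []
    lock = Mutex.new

    lock.synchronize do
      3.times do
        threads << Thread.new do
          begin
            sleep  # stand-in for a handler blocked on a client socket
          rescue ShutdownSketch
            # close the client connection here
          ensure
            lock.synchronize { threads.delete(Thread.current) }
          end
        end
      end
    end

    sleep 0.1  # demo only: let the handlers reach their blocking call

    # shutdown: snapshot under the lock, then signal each live thread
    snapshot = lock.synchronize { threads.dup }
    snapshot.each { |t| t.raise(ShutdownSketch) if t.alive? }
    snapshot.each(&:join)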
@@ -194,7 +207,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
     @server_socket.close rescue nil
   end # def run_server
 
-  def run_client(output_queue)
+  def run_client(output_queue)
     @thread = Thread.current
     while true
       client_socket = TCPSocket.new(@host, @port)
@@ -203,19 +216,17 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
         begin
           client_socket.connect
         rescue OpenSSL::SSL::SSLError => ssle
-          @logger.error("SSL Error", :exception => ssle,
-                        :backtrace => ssle.backtrace)
+          @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
           # NOTE(mrichar1): Hack to prevent hammering peer
           sleep(5)
           next
         end
       end
-      client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
       @logger.debug("Opened connection", :client => "#{client_socket.peer}")
       handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone)
     end # loop
   ensure
-    client_socket.close
+    client_socket.close rescue nil
   end # def run
 
   public
The remaining hunks are in the second changed file, the tcp input spec.

@@ -1,9 +1,10 @@
 # coding: utf-8
 require "test_utils"
 require "socket"
+require "timeout"
 require "logstash/json"
 
-describe "inputs/tcp", :socket => true do
+describe "inputs/tcp" do
   extend LogStash::RSpec
 
   describe "read plain with unicode" do
@@ -28,6 +29,9 @@ describe "inputs/tcp", :socket => true do
     end
     socket.close
 
+    # wait till all events have been processed
+    Timeout.timeout(1) {sleep 0.1 while queue.size < event_count}
+
     events = event_count.times.collect { queue.pop }
     event_count.times do |i|
       insist { events[i]["message"] } == "#{i} ☹"
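The waits added throughout the spec all follow the same poll-with-deadline idiom: keep sleeping in small increments while the condition is unmet, and let Timeout abort the example if it never becomes true. The same pattern in isolation, assuming a queue filled from another thread:

    require "timeout"
    require "thread"

    queue = Queue.new
    expected = 5
    producer = Thread.new { expected.times { |i| sleep 0.01; queue << i } }

    # poll every 100ms, give up with Timeout::Error after 1 second
    Timeout.timeout(1) { sleep 0.1 while queue.size < expected }
    producer.join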
@@ -57,6 +61,9 @@ describe "inputs/tcp", :socket => true do
     socket.puts(text)
     socket.close
 
+    # wait till all events have been processed
+    Timeout.timeout(1) {sleep 0.1 while queue.size < 1}
+
     event = queue.pop
     # Make sure the 0xA3 latin-1 code converts correctly to UTF-8.
     pending("charset conv broken") do
@@ -93,9 +100,12 @@ describe "inputs/tcp", :socket => true do
     socket.puts(LogStash::Json.dump(data))
     socket.close
 
+    # wait till all events have been processed
+    Timeout.timeout(1) {sleep 0.1 while queue.size < 1}
+
     event = queue.pop
     insist { event["hello"] } == data["hello"]
-    insist { event["foo"] } == data["foo"]
+    insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson
     insist { event["baz"] } == data["baz"]
 
     # Make sure the tcp input, w/ json codec, uses the event's 'host' value,
@@ -127,6 +137,9 @@ describe "inputs/tcp", :socket => true do
     socket.puts(LogStash::Json.dump(data))
     socket.close
 
+    # wait till all events have been processed
+    Timeout.timeout(1) {sleep 0.1 while queue.size < 1}
+
     event = queue.pop
     insist { event["hello"] } == data["hello"]
     insist { event }.include?("host")
@@ -165,12 +178,86 @@ describe "inputs/tcp", :socket => true do
       (1..5).each do |idx|
         event = queue.pop
         insist { event["hello"] } == data["hello"]
-        insist { event["foo"] } == data["foo"]
+        insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson
         insist { event["baz"] } == data["baz"]
         insist { event["idx"] } == idx
       end # do
     end # input
   end # describe
+
+  describe "one message per connection" do
+    event_count = 10
+    port = 5515
+    config <<-CONFIG
+      input {
+        tcp {
+          port => #{port}
+        }
+      }
+    CONFIG
+
+    input do |pipeline, queue|
+      Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      event_count.times do |i|
+        socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) }
+        socket.puts("#{i}")
+        socket.flush
+        socket.close
+      end
+
+      # wait till all events have been processed
+      Timeout.timeout(1) {sleep 0.1 while queue.size < event_count}
+
+      # since each message is sent on its own tcp connection & thread, exact receiving order cannot be guaranteed
+      events = event_count.times.collect{queue.pop}.sort_by{|event| event["message"]}
+
+      event_count.times do |i|
+        insist { events[i]["message"] } == "#{i}"
+      end
+    end # input
+  end
+
+  describe "connection threads are cleaned up when connection is closed" do
+    event_count = 10
+    port = 5515
+    config <<-CONFIG
+      input {
+        tcp {
+          port => #{port}
+        }
+      }
+    CONFIG
+
+    input do |pipeline, queue|
+      Thread.new { pipeline.run }
+      sleep 0.1 while !pipeline.ready?
+
+      inputs = pipeline.instance_variable_get("@inputs")
+      insist { inputs.size } == 1
+
+      sockets = event_count.times.map do |i|
+        socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) }
+        socket.puts("#{i}")
+        socket.flush
+        socket
+      end
+
+      # wait till all events have been processed
+      Timeout.timeout(1) {sleep 0.1 while queue.size < event_count}
+
+      # we should have "event_count" pending threads since the sockets were not closed yet
+      client_threads = inputs[0].instance_variable_get("@client_threads")
+      insist { client_threads.size } == event_count
+
+      # close all sockets and make sure there are no more pending threads
+      sockets.each{|socket| socket.close}
+      Timeout.timeout(1) {sleep 0.1 while client_threads.size > 0}
+      insist { client_threads.size } == 0 # this check is actually useless per previous line
+
+    end # input
+  end
 end