diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 7743ff861..638a0062a 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -6,67 +6,104 @@ require "timeout" # Read events over a TCP socket. # # Like stdin and file inputs, each event is assumed to be one line of text. +# +# Can either accept connections from clients or connect to a server, +# depending on `mode`. class LogStash::Inputs::Tcp < LogStash::Inputs::Base config_name "tcp" - # The address to listen on + # When mode is `server`, the address to listen on. + # When mode is `client`, the address to connect to. config :host, :validate => :string, :default => "0.0.0.0" - - # the port to listen on + + # When mode is `server`, the port to listen on. + # When mode is `client`, the port to connect to. config :port, :validate => :number, :required => true # Read timeout in seconds. If a particular tcp connection is - # idle for more than this timeout period, we will assume + # idle for more than this timeout period, we will assume # it is dead and close it. # If you never want to timeout, use -1. config :data_timeout, :validate => :number, :default => 5 + # Mode to operate in. `server` listens for client connections, + # `client` connects to a server. + config :mode, :validate => ["server", "client"], :default => "server" + + module SocketPeer + public + def peer + "#{peeraddr[3]}:#{peeraddr[1]}" + end # def peer + end # module SocketPeer + public def register - @logger.info("Starting tcp listener on #{@host}:#{@port}") - @server = TCPServer.new(@host, @port) + if server? + @logger.info("Starting tcp input listener on #{@host}:#{@port}") + @server_socket = TCPServer.new(@host, @port) + end end # def register + private + def handle_socket(socket, output_queue, event_source) + begin + loop do + buf = nil + # NOTE(petef): the timeout only hits after the line is read + # or socket dies + # TODO(sissel): Why do we have a timeout here? What's the point? + if @data_timeout == -1 + buf = socket.readline + else + Timeout::timeout(@data_timeout) do + buf = socket.readline + end + end + e = self.to_event(buf, event_source) + if e + output_queue << e + end + end # loop do + rescue => e + @logger.debug(["Closing connection with #{socket.peer}", $!]) + @logger.debug(["Backtrace", e.backtrace]) + rescue Timeout::Error + @logger.debug("Closing connection with #{socket.peer} after read timeout") + end # begin + + begin + socket.close + rescue IOError + pass + end # begin + end + + private + def server? + @mode == "server" + end # def server? + public def run(output_queue) - loop do - # Start a new thread for each connection. - Thread.start(@server.accept) do |s| - # TODO(sissel): put this block in its own method. - peer = "#{s.peeraddr[3]}:#{s.peeraddr[1]}" - @logger.debug("Accepted connection from #{peer} on #{@host}:#{@port}") - begin - loop do - buf = nil - # NOTE(petef): the timeout only hits after the line is read - # or socket dies - # TODO(sissel): Why do we have a timeout here? What's the point? - if @data_timeout == -1 - buf = s.readline - else - Timeout::timeout(@data_timeout) do - buf = s.readline - end - end - e = self.to_event(buf, "tcp://#{@host}:#{@port}/client/#{peer}") - if e - output_queue << e - end - end # loop do - rescue => e - @logger.debug(["Closing connection with #{peer}", $!]) - @logger.debug(["Backtrace", e.backtrace]) - rescue Timeout::Error - @logger.debug("Closing connection with #{peer} after read timeout") - end # begin - - begin - s.close - rescue IOError - pass - end # begin - end # Thread.start - end # loop (outer) + if server? + loop do + # Start a new thread for each connection. + Thread.start(@server_socket.accept) do |s| + # TODO(sissel): put this block in its own method. + s.instance_eval { class << self; include SocketPeer end } + @logger.debug("Accepted connection from #{s.peer} on #{@host}:#{@port}") + handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}") + end # Thread.start + end # loop + else + loop do + client_socket = TCPSocket.new(@host, @port) + client_socket.instance_eval { class << self; include SocketPeer end } + @logger.debug("Opened connection to #{client_socket.peer}") + handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server") + end # loop + end end # def run end # class LogStash::Inputs::Tcp diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index fcbebb0e7..70d736455 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -1,45 +1,109 @@ require "logstash/outputs/base" require "logstash/namespace" +require "thread" -# This output writes each event in json format to -# the specified host:port over tcp. + +# Write events over a TCP socket. # # Each event json is separated by a newline. +# +# Can either accept connections from clients or connect to a server, +# depending on `mode`. class LogStash::Outputs::Tcp < LogStash::Outputs::Base config_name "tcp" - # The host to connect to + # When mode is `server`, the address to listen on. + # When mode is `client`, the address to connect to. config :host, :validate => :string, :required => true - # The port to connect to + # When mode is `server`, the port to listen on. + # When mode is `client`, the port to connect to. config :port, :validate => :number, :required => true - public - def initialize(params) - super - end # def initialize + # Mode to operate in. `server` listens for client connections, + # `client` connects to a server. + config :mode, :validate => ["server", "client"], :default => "client" + + class Client + public + def initialize(socket, logger) + @socket = socket + @logger = logger + @queue = Queue.new + end + + public + def run + loop do + begin + @socket.write(@queue.pop) + rescue => e + @logger.warn(["tcp output exception", @socket, $!]) + @logger.debug(["backtrace", e.backtrace]) + break + end + end + end # def run + + public + def write(msg) + @queue.push(msg) + end # def write + end # class Client public def register - @socket = nil + if server? + @logger.info("Starting tcp output listener on #{@host}:#{@port}") + @server_socket = TCPServer.new(@host, @port) + @client_threads = [] + + @accept_thread = Thread.new(@server_socket) do |server_socket| + loop do + client_thread = Thread.start(server_socket.accept) do |client_socket| + client = Client.new(client_socket, @logger) + Thread.current[:client] = client + client.run + end + @client_threads << client_thread + end + end + else + @client_socket = nil + end end # def register private def connect - @socket = TCPSocket.new(@host, @port) - end + @client_socket = TCPSocket.new(@host, @port) + end # def connect + + private + def server? + @mode == "server" + end # def server? public def receive(event) - begin - connect unless @socket - @socket.write(event.to_hash.to_json) - @socket.write("\n") - rescue => e - @logger.warn(["tcp output exception", @host, @port, $!]) - @logger.debug(["backtrace", e.backtrace]) - @socket = nil + wire_event = event.to_hash.to_json + "\n" + + if server? + @client_threads.each do |client_thread| + client_thread[:client].write(wire_event) + end + + @client_threads.reject! {|t| !t.alive? } + else + begin + connect unless @client_socket + @client_socket.write(event.to_hash.to_json) + @client_socket.write("\n") + rescue => e + @logger.warn(["tcp output exception", @host, @port, $!]) + @logger.debug(["backtrace", e.backtrace]) + @client_socket = nil + end end end # def receive end # class LogStash::Outputs::Tcp