diff --git a/etc/agent.conf b/etc/agent.conf index b6971f18a..acd31207a 100644 --- a/etc/agent.conf +++ b/etc/agent.conf @@ -3,6 +3,11 @@ input { path => [ "/var/log/messages", "/var/log/*.log" ] type => "linux-syslog" } + + tcp { + port => 1234 + type => "linux-syslog" + } } filter { diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 7802055de..7530d82eb 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -1,60 +1,64 @@ -require "eventmachine-tail" require "logstash/inputs/base" require "logstash/namespace" -require "socket" # for Socket.gethostname +require "socket" +require "timeout" class LogStash::Inputs::Tcp < LogStash::Inputs::Base config_name "tcp" + config :host => :string + config :port => :number + config :data_timeout => :number + public def initialize(params) super - raise "issue/17: needs refactor to support configfile" - end # def initialize + + @host ||= "0.0.0.0" + @data_timeout ||= 5 + end public def register - if !@url.host or !@url.port - @logger.fatal("No host or port given in #{self.class}: #{@url}") - # TODO(sissel): Make this an actual exception class - raise "configuration error" - end - - @logger.info("Starting tcp listener for #{@url}") - EventMachine::start_server(@url.host, @url.port, TCPInput, @url, self, @logger) + @logger.info("Starting tcp listener on #{@host}:#{@port}") + @server = TCPServer.new(@host, @port) end # def register public - def receive(host, port, event) - url = @url.clone - url.host = host - url.port = port - @logger.debug(["original url", { :originalurl => @url, :newurl => url }]) - event = LogStash::Event.new({ - "@message" => event, - "@type" => @type, - "@tags" => @tags.clone, - }) - event.source = url - @logger.debug(["Got event", event]) - @callback.call(event) + def run(output_queue) + loop do + Thread.start(@server.accept) do |s| + peer = "#{s.peeraddr[3]}:#{s.peeraddr[1]}" + @logger.debug("Accepted connection from #{peer} on #{@host}:#{@port}") + begin + loop do + buf = nil + # NOTE(petef): the timeout only hits after the line is read + # or socket dies + Timeout::timeout(@data_timeout) do + buf = s.readline + end + e = LogStash::Event.new({ + "@message" => buf, + "@type" => @type, + "@tags" => [@type], + }) + e.source = "tcp://#{@host}:#{@port}/client/#{peer}" + output_queue << e + end # loop do + rescue + @logger.debug("Closing connection with #{peer}") + rescue Timeout::Error + @logger.debug("Closing connection with #{peer} after read timeout") + end # begin + + begin + s.close + rescue IOError + pass + end # begin + end # Thread.start + end # loop (outer) end # def receive - - private - class TCPInput < EventMachine::Connection - def initialize(url, receiver, logger) - @logger = logger - @receiver = receiver - @url = url; - @buffer = BufferedTokenizer.new # From eventmachine - end # def initialize - - def receive_data(data) - @buffer.extract(data).each do |line| - port, host = Socket.unpack_sockaddr_in(self.get_peername) - @receiver.receive(host, port, line) - end - end # def receive_data - end # class TCPInput end # class LogStash::Inputs::Tcp