update tcp input

This commit is contained in:
Pete Fritchman 2011-02-21 10:40:52 -08:00
parent 7706953586
commit 6d75615697
2 changed files with 51 additions and 42 deletions

View file

@ -3,6 +3,11 @@ input {
path => [ "/var/log/messages", "/var/log/*.log" ]
type => "linux-syslog"
}
tcp {
port => 1234
type => "linux-syslog"
}
}
filter {

View file

@ -1,60 +1,64 @@
require "eventmachine-tail"
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
require "socket"
require "timeout"
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
config_name "tcp"
config :host => :string
config :port => :number
config :data_timeout => :number
public
def initialize(params)
super
raise "issue/17: needs refactor to support configfile"
end # def initialize
@host ||= "0.0.0.0"
@data_timeout ||= 5
end
public
def register
if !@url.host or !@url.port
@logger.fatal("No host or port given in #{self.class}: #{@url}")
# TODO(sissel): Make this an actual exception class
raise "configuration error"
end
@logger.info("Starting tcp listener for #{@url}")
EventMachine::start_server(@url.host, @url.port, TCPInput, @url, self, @logger)
@logger.info("Starting tcp listener on #{@host}:#{@port}")
@server = TCPServer.new(@host, @port)
end # def register
public
def receive(host, port, event)
url = @url.clone
url.host = host
url.port = port
@logger.debug(["original url", { :originalurl => @url, :newurl => url }])
event = LogStash::Event.new({
"@message" => event,
"@type" => @type,
"@tags" => @tags.clone,
})
event.source = url
@logger.debug(["Got event", event])
@callback.call(event)
def run(output_queue)
loop do
Thread.start(@server.accept) do |s|
peer = "#{s.peeraddr[3]}:#{s.peeraddr[1]}"
@logger.debug("Accepted connection from #{peer} on #{@host}:#{@port}")
begin
loop do
buf = nil
# NOTE(petef): the timeout only hits after the line is read
# or socket dies
Timeout::timeout(@data_timeout) do
buf = s.readline
end
e = LogStash::Event.new({
"@message" => buf,
"@type" => @type,
"@tags" => [@type],
})
e.source = "tcp://#{@host}:#{@port}/client/#{peer}"
output_queue << e
end # loop do
rescue
@logger.debug("Closing connection with #{peer}")
rescue Timeout::Error
@logger.debug("Closing connection with #{peer} after read timeout")
end # begin
begin
s.close
rescue IOError
pass
end # begin
end # Thread.start
end # loop (outer)
end # def receive
private
class TCPInput < EventMachine::Connection
def initialize(url, receiver, logger)
@logger = logger
@receiver = receiver
@url = url;
@buffer = BufferedTokenizer.new # From eventmachine
end # def initialize
def receive_data(data)
@buffer.extract(data).each do |line|
port, host = Socket.unpack_sockaddr_in(self.get_peername)
@receiver.receive(host, port, line)
end
end # def receive_data
end # class TCPInput
end # class LogStash::Inputs::Tcp