mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
- Add tcp input; tcp://listenip:port/
Specify 0.0.0.0 for listenip if you want to listen on all interfaces.
This commit is contained in:
parent
ddb94e5012
commit
42b7333b04
1 changed files with 51 additions and 0 deletions
51
lib/logstash/inputs/tcp.rb
Normal file
51
lib/logstash/inputs/tcp.rb
Normal file
|
@ -0,0 +1,51 @@
|
|||
require "logstash/inputs/base"
|
||||
require "eventmachine-tail"
|
||||
require "socket" # for Socket.gethostname
|
||||
|
||||
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
||||
def initialize(url, type, config={}, &block)
|
||||
super
|
||||
end
|
||||
|
||||
def register
|
||||
if !@url.host or !@url.port
|
||||
@logger.fatal("No host or port given in #{self.class}: #{@url}")
|
||||
# TODO(sissel): Make this an actual exception class
|
||||
raise "configuration error"
|
||||
end
|
||||
|
||||
@logger.info("Starting tcp listener for #{@url}")
|
||||
EventMachine::start_server(@url.host, @url.port, TCPInput, @url, self, @logger)
|
||||
end
|
||||
|
||||
def receive(host, port, event)
|
||||
url = @url.clone
|
||||
url.host = host
|
||||
url.port = port
|
||||
@logger.debug(["original url", { :originalurl => @url, :newurl => url }])
|
||||
event = LogStash::Event.new({
|
||||
"@source" => url.to_s,
|
||||
"@message" => event,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
@logger.debug(["Got event", event])
|
||||
@callback.call(event)
|
||||
end # def receive
|
||||
|
||||
class TCPInput < EventMachine::Connection
|
||||
def initialize(url, receiver, logger)
|
||||
@logger = logger
|
||||
@receiver = receiver
|
||||
@url = url;
|
||||
@buffer = BufferedTokenizer.new # From eventmachine
|
||||
end
|
||||
|
||||
def receive_data(data)
|
||||
@buffer.extract(data).each do |line|
|
||||
port, host = Socket.unpack_sockaddr_in(self.get_peername)
|
||||
@receiver.receive(host, port, line)
|
||||
end
|
||||
end # def receive_data
|
||||
end # class TCPInput
|
||||
end # class LogStash::Inputs::Tcp
|
Loading…
Add table
Add a link
Reference in a new issue