mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
update syslog input
This commit is contained in:
parent
5f4f5bfef3
commit
3cc1c1c815
1 changed files with 71 additions and 64 deletions
|
@ -1,9 +1,8 @@
|
|||
require "date"
|
||||
require "eventmachine-tail"
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/time" # should really use the filters/date.rb bits
|
||||
require "socket" # for Socket.gethostname
|
||||
require "socket"
|
||||
|
||||
class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
||||
|
||||
|
@ -18,23 +17,11 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
super
|
||||
|
||||
@host ||= "0.0.0.0"
|
||||
@port ||= 514
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
if !@url.host or !@url.port
|
||||
@logger.fatal("No host or port given in #{self.class}: #{@url}")
|
||||
# TODO(sissel): Make this an actual exception class
|
||||
raise "configuration error"
|
||||
end
|
||||
|
||||
@logger.info("Starting tcp listener for #{@url}")
|
||||
EventMachine::start_server(@url.host, @url.port, TCPInput, self, @logger)
|
||||
|
||||
@logger.info("Starting udp listener for #{@url}")
|
||||
EventMachine::open_datagram_socket(@url.host, @url.port, UDPInput, self,
|
||||
@logger)
|
||||
|
||||
# This comes from RFC3164, mostly.
|
||||
@@syslog_re ||= \
|
||||
/<([0-9]{1,3})>([A-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) (\S+) (.*)/
|
||||
|
@ -42,22 +29,75 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
end # def register
|
||||
|
||||
public
|
||||
def receive(host, port, message)
|
||||
url = @url.clone
|
||||
url.host = host
|
||||
url.port = port
|
||||
def run(output_queue)
|
||||
# udp server
|
||||
Thread.new do
|
||||
LogStash::Util::set_thread_name("input|syslog|udp")
|
||||
begin
|
||||
udp_listener(output_queue)
|
||||
rescue
|
||||
@logger.warn("syslog udp listener died: #{$!}")
|
||||
sleep(5)
|
||||
retry
|
||||
end # begin
|
||||
end # Thread.new
|
||||
|
||||
# Do some syslog relay-like behavior.
|
||||
# * Add syslog headers if there are none
|
||||
event = LogStash::Event.new({
|
||||
"@message" => message,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
syslog_relay(event, url)
|
||||
@logger.debug(["Got event", event.class, event.to_hash])
|
||||
@callback.call(event)
|
||||
end # def receive
|
||||
# tcp server
|
||||
Thread.new do
|
||||
LogStash::Util::set_thread_name("input|syslog|tcp")
|
||||
begin
|
||||
tcp_listener(output_queue)
|
||||
rescue
|
||||
@logger.warn("syslog tcp listener died: #{$!}")
|
||||
sleep(5)
|
||||
retry
|
||||
end # begin
|
||||
end # Thread.new
|
||||
end # def run
|
||||
|
||||
private
|
||||
def udp_listener(output_queue)
|
||||
@logger.info("Starting syslog udp listener on #{@host}:#{@port}")
|
||||
s = UDPSocket.new
|
||||
s.bind(@host, @port)
|
||||
|
||||
loop do
|
||||
line, client = s.recvfrom(1024)
|
||||
event = LogStash::Event.new({
|
||||
"@message" => line.chomp,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
source = URI.parse("syslog://#{client[3]}")
|
||||
syslog_relay(event, source)
|
||||
output_queue << event
|
||||
end
|
||||
end # def udp_listener
|
||||
|
||||
private
|
||||
def tcp_listener(output_queue)
|
||||
@logger.info("Starting syslog tcp listener on #{@host}:#{@port}")
|
||||
s = TCPServer.new(@host, @port)
|
||||
|
||||
loop do
|
||||
Thread.new(s.accept) do |s|
|
||||
ip, port = s.peeraddr[3], s.peeraddr[1]
|
||||
@logger.warn("got connection from #{ip}:#{port}")
|
||||
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
|
||||
source_base = URI.parse("syslog://#{ip}")
|
||||
s.each do |line|
|
||||
event = LogStash::Event.new({
|
||||
"@message" => line.chomp,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
source = source_base.dup
|
||||
syslog_relay(event, source)
|
||||
output_queue << event
|
||||
end
|
||||
end
|
||||
end
|
||||
end # def tcp_listener
|
||||
|
||||
# Following RFC3164 where sane, we'll try to parse a received message
|
||||
# as if you were relaying a syslog message to it.
|
||||
|
@ -77,7 +117,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
event.fields["priority"] = priority
|
||||
event.fields["severity"] = severity
|
||||
event.fields["facility"] = facility
|
||||
|
||||
|
||||
# TODO(sissel): Use the date filter, somehow.
|
||||
event.timestamp = LogStash::Time.to_iso8601(
|
||||
DateTime.strptime(match[2], "%b %d %H:%M:%S"))
|
||||
|
@ -106,37 +146,4 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
event.source = url
|
||||
end
|
||||
end # def syslog_relay
|
||||
|
||||
private
|
||||
class TCPInput < EventMachine::Connection
|
||||
def initialize(receiver, logger)
|
||||
@logger = logger
|
||||
@receiver = receiver
|
||||
@buffer = BufferedTokenizer.new # From eventmachine
|
||||
end # def initialize
|
||||
|
||||
# Messages over TCP may not be received all at once, chunk by newline.
|
||||
def receive_data(data)
|
||||
@buffer.extract(data).each do |line|
|
||||
port, host = Socket.unpack_sockaddr_in(self.get_peername)
|
||||
# Trim trailing newlines
|
||||
@receiver.receive(host, port, line.chomp)
|
||||
end
|
||||
end # def receive_data
|
||||
end # class TCPInput
|
||||
|
||||
private
|
||||
class UDPInput < EventMachine::Connection
|
||||
def initialize(receiver, logger)
|
||||
@logger = logger
|
||||
@receiver = receiver
|
||||
end # def initialize
|
||||
|
||||
# Every udp packet is a unique message.
|
||||
def receive_data(data)
|
||||
port, host = Socket.unpack_sockaddr_in(self.get_peername)
|
||||
# Trim trailing newlines
|
||||
@receiver.receive(host, port, data.chomp)
|
||||
end # def receive_data
|
||||
end # class UDPInput
|
||||
end # class LogStash::Inputs::Syslog
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue