mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- The ruby URI class sucks. URI#to_s doesn't yield a valid URL.
Should fix: https://logstash.jira.com/browse/LOGSTASH-115
This commit is contained in:
parent
358636d686
commit
fbc4074644
1 changed files with 42 additions and 22 deletions
|
@ -22,6 +22,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
public
|
||||
def initialize(params)
|
||||
super
|
||||
BasicSocket.do_not_reverse_lookup = true
|
||||
|
||||
# force "plain" format. others don't make sense here.
|
||||
@format = ["plain"]
|
||||
|
@ -33,8 +34,10 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
# Optional fields (priority, host) are because some syslog implementations
|
||||
# don't send these under some circumstances.
|
||||
@@syslog_re ||= \
|
||||
/(?:<([0-9]{1,3})>)?([A-z]{3} ?[0-9]{1,2} [0-9]{2}:[0-9]{2}:[0-9]{2}) (?:(\S+[^:]) )?(.*)/
|
||||
#<priority> timestamp Mmm dd hh:mm:ss host msg
|
||||
/(?:<([0-9]{1,3})>)?([A-z]{3} ?[0-9]{1,2} [0-9]{2}:[0-9]{2}:[0-9]{2}) (?:(\S*[^ :]) )?(.*)/
|
||||
# <priority> timestamp Mmm dd hh:mm:ss host msg
|
||||
|
||||
@tcp_clients = []
|
||||
end # def register
|
||||
|
||||
public
|
||||
|
@ -69,42 +72,54 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
private
|
||||
def udp_listener(output_queue)
|
||||
@logger.info("Starting syslog udp listener on #{@host}:#{@port}")
|
||||
server = UDPSocket.new(Socket::AF_INET)
|
||||
server.bind(@host, @port)
|
||||
|
||||
if @udp
|
||||
@udp.close_read
|
||||
@udp.close_write
|
||||
end
|
||||
|
||||
@udp = UDPSocket.new(Socket::AF_INET)
|
||||
@udp.bind(@host, @port)
|
||||
|
||||
loop do
|
||||
line, client = server.recvfrom(9000)
|
||||
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil,
|
||||
nil, nil, nil)
|
||||
e = to_event(line.chomp, source.to_s)
|
||||
line, client = @udp.recvfrom(9000)
|
||||
# Ruby uri sucks, so don't use it.
|
||||
source = "syslog://#{client[3]}/"
|
||||
e = to_event(line.chomp, source)
|
||||
if e
|
||||
syslog_relay(e, source)
|
||||
output_queue << e
|
||||
end
|
||||
end
|
||||
ensure
|
||||
if server
|
||||
server.close_read
|
||||
server.close_write
|
||||
if @udp
|
||||
@udp.close_read rescue nil
|
||||
@udp.close_write rescue nil
|
||||
end
|
||||
end # def udp_listener
|
||||
|
||||
private
|
||||
def tcp_listener(output_queue)
|
||||
@logger.info("Starting syslog tcp listener on #{@host}:#{@port}")
|
||||
server = TCPServer.new(@host, @port)
|
||||
@tcp = TCPServer.new(@host, @port)
|
||||
@tcp_clients = []
|
||||
|
||||
loop do
|
||||
Thread.new(server.accept) do |client|
|
||||
client = @tcp.accept
|
||||
@tcp_clients << client
|
||||
Thread.new(client) do |client|
|
||||
ip, port = client.peeraddr[3], client.peeraddr[1]
|
||||
@logger.warn("got connection from #{ip}:#{port}")
|
||||
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
|
||||
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil,
|
||||
nil, nil, nil)
|
||||
if ip.include?(":") # ipv6
|
||||
source = "syslog://[#{ip}]/"
|
||||
else
|
||||
source = "syslog://#{ip}/"
|
||||
end
|
||||
|
||||
client.each do |line|
|
||||
e = to_event(line.chomp, source_base.to_s)
|
||||
e = to_event(line.chomp, source)
|
||||
if e
|
||||
source = source_base.dup
|
||||
syslog_relay(e, source)
|
||||
output_queue << e
|
||||
end # e
|
||||
|
@ -112,7 +127,11 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
end # Thread.new
|
||||
end # loop do
|
||||
ensure
|
||||
server.close if server
|
||||
# If we somehow have this left open, close it.
|
||||
@tcp_clients.each do |client|
|
||||
client.close rescue nil
|
||||
end
|
||||
@tcp.close if @tcp rescue nil
|
||||
end # def tcp_listener
|
||||
|
||||
# Following RFC3164 where sane, we'll try to parse a received message
|
||||
|
@ -133,6 +152,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
event.fields["priority"] = priority
|
||||
event.fields["severity"] = severity
|
||||
event.fields["facility"] = facility
|
||||
host = match[3]
|
||||
|
||||
# TODO(sissel): Use the date filter, somehow.
|
||||
event.timestamp = LogStash::Time.to_iso8601(
|
||||
|
@ -140,14 +160,14 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
|
||||
# Hostname is optional, use if present in message, otherwise use source
|
||||
# address of message.
|
||||
url.host = match[3] if match[3]
|
||||
url.port = nil
|
||||
event.source = url
|
||||
if host
|
||||
event.source = "syslog://#{host}/"
|
||||
end
|
||||
|
||||
event.message = match[4]
|
||||
else
|
||||
@logger.info(["NOT SYSLOG", event.message])
|
||||
url.host = Socket.gethostname if url.host == "127.0.0.1"
|
||||
url = "syslog://#{Socket.gethostname}/" if url == "syslog://127.0.0.1/"
|
||||
|
||||
# RFC3164 says unknown messages get pri=13
|
||||
priority = 13
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue