mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Fix gelf input to work again; also improve remapping
This commit is contained in:
parent
c8d834b306
commit
98e49b1bf7
1 changed files with 29 additions and 21 deletions
|
@ -30,10 +30,10 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
#
|
||||
# Remapping converts the following gelf fields to logstash equivalents:
|
||||
#
|
||||
# * event.message becomes full_message
|
||||
# if no full_message, use event.message becomes short_message
|
||||
# if no short_message, event.message is the raw json input
|
||||
# * host + file to event.source
|
||||
# * event["message"] becomes full_message
|
||||
# if no full_message, use event["message"] becomes short_message
|
||||
# if no short_message, event["message"] is the raw json input
|
||||
# * host + file to event["source"]
|
||||
config :remap, :validate => :boolean, :default => true
|
||||
|
||||
public
|
||||
|
@ -66,31 +66,33 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
@logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")
|
||||
|
||||
if @udp
|
||||
@udp.close_read
|
||||
@udp.close_write
|
||||
@udp.close_read rescue nil
|
||||
@udp.close_write rescue nil
|
||||
end
|
||||
|
||||
@udp = UDPSocket.new(Socket::AF_INET)
|
||||
@udp.bind(@host, @port)
|
||||
|
||||
loop do
|
||||
while true
|
||||
line, client = @udp.recvfrom(8192)
|
||||
# Ruby uri sucks, so don't use it.
|
||||
source = "gelf://#{client[3]}/"
|
||||
begin
|
||||
data = Gelfd::Parser.parse(line)
|
||||
rescue => ex
|
||||
@logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
|
||||
next
|
||||
end
|
||||
|
||||
# The nil guard is needed to deal with chunked messages.
|
||||
# Gelfd::Parser.parse will only return the message when all chunks are
|
||||
# completed
|
||||
event = LogStash::Event.new(data)
|
||||
event = LogStash::Event.new(JSON.parse(data))
|
||||
event["source"] = client[3]
|
||||
if event["timestamp"].is_a?(Numeric)
|
||||
event["@timestamp"] = Time.at(event["timestamp"]).gmtime
|
||||
event.remove("timestamp")
|
||||
end
|
||||
remap_gelf(event) if @remap
|
||||
output_queue << event
|
||||
end
|
||||
rescue LogStash::ShutdownSignal
|
||||
# Do nothing, shutdown.
|
||||
ensure
|
||||
if @udp
|
||||
@udp.close_read rescue nil
|
||||
|
@ -101,16 +103,22 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
private
|
||||
def remap_gelf(event)
|
||||
if event["full_message"]
|
||||
event.message = event["full_message"].dup
|
||||
event["message"] = event["full_message"].dup
|
||||
event.remove("full_message")
|
||||
if event["short_message"] == event["message"]
|
||||
event.remove("short_message")
|
||||
end
|
||||
elsif event["short_message"]
|
||||
event.message = event["short_message"].dup
|
||||
event["message"] = event["short_message"].dup
|
||||
event.remove("short_message")
|
||||
end
|
||||
if event["host"]
|
||||
event.source_host = event["host"]
|
||||
|
||||
|
||||
# Map all '_foo' fields to simply 'foo'
|
||||
event.to_hash.keys.each do |key|
|
||||
next unless key[0,1] == "_"
|
||||
event[key[1..-1]] = event[key]
|
||||
event.remove(key)
|
||||
end
|
||||
if event["file"]
|
||||
event.source_path = event["file"]
|
||||
end
|
||||
event.source = "gelf://#{event["host"]}/#{event["file"]}"
|
||||
end # def remap_gelf
|
||||
end # class LogStash::Inputs::Gelf
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue