mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Merge pull request #718 from Syntigo-nv/master
Fix gelf input for handling of chunked messages Conflicts: Makefile
This commit is contained in:
commit
6c20979271
3 changed files with 57 additions and 2 deletions
2
Makefile
2
Makefile
|
@ -40,7 +40,7 @@ default:
|
|||
@echo " jar -- builds the monolithic jar"
|
||||
@echo " jar-test -- runs the test suite against the monolithic jar"
|
||||
|
||||
TESTS=$(wildcard spec/support/*.rb spec/filters/*.rb spec/examples/*.rb spec/codecs/*.rb spec/conditionals/*.rb spec/event.rb spec/jar.rb)
|
||||
TESTS=$(wildcard spec/inputs/gelf.rb spec/support/*.rb spec/filters/*.rb spec/examples/*.rb spec/codecs/*.rb spec/conditionals/*.rb spec/event.rb spec/jar.rb)
|
||||
|
||||
# The 'version' is generated based on the logstash version, git revision, etc.
|
||||
.VERSION.mk:
|
||||
|
|
|
@ -80,7 +80,10 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
@logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
|
||||
next
|
||||
end
|
||||
|
||||
|
||||
# Gelfd parser outputs null if it received and cached a non-final chunk
|
||||
next if data.nil?
|
||||
|
||||
event = LogStash::Event.new(JSON.parse(data))
|
||||
event["host"] = client[3]
|
||||
if event["timestamp"].is_a?(Numeric)
|
||||
|
|
52
spec/inputs/gelf.rb
Normal file
52
spec/inputs/gelf.rb
Normal file
|
@ -0,0 +1,52 @@
|
|||
|
||||
require "test_utils"
|
||||
require "gelf"
|
||||
describe "inputs/gelf" do
|
||||
extend LogStash::RSpec
|
||||
|
||||
describe "reads chunked gelf messages " do
|
||||
port = 12209
|
||||
host = "127.0.0.9"
|
||||
chunksize = 1420
|
||||
gelfclient = GELF::Notifier.new(host,port,chunksize)
|
||||
|
||||
config <<-CONFIG
|
||||
input {
|
||||
gelf {
|
||||
port => "#{port}"
|
||||
host => "#{host}"
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
# generate random characters (message is zipped!) from printable ascii ( SPACE till ~ )
|
||||
# to trigger gelf chunking
|
||||
s = StringIO.new
|
||||
for i in 1..2000
|
||||
s << 32 + rand(126-32)
|
||||
end
|
||||
large_random = s.string
|
||||
|
||||
[ "hello",
|
||||
"world",
|
||||
large_random,
|
||||
"we survived gelf!"
|
||||
].each do |m|
|
||||
gelfclient.notify!( "short_message" => m )
|
||||
# poll at most 10 times
|
||||
waits = 0
|
||||
while waits < 10 and queue.size == 0
|
||||
sleep 0.1
|
||||
waits += 1
|
||||
end
|
||||
insist { queue.size } > 0
|
||||
insist { queue.pop["message"] } == m
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue