mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Add NaN
support back in to collectd codec
* Add spec tests for NaN support * Catch warning on NaN with nan_handling set to warn * Make lambdas ivars and initialize so they can reference other ivars * Fixed reference to point to ivar * Streamline ivar declarations in one method * Improve tests to cover more cases * Make lambdas not be ivars * Change idiomatic calling of init_lambdas to reflect lack of return value closes 1363
This commit is contained in:
parent
81936e9f0d
commit
e049c58d0a
2 changed files with 227 additions and 93 deletions
|
@ -40,6 +40,7 @@ require "time"
|
|||
class ProtocolError < LogStash::Error; end
|
||||
class HeaderError < LogStash::Error; end
|
||||
class EncryptionError < LogStash::Error; end
|
||||
class NaNError < LogStash::Error; end
|
||||
|
||||
class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
||||
config_name "collectd"
|
||||
|
@ -79,6 +80,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
'@timestamp' => true,
|
||||
'plugin' => true,
|
||||
'plugin_instance' => true,
|
||||
'type_instance' => true,
|
||||
}
|
||||
|
||||
INTERVAL_VALUES_FIELDS = {
|
||||
|
@ -116,6 +118,20 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
# collectd [Network plugin](https://collectd.org/wiki/index.php/Plugin:Network)
|
||||
config :security_level, :validate => [SECURITY_NONE, SECURITY_SIGN, SECURITY_ENCR],
|
||||
:default => "None"
|
||||
|
||||
# What to do when a value in the event is NaN (Not a Number)
|
||||
# - change_value (default): Change the NaN to the value of the nan_value option and add nan_tag as a tag
|
||||
# - warn: Change the NaN to the value of the nan_value option, print a warning to the log and add nan_tag as a tag
|
||||
# - drop: Drop the event containing the NaN (this only drops the single event, not the whole packet)
|
||||
config :nan_handling, :validate => ['change_value','warn','drop'], :default => 'change_value'
|
||||
|
||||
# Only relevant when nan_handeling is set to 'change_value'
|
||||
# Change NaN to this configured value
|
||||
config :nan_value, :validate => :number, :default => 0
|
||||
|
||||
# The tag to add to the event if a NaN value was found
|
||||
# Set this to an empty string ('') if you don't want to tag
|
||||
config :nan_tag, :validate => :string, :default => '_collectdNaN'
|
||||
|
||||
# Path to the authentication file. This file should have the same format as
|
||||
# the [AuthFile](http://collectd.org/documentation/manpages/collectd.conf.5.shtml#authfile_filename)
|
||||
|
@ -126,6 +142,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
public
|
||||
def register
|
||||
@logger.info("Starting Collectd codec...")
|
||||
init_lambdas!
|
||||
if @typesdb.nil?
|
||||
@typesdb = LogStash::Environment.vendor_path("collectd/types.db")
|
||||
if !File.exists?(@typesdb)
|
||||
|
@ -168,103 +185,121 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
return types
|
||||
end # def get_types
|
||||
|
||||
# Lambdas for hash + closure methodology
|
||||
# This replaces when statements for fixed values and is much faster
|
||||
string_decoder = lambda { |body| body.pack("C*")[0..-2] }
|
||||
numeric_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
counter_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("Q>")[0] }
|
||||
gauge_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
derive_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("q>")[0] }
|
||||
# For Low-Resolution time
|
||||
time_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2))).utc
|
||||
end
|
||||
# Hi-Resolution time
|
||||
hirestime_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
|
||||
end
|
||||
# Hi resolution intervals
|
||||
hiresinterval_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
|
||||
end
|
||||
# Values decoder
|
||||
values_decoder = lambda do |body|
|
||||
body.slice!(0..1) # Prune the header
|
||||
if body.length % 9 == 0 # Should be 9 fields
|
||||
count = 0
|
||||
retval = []
|
||||
# Iterate through and take a slice each time
|
||||
types = body.slice!(0..((body.length/9)-1))
|
||||
while body.length > 0
|
||||
# Use another hash + closure here...
|
||||
retval << VALUES_DECODER[types[count]].call(body)
|
||||
count += 1
|
||||
def init_lambdas!
|
||||
# Lambdas for hash + closure methodology
|
||||
# This replaces when statements for fixed values and is much faster
|
||||
string_decoder = lambda { |body| body.pack("C*")[0..-2] }
|
||||
numeric_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
counter_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("Q>")[0] }
|
||||
gauge_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
derive_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("q>")[0] }
|
||||
# For Low-Resolution time
|
||||
time_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2))).utc
|
||||
end
|
||||
# Hi-Resolution time
|
||||
hirestime_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
|
||||
end
|
||||
# Hi resolution intervals
|
||||
hiresinterval_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
|
||||
end
|
||||
# Value type decoder
|
||||
value_type_decoder = lambda do |body|
|
||||
body.slice!(0..1) # Prune the header
|
||||
if body.length % 9 == 0 # Should be 9 fields
|
||||
count = 0
|
||||
retval = []
|
||||
# Iterate through and take a slice each time
|
||||
types = body.slice!(0..((body.length/9)-1))
|
||||
while body.length > 0
|
||||
# Use another hash + closure here...
|
||||
v = @values_decoder[types[count]].call(body)
|
||||
if types[count] == 1 && v.nan?
|
||||
case @nan_handling
|
||||
when 'drop'; drop = true
|
||||
else
|
||||
v = @nan_value
|
||||
add_nan_tag = true
|
||||
@nan_handling == 'warn' && @logger.warn("NaN replaced by #{@nan_value}")
|
||||
end
|
||||
end
|
||||
retval << v
|
||||
count += 1
|
||||
end
|
||||
else
|
||||
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
|
||||
end
|
||||
else
|
||||
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
|
||||
return retval, drop, add_nan_tag
|
||||
end
|
||||
return retval
|
||||
end
|
||||
# Signature
|
||||
signature_decoder = lambda do |body|
|
||||
if body.length < 32
|
||||
@logger.warning("SHA256 signature too small (got #{body.length} bytes instead of 32)")
|
||||
elsif body.length < 33
|
||||
@logger.warning("Received signature without username")
|
||||
else
|
||||
# Signature
|
||||
signature_decoder = lambda do |body|
|
||||
if body.length < 32
|
||||
@logger.warning("SHA256 signature too small (got #{body.length} bytes instead of 32)")
|
||||
elsif body.length < 33
|
||||
@logger.warning("Received signature without username")
|
||||
else
|
||||
retval = []
|
||||
# Byte 32 till the end contains the username as chars (=unsigned ints)
|
||||
retval << body[32..-1].pack('C*')
|
||||
# Byte 0 till 31 contain the signature
|
||||
retval << body[0..31].pack('C*')
|
||||
end
|
||||
return retval
|
||||
end
|
||||
# Encryption
|
||||
encryption_decoder = lambda do |body|
|
||||
retval = []
|
||||
# Byte 32 till the end contains the username as chars (=unsigned ints)
|
||||
retval << body[32..-1].pack('C*')
|
||||
# Byte 0 till 31 contain the signature
|
||||
retval << body[0..31].pack('C*')
|
||||
user_length = (body.slice!(0) << 8) + body.slice!(0)
|
||||
retval << body.slice!(0..user_length-1).pack('C*') # Username
|
||||
retval << body.slice!(0..15).pack('C*') # IV
|
||||
retval << body.pack('C*')
|
||||
return retval
|
||||
end
|
||||
return retval
|
||||
end
|
||||
# Encryption
|
||||
encryption_decoder = lambda do |body|
|
||||
retval = []
|
||||
user_length = (body.slice!(0) << 8) + body.slice!(0)
|
||||
retval << body.slice!(0..user_length-1).pack('C*') # Username
|
||||
retval << body.slice!(0..15).pack('C*') # IV
|
||||
retval << body.pack('C*')
|
||||
return retval
|
||||
end
|
||||
# Lambda Hashes
|
||||
ID_DECODER = {
|
||||
0 => string_decoder,
|
||||
1 => time_decoder,
|
||||
2 => string_decoder,
|
||||
3 => string_decoder,
|
||||
4 => string_decoder,
|
||||
5 => string_decoder,
|
||||
6 => values_decoder,
|
||||
7 => numeric_decoder,
|
||||
8 => hirestime_decoder,
|
||||
9 => hiresinterval_decoder,
|
||||
256 => string_decoder,
|
||||
257 => numeric_decoder,
|
||||
512 => signature_decoder,
|
||||
528 => encryption_decoder
|
||||
}
|
||||
# TYPE VALUES:
|
||||
# 0: COUNTER
|
||||
# 1: GAUGE
|
||||
# 2: DERIVE
|
||||
# 3: ABSOLUTE
|
||||
VALUES_DECODER = {
|
||||
0 => counter_decoder,
|
||||
1 => gauge_decoder,
|
||||
2 => derive_decoder,
|
||||
3 => counter_decoder
|
||||
}
|
||||
@id_decoder = {
|
||||
0 => string_decoder,
|
||||
1 => time_decoder,
|
||||
2 => string_decoder,
|
||||
3 => string_decoder,
|
||||
4 => string_decoder,
|
||||
5 => string_decoder,
|
||||
6 => value_type_decoder,
|
||||
7 => numeric_decoder,
|
||||
8 => hirestime_decoder,
|
||||
9 => hiresinterval_decoder,
|
||||
256 => string_decoder,
|
||||
257 => numeric_decoder,
|
||||
512 => signature_decoder,
|
||||
528 => encryption_decoder
|
||||
}
|
||||
# TYPE VALUES:
|
||||
# 0: COUNTER
|
||||
# 1: GAUGE
|
||||
# 2: DERIVE
|
||||
# 3: ABSOLUTE
|
||||
@values_decoder = {
|
||||
0 => counter_decoder,
|
||||
1 => gauge_decoder,
|
||||
2 => derive_decoder,
|
||||
3 => counter_decoder
|
||||
}
|
||||
end # def init_lambdas!
|
||||
|
||||
public
|
||||
def get_values(id, body)
|
||||
drop = false
|
||||
add_tag = false
|
||||
if id == 6
|
||||
retval, drop, add_nan_tag = @id_decoder[id].call(body)
|
||||
# Use hash + closure/lambda to speed operations
|
||||
ID_DECODER[id].call(body)
|
||||
else
|
||||
retval = @id_decoder[id].call(body)
|
||||
end
|
||||
return retval, drop, add_nan_tag
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -361,7 +396,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
length = ((payload.slice!(0) << 8) + payload.slice!(0)) - 4
|
||||
# Validate that the part length is correct
|
||||
raise(HeaderError) if length > payload.length
|
||||
|
||||
|
||||
body = payload.slice!(0..length-1)
|
||||
|
||||
field = TYPEMAP[typenum]
|
||||
|
@ -370,7 +405,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
next
|
||||
end
|
||||
|
||||
values = get_values(typenum, body)
|
||||
values, drop, add_nan_tag = get_values(typenum, body)
|
||||
|
||||
case typenum
|
||||
when SIGNATURE_TYPE
|
||||
|
@ -425,9 +460,17 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
# This is better than looping over all keys every time.
|
||||
collectd.delete('type_instance') if collectd['type_instance'] == ""
|
||||
collectd.delete('plugin_instance') if collectd['plugin_instance'] == ""
|
||||
if add_nan_tag
|
||||
collectd['tags'] ||= []
|
||||
collectd['tags'] << @nan_tag
|
||||
end
|
||||
# This ugly little shallow-copy hack keeps the new event from getting munged by the cleanup
|
||||
# With pass-by-reference we get hosed (if we pass collectd, then clean it up rapidly, values can disappear)
|
||||
yield LogStash::Event.new(collectd.dup)
|
||||
if !drop # Drop the event if it's flagged true
|
||||
yield LogStash::Event.new(collectd.dup)
|
||||
else
|
||||
raise(NaNError)
|
||||
end
|
||||
end
|
||||
# Clean up the event
|
||||
collectd.each_key do |k|
|
||||
|
@ -435,7 +478,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
end
|
||||
end
|
||||
end # while payload.length > 0 do
|
||||
rescue EncryptionError, ProtocolError, HeaderError
|
||||
rescue EncryptionError, ProtocolError, HeaderError, NaNError
|
||||
# basically do nothing, we just want out
|
||||
end # def decode
|
||||
|
||||
|
|
|
@ -56,8 +56,99 @@ describe LogStash::Codecs::Collectd do
|
|||
# One of these will fail because I altered the payload from the normal packet
|
||||
insist { counter } == 27
|
||||
end # it "should drop a part with an header length"
|
||||
|
||||
# This payload contains a NaN value
|
||||
it "should replace a NaN with a zero and add tag '_collectdNaN' by default" do
|
||||
payload = ["00000015746573742e6578616d706c652e636f6d000008000c14dc4c81831ef78b0009000c00000000400000000002000970696e67000004000970696e67000005001c70696e672d7461726765742e6578616d706c652e636f6d000006000f000101000000000000f87f"].pack('H*')
|
||||
counter = 0
|
||||
subject.decode(payload) do |event|
|
||||
case counter
|
||||
when 0
|
||||
insist { event['host'] } == "test.example.com"
|
||||
insist { event['plugin'] } == "ping"
|
||||
insist { event['type_instance'] } == "ping-target.example.com"
|
||||
insist { event['collectd_type'] } == "ping"
|
||||
insist { event['value'] } == 0 # Not a NaN
|
||||
insist { event['tags'] } == ["_collectdNaN"]
|
||||
end
|
||||
counter += 1
|
||||
end
|
||||
insist { counter } == 1
|
||||
end # it "should replace a NaN with a zero and add tag '_collectdNaN' by default"
|
||||
end # context "None"
|
||||
|
||||
context "Replace nan_value and nan_tag with non-default values" do
|
||||
subject do
|
||||
next LogStash::Codecs::Collectd.new({"nan_value" => 1,
|
||||
"nan_tag" => "NaN_encountered"})
|
||||
end
|
||||
# This payload contains a NaN value
|
||||
it "should replace a NaN with the specified value and tag 'NaN_encountered'" do
|
||||
payload = ["00000015746573742e6578616d706c652e636f6d000008000c14dc4c81831ef78b0009000c00000000400000000002000970696e67000004000970696e67000005001c70696e672d7461726765742e6578616d706c652e636f6d000006000f000101000000000000f87f"].pack('H*')
|
||||
counter = 0
|
||||
subject.decode(payload) do |event|
|
||||
case counter
|
||||
when 0
|
||||
insist { event['host'] } == "test.example.com"
|
||||
insist { event['plugin'] } == "ping"
|
||||
insist { event['type_instance'] } == "ping-target.example.com"
|
||||
insist { event['collectd_type'] } == "ping"
|
||||
insist { event['value'] } == 1 # Not a NaN
|
||||
insist { event['tags'] } == ["NaN_encountered"]
|
||||
end
|
||||
counter += 1
|
||||
end
|
||||
insist { counter } == 1
|
||||
end # it "should replace a NaN with the specified value and tag 'NaN_encountered'"
|
||||
end # context "Replace nan_value and nan_tag with non-default values"
|
||||
|
||||
context "Warn on NaN event" do
|
||||
subject do
|
||||
next LogStash::Codecs::Collectd.new({"nan_handling" => "warn"})
|
||||
end
|
||||
# This payload contains a NaN value
|
||||
it "should replace a NaN with a zero and receive a warning when 'nan_handling' set to warn" do
|
||||
payload = ["00000015746573742e6578616d706c652e636f6d000008000c14dc4c81831ef78b0009000c00000000400000000002000970696e67000004000970696e67000005001c70696e672d7461726765742e6578616d706c652e636f6d000006000f000101000000000000f87f"].pack('H*')
|
||||
counter = 0
|
||||
subject.logger.should_receive(:warn).with("NaN replaced by 0")
|
||||
subject.decode(payload) do |event|
|
||||
case counter
|
||||
when 0
|
||||
insist { event['host'] } == "test.example.com"
|
||||
insist { event['plugin'] } == "ping"
|
||||
insist { event['type_instance'] } == "ping-target.example.com"
|
||||
insist { event['collectd_type'] } == "ping"
|
||||
insist { event['value'] } == 0 # Not a NaN
|
||||
end
|
||||
counter += 1
|
||||
end
|
||||
insist { counter } == 1
|
||||
end # it "should replace a NaN with a zero and receive a warning when 'nan_handling' set to warn"
|
||||
end # context "Warn on NaN event"
|
||||
|
||||
context "Drop NaN event" do
|
||||
subject do
|
||||
next LogStash::Codecs::Collectd.new({"nan_handling" => "drop"})
|
||||
end
|
||||
# This payload contains a NaN value
|
||||
it "should drop an event with a NaN value when 'nan_handling' set to drop" do
|
||||
payload = ["00000015746573742e6578616d706c652e636f6d000008000c14dc4c81831ef78b0009000c00000000400000000002000970696e67000004000970696e67000005001c70696e672d7461726765742e6578616d706c652e636f6d000006000f000101000000000000f87f"].pack('H*')
|
||||
counter = 0
|
||||
subject.decode(payload) do |event|
|
||||
case counter
|
||||
when 0
|
||||
insist { event['host'] } == "test.example.com"
|
||||
insist { event['plugin'] } == "ping"
|
||||
insist { event['type_instance'] } == "ping-target.example.com"
|
||||
insist { event['collectd_type'] } == "ping"
|
||||
insist { event['value'] } == NaN # NaN
|
||||
end
|
||||
counter += 1 # Because we're dropping this, it should not increment
|
||||
end
|
||||
insist { counter } == 0 # We expect no increment
|
||||
end # it "should drop an event with a NaN value when 'nan_handling' set to drop"
|
||||
end # context "Drop NaN event"
|
||||
|
||||
# Create an authfile for the next tests
|
||||
authfile = Tempfile.new('logstash-collectd-authfile')
|
||||
File.open(authfile.path, "a") do |fd|
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue