mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge pull request #1015 from pieterlexis/collectd_NaN
NaN value handling in the CollectD input (LOGSTASH-1763)
This commit is contained in:
commit
3d67833cff
2 changed files with 185 additions and 5 deletions
|
@ -88,6 +88,21 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base
|
|||
# "Sign" or "Encrypt"
|
||||
config :authfile, :validate => :string
|
||||
|
||||
# What to do when a value in the event is NaN (Not a Number)
|
||||
# - change_value (default): Change the NaN to the value of the nan_value option and add nan_tag as a tag
|
||||
# - warn: Change the NaN to the value of the nan_value option, print a warning to the log and add nan_tag as a tag
|
||||
# - drop: Drop the event containing the NaN (this only drops the single event, not the whole packet)
|
||||
config :nan_handeling, :validate => ['change_value','warn','drop'],
|
||||
:default => 'change_value'
|
||||
|
||||
# Only relevant when nan_handeling is set to 'change_value'
|
||||
# Change NaN to this configured value
|
||||
config :nan_value, :validate => :number, :default => 0
|
||||
|
||||
# The tag to add to the event if a NaN value was found
|
||||
# Set this to an empty string ('') if you don't want to tag
|
||||
config :nan_tag, :validate => :string, :default => '_collectdNaN'
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
|
@ -202,8 +217,18 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base
|
|||
# 2: DERIVE
|
||||
# 3: ABSOLUTE
|
||||
case types[count]
|
||||
when 1;
|
||||
v = body.slice!(0..7).pack("C*").unpack("E")[0]
|
||||
if v.nan?
|
||||
case @nan_handeling
|
||||
when 'drop'; return false
|
||||
else
|
||||
v = @nan_value
|
||||
add_tag(@nan_tag)
|
||||
@nan_handeling == 'warn' && @logger.warn("NaN in (unfinished event) #{@collectd}")
|
||||
end
|
||||
end
|
||||
when 0, 3; v = body.slice!(0..7).pack("C*").unpack("Q>")[0]
|
||||
when 1; v = body.slice!(0..7).pack("C*").unpack("E")[0]
|
||||
when 2; v = body.slice!(0..7).pack("C*").unpack("q>")[0]
|
||||
else; v = 0
|
||||
end
|
||||
|
@ -326,6 +351,20 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base
|
|||
output_queue << event
|
||||
end # def generate_event
|
||||
|
||||
private
|
||||
def clean_up()
|
||||
@collectd.each_key do |k|
|
||||
@collectd.delete(k) if !['host','collectd_type', 'plugin', 'plugin_instance', '@timestamp', 'type_instance'].include?(k)
|
||||
end
|
||||
end # def clean_up
|
||||
|
||||
private
|
||||
def add_tag(new_tag)
|
||||
return if new_tag.empty?
|
||||
@collectd['tags'] ||= []
|
||||
@collectd['tags'] << new_tag
|
||||
end
|
||||
|
||||
private
|
||||
def collectd_listener(output_queue)
|
||||
@logger.info("Starting Collectd listener", :address => "#{@host}:#{@port}")
|
||||
|
@ -400,6 +439,9 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base
|
|||
else # Otherwise it's a single value
|
||||
@collectd['value'] = values[0] # So name it 'value' accordingly
|
||||
end
|
||||
elsif !values
|
||||
clean_up()
|
||||
next
|
||||
elsif field != nil # Not an array, make sure it's non-empty
|
||||
@collectd[field] = values # Append values to @collectd under key field
|
||||
end
|
||||
|
@ -408,10 +450,7 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base
|
|||
if ((@prune_intervals && ![7,9].include?(typenum)) || !@prune_intervals)
|
||||
generate_event(output_queue)
|
||||
end
|
||||
# Clean up the event
|
||||
@collectd.each_key do |k|
|
||||
@collectd.delete(k) if !['host','collectd_type', 'plugin', 'plugin_instance', '@timestamp', 'type_instance'].include?(k)
|
||||
end
|
||||
clean_up()
|
||||
end
|
||||
end # while payload.length > 0 do
|
||||
end # loop do
|
||||
|
|
|
@ -185,4 +185,145 @@ describe "inputs/collectd", :socket => true do
|
|||
|
||||
end # input
|
||||
end # describe
|
||||
|
||||
describe "changes NaN to 0 in the default config" do
|
||||
config <<-CONFIG
|
||||
input {
|
||||
collectd {
|
||||
type => "collectd"
|
||||
host => "127.0.0.1"
|
||||
# normal collectd port + 1
|
||||
port => 25827
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
# Sleep so collectd can init itself
|
||||
sleep 3
|
||||
|
||||
msg = ['000000356b756d696e613a70726f64756374696f6e3a6c6965746572732d6b6c6170746f702e70726f742e706c657869732e6575000008000c14baed07bfc492e90009000c00000002800000000002000c63707566726571000004000c63707566726571000005000631000006000f0001010000000084d7c7410008000c14baed07bfc39a790005000630000006000f0001010000000084d7c7410008000c14baed07bfca78480002000764660000030009726f6f74000004000f64665f636f6d706c6578000005000966726565000006000f000101000000002e82ef410008000c14baed07bfcaa14c0005000d7265736572766564000006000f00010100000000a09ec5410008000c14baed07bfcaad4f0005000975736564000006000f00010100000080080d04420008000c14baed07bfcb0f2900030009626f6f74000005000966726565000006000f0001010000000048fcca410008000c14baed07bfcb1bc20005000d7265736572766564000006000f00010100000000c0cc90410008000c14baed07bfcb285b0005000975736564000006000f00010100000000009586410008000c14baed07bfcb489500030009686f6d65000005000966726565000006000f000101000000c0557a12420008000c14baed07bfcb54980005000d7265736572766564000006000f000101000000000020e4410008000c14baed07bfcb5f6f0005000975736564000006000f00010100000000d2181c420008000c14baed07bfc2f24f000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010200000000000000020008000c14baed07bfc2d4b80005000969646c65000006000f0001020000000000022ada0008000c14baed07bfc2bc68000500096e696365000006000f00010200000000000000080008000c14baed07bfeb5d1e0002000e6d656d6361636865640000030005000004001a6d656d6361636865645f636f6e6e656374696f6e73000005000c63757272656e74000006000f00010100000000000014400008000c14baed07bfeb947c000400166d656d6361636865645f636f6d6d616e640000050008676574000006000f00010200000000000000000008000c14baed07bfebb42100050008736574000006000f00010200000000000000000008000c14baed07bfebcd9e0005000a666c757368000006000f00010200000000000000000008000c14baed07bfebe5ee0005000a746f756368000006000f00010200000000000000000008000c14baed07bfebfdf5000400126d656d6361636865645f6f7073000005000968697473000006000f00010200000000000000000008000c14baed07bfec0ea80005000b6d6973736573000006000f00010200000000000000000008000c14baed07bfec278f00050010696e63725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec36350005000e696e63725f68697473000006000f00010200000000000000000008000c14baed07bfec45bc00050010646563725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec54620005000e646563725f68697473000006000f00010200000000000000000008000c14baed07bfeca08e0004000d70735f636f756e740000050005000006001800020101000000000000f87f00000000000010400008000c14baed07bfecce42000400146d656d6361636865645f6974656d73000005000c63757272656e74000006000f0001010000000000000000'].pack('H*')
|
||||
|
||||
udp_sock.send(msg, 0, "127.0.0.1", 25827)
|
||||
|
||||
# give it time to process
|
||||
sleep 2
|
||||
|
||||
insist { queue.size } == 27
|
||||
|
||||
events = 26.times.collect { queue.pop }
|
||||
|
||||
insist { events[25]['tags'] } == ['_collectdNaN']
|
||||
insist { events[25]['threads'] } == 4
|
||||
insist { events[25]['processes'] } == 0
|
||||
|
||||
end # input do
|
||||
end # describe
|
||||
|
||||
describe "changes NaN to -1 when configged to do so" do
|
||||
config <<-CONFIG
|
||||
input {
|
||||
collectd {
|
||||
type => "collectd"
|
||||
host => "127.0.0.1"
|
||||
# normal collectd port + 1
|
||||
port => 25827
|
||||
nan_value => -1
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
# Sleep so collectd can init itself
|
||||
sleep 3
|
||||
|
||||
msg = ['000000356b756d696e613a70726f64756374696f6e3a6c6965746572732d6b6c6170746f702e70726f742e706c657869732e6575000008000c14baed07bfc492e90009000c00000002800000000002000c63707566726571000004000c63707566726571000005000631000006000f0001010000000084d7c7410008000c14baed07bfc39a790005000630000006000f0001010000000084d7c7410008000c14baed07bfca78480002000764660000030009726f6f74000004000f64665f636f6d706c6578000005000966726565000006000f000101000000002e82ef410008000c14baed07bfcaa14c0005000d7265736572766564000006000f00010100000000a09ec5410008000c14baed07bfcaad4f0005000975736564000006000f00010100000080080d04420008000c14baed07bfcb0f2900030009626f6f74000005000966726565000006000f0001010000000048fcca410008000c14baed07bfcb1bc20005000d7265736572766564000006000f00010100000000c0cc90410008000c14baed07bfcb285b0005000975736564000006000f00010100000000009586410008000c14baed07bfcb489500030009686f6d65000005000966726565000006000f000101000000c0557a12420008000c14baed07bfcb54980005000d7265736572766564000006000f000101000000000020e4410008000c14baed07bfcb5f6f0005000975736564000006000f00010100000000d2181c420008000c14baed07bfc2f24f000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010200000000000000020008000c14baed07bfc2d4b80005000969646c65000006000f0001020000000000022ada0008000c14baed07bfc2bc68000500096e696365000006000f00010200000000000000080008000c14baed07bfeb5d1e0002000e6d656d6361636865640000030005000004001a6d656d6361636865645f636f6e6e656374696f6e73000005000c63757272656e74000006000f00010100000000000014400008000c14baed07bfeb947c000400166d656d6361636865645f636f6d6d616e640000050008676574000006000f00010200000000000000000008000c14baed07bfebb42100050008736574000006000f00010200000000000000000008000c14baed07bfebcd9e0005000a666c757368000006000f00010200000000000000000008000c14baed07bfebe5ee0005000a746f756368000006000f00010200000000000000000008000c14baed07bfebfdf5000400126d656d6361636865645f6f7073000005000968697473000006000f00010200000000000000000008000c14baed07bfec0ea80005000b6d6973736573000006000f00010200000000000000000008000c14baed07bfec278f00050010696e63725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec36350005000e696e63725f68697473000006000f00010200000000000000000008000c14baed07bfec45bc00050010646563725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec54620005000e646563725f68697473000006000f00010200000000000000000008000c14baed07bfeca08e0004000d70735f636f756e740000050005000006001800020101000000000000f87f00000000000010400008000c14baed07bfecce42000400146d656d6361636865645f6974656d73000005000c63757272656e74000006000f0001010000000000000000'].pack('H*')
|
||||
|
||||
udp_sock.send(msg, 0, "127.0.0.1", 25827)
|
||||
|
||||
# give it time to process
|
||||
sleep 2
|
||||
|
||||
insist { queue.size } == 27
|
||||
|
||||
events = 26.times.collect { queue.pop }
|
||||
|
||||
insist { events[25]['tags'] } == ['_collectdNaN']
|
||||
insist { events[25]['threads'] } == 4
|
||||
insist { events[25]['processes'] } == -1
|
||||
|
||||
end # input do
|
||||
end # describe
|
||||
|
||||
describe "Drops the event when NaN is found" do
|
||||
config <<-CONFIG
|
||||
input {
|
||||
collectd {
|
||||
type => "collectd"
|
||||
host => "127.0.0.1"
|
||||
# normal collectd port + 1
|
||||
port => 25827
|
||||
nan_handeling => 'drop'
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
# Sleep so collectd can init itself
|
||||
sleep 3
|
||||
|
||||
msg = ['000000356b756d696e613a70726f64756374696f6e3a6c6965746572732d6b6c6170746f702e70726f742e706c657869732e6575000008000c14baed07bfc492e90009000c00000002800000000002000c63707566726571000004000c63707566726571000005000631000006000f0001010000000084d7c7410008000c14baed07bfc39a790005000630000006000f0001010000000084d7c7410008000c14baed07bfca78480002000764660000030009726f6f74000004000f64665f636f6d706c6578000005000966726565000006000f000101000000002e82ef410008000c14baed07bfcaa14c0005000d7265736572766564000006000f00010100000000a09ec5410008000c14baed07bfcaad4f0005000975736564000006000f00010100000080080d04420008000c14baed07bfcb0f2900030009626f6f74000005000966726565000006000f0001010000000048fcca410008000c14baed07bfcb1bc20005000d7265736572766564000006000f00010100000000c0cc90410008000c14baed07bfcb285b0005000975736564000006000f00010100000000009586410008000c14baed07bfcb489500030009686f6d65000005000966726565000006000f000101000000c0557a12420008000c14baed07bfcb54980005000d7265736572766564000006000f000101000000000020e4410008000c14baed07bfcb5f6f0005000975736564000006000f00010100000000d2181c420008000c14baed07bfc2f24f000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010200000000000000020008000c14baed07bfc2d4b80005000969646c65000006000f0001020000000000022ada0008000c14baed07bfc2bc68000500096e696365000006000f00010200000000000000080008000c14baed07bfeb5d1e0002000e6d656d6361636865640000030005000004001a6d656d6361636865645f636f6e6e656374696f6e73000005000c63757272656e74000006000f00010100000000000014400008000c14baed07bfeb947c000400166d656d6361636865645f636f6d6d616e640000050008676574000006000f00010200000000000000000008000c14baed07bfebb42100050008736574000006000f00010200000000000000000008000c14baed07bfebcd9e0005000a666c757368000006000f00010200000000000000000008000c14baed07bfebe5ee0005000a746f756368000006000f00010200000000000000000008000c14baed07bfebfdf5000400126d656d6361636865645f6f7073000005000968697473000006000f00010200000000000000000008000c14baed07bfec0ea80005000b6d6973736573000006000f00010200000000000000000008000c14baed07bfec278f00050010696e63725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec36350005000e696e63725f68697473000006000f00010200000000000000000008000c14baed07bfec45bc00050010646563725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec54620005000e646563725f68697473000006000f00010200000000000000000008000c14baed07bfeca08e0004000d70735f636f756e740000050005000006001800020101000000000000f87f00000000000010400008000c14baed07bfecce42000400146d656d6361636865645f6974656d73000005000c63757272656e74000006000f0001010000000000000000'].pack('H*')
|
||||
|
||||
udp_sock.send(msg, 0, "127.0.0.1", 25827)
|
||||
|
||||
# give it time to process
|
||||
sleep 2
|
||||
|
||||
insist { queue.size } == 26
|
||||
end # input do
|
||||
end # describe
|
||||
|
||||
describe "Empty nan_tag doesnt add a tag" do
|
||||
config <<-CONFIG
|
||||
input {
|
||||
collectd {
|
||||
type => "collectd"
|
||||
host => "127.0.0.1"
|
||||
# normal collectd port + 1
|
||||
port => 25827
|
||||
nan_tag => ''
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
# Sleep so collectd can init itself
|
||||
sleep 3
|
||||
|
||||
msg = ['000000356b756d696e613a70726f64756374696f6e3a6c6965746572732d6b6c6170746f702e70726f742e706c657869732e6575000008000c14baed07bfc492e90009000c00000002800000000002000c63707566726571000004000c63707566726571000005000631000006000f0001010000000084d7c7410008000c14baed07bfc39a790005000630000006000f0001010000000084d7c7410008000c14baed07bfca78480002000764660000030009726f6f74000004000f64665f636f6d706c6578000005000966726565000006000f000101000000002e82ef410008000c14baed07bfcaa14c0005000d7265736572766564000006000f00010100000000a09ec5410008000c14baed07bfcaad4f0005000975736564000006000f00010100000080080d04420008000c14baed07bfcb0f2900030009626f6f74000005000966726565000006000f0001010000000048fcca410008000c14baed07bfcb1bc20005000d7265736572766564000006000f00010100000000c0cc90410008000c14baed07bfcb285b0005000975736564000006000f00010100000000009586410008000c14baed07bfcb489500030009686f6d65000005000966726565000006000f000101000000c0557a12420008000c14baed07bfcb54980005000d7265736572766564000006000f000101000000000020e4410008000c14baed07bfcb5f6f0005000975736564000006000f00010100000000d2181c420008000c14baed07bfc2f24f000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010200000000000000020008000c14baed07bfc2d4b80005000969646c65000006000f0001020000000000022ada0008000c14baed07bfc2bc68000500096e696365000006000f00010200000000000000080008000c14baed07bfeb5d1e0002000e6d656d6361636865640000030005000004001a6d656d6361636865645f636f6e6e656374696f6e73000005000c63757272656e74000006000f00010100000000000014400008000c14baed07bfeb947c000400166d656d6361636865645f636f6d6d616e640000050008676574000006000f00010200000000000000000008000c14baed07bfebb42100050008736574000006000f00010200000000000000000008000c14baed07bfebcd9e0005000a666c757368000006000f00010200000000000000000008000c14baed07bfebe5ee0005000a746f756368000006000f00010200000000000000000008000c14baed07bfebfdf5000400126d656d6361636865645f6f7073000005000968697473000006000f00010200000000000000000008000c14baed07bfec0ea80005000b6d6973736573000006000f00010200000000000000000008000c14baed07bfec278f00050010696e63725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec36350005000e696e63725f68697473000006000f00010200000000000000000008000c14baed07bfec45bc00050010646563725f6d6973736573000006000f00010200000000000000000008000c14baed07bfec54620005000e646563725f68697473000006000f00010200000000000000000008000c14baed07bfeca08e0004000d70735f636f756e740000050005000006001800020101000000000000f87f00000000000010400008000c14baed07bfecce42000400146d656d6361636865645f6974656d73000005000c63757272656e74000006000f0001010000000000000000'].pack('H*')
|
||||
|
||||
udp_sock.send(msg, 0, "127.0.0.1", 25827)
|
||||
|
||||
# give it time to process
|
||||
sleep 2
|
||||
|
||||
events = 26.times.collect { queue.pop }
|
||||
|
||||
insist { events[25]['tags'] }.nil?
|
||||
end # input do
|
||||
end # describe
|
||||
|
||||
end # describe "inputs/collectd"
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue