mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Latest info:
* Hash + Closures. Now replacing case/when statements with closures and hashes to improve performance * Raise exceptions now instead of using `break` * Zipped lines 163-164 per @colinsurprenant * In line 280, this change is a regression. The authfile is tiny, so this should still be okay, and it's only read once. Tests failed without this change. * Use `has_key?` instead of `include?()` in multiple places. * In the spec/test, I fixed a typo that was bugging me. With this commit, all tests pass! ``` $ bin/logstash rspec -fd spec/codecs/collectd.rb Using Accessor#strict_set for specs Run options: exclude {:redis=>true, :socket=>true, :performance=>true, :elasticsearch=>true, :broken=>true} LogStash::Codecs::Collectd None should parse a normal packet Sign should parse a correctly signed packet should not parse an incorrectly signed packet Encrypt should parse an encrypted packet should not parse unencrypted packets when encrypt is configured Finished in 2.1 seconds 5 examples, 0 failures ```
This commit is contained in:
parent
6bf38771c1
commit
25033496cd
2 changed files with 179 additions and 107 deletions
|
@ -36,18 +36,26 @@ require "time"
|
|||
# Be sure to replace "10.0.0.1" with the IP of your Logstash instance.
|
||||
#
|
||||
|
||||
#
|
||||
class ProtocolError < LogStash::Error; end
|
||||
class EncryptionError < LogStash::Error; end
|
||||
|
||||
class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
||||
config_name "collectd"
|
||||
milestone 1
|
||||
|
||||
AUTHFILEREGEX = /([^:]+): (.+)/
|
||||
|
||||
PLUGIN_TYPE = 2
|
||||
COLLECTD_TYPE = 4
|
||||
SIGNATURE_TYPE = 512
|
||||
ENCRYPTION_TYPE = 528
|
||||
|
||||
TYPEMAP = {
|
||||
0 => "host",
|
||||
1 => "@timestamp",
|
||||
2 => "plugin",
|
||||
PLUGIN_TYPE => "plugin",
|
||||
3 => "plugin_instance",
|
||||
4 => "collectd_type",
|
||||
COLLECTD_TYPE => "collectd_type",
|
||||
5 => "type_instance",
|
||||
6 => "values",
|
||||
7 => "interval",
|
||||
|
@ -55,8 +63,39 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
9 => "interval",
|
||||
256 => "message",
|
||||
257 => "severity",
|
||||
512 => "signature",
|
||||
528 => "encryption"
|
||||
SIGNATURE_TYPE => "signature",
|
||||
ENCRYPTION_TYPE => "encryption"
|
||||
}
|
||||
|
||||
PLUGIN_TYPE_FIELDS = {
|
||||
'host' => true,
|
||||
'@timestamp' => true,
|
||||
}
|
||||
|
||||
COLLECTD_TYPE_FIELDS = {
|
||||
'host' => true,
|
||||
'@timestamp' => true,
|
||||
'plugin' => true,
|
||||
'plugin_instance' => true,
|
||||
}
|
||||
|
||||
INTERVAL_VALUES_FIELDS = {
|
||||
"interval" => true,
|
||||
"values" => true,
|
||||
}
|
||||
|
||||
INTERVAL_BASE_FIELDS = {
|
||||
'host' => true,
|
||||
'collectd_type' => true,
|
||||
'plugin' => true,
|
||||
'plugin_instance' => true,
|
||||
'@timestamp' => true,
|
||||
'type_instance' => true,
|
||||
}
|
||||
|
||||
INTERVAL_TYPES = {
|
||||
7 => true,
|
||||
9 => true,
|
||||
}
|
||||
|
||||
SECURITY_NONE = "None"
|
||||
|
@ -121,61 +160,56 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
File.open(path, 'r').each_line do |line|
|
||||
typename, *line = line.strip.split
|
||||
@logger.debug("typename", :typename => typename.to_s)
|
||||
next if typename.nil?
|
||||
next if typename[0,1] == '#'
|
||||
v = line.collect { |l| l.strip.split(":")[0] }
|
||||
types[typename] = v
|
||||
next if typename.nil? || typename[0,1] == '#'
|
||||
types[typename] = line.collect { |l| l.strip.split(":")[0] }
|
||||
end
|
||||
end
|
||||
@logger.debug("Collectd Types", :types => types.to_s)
|
||||
return types
|
||||
end # def get_types
|
||||
|
||||
public
|
||||
def get_values(id, body)
|
||||
case id
|
||||
when 0,2,3,4,5,256 #=> String types
|
||||
retval = body.pack("C*")
|
||||
retval = retval[0..-2]
|
||||
when 1 # Time
|
||||
# Time here, in bit-shifted format. Parse bytes into UTC.
|
||||
# Lambdas for hash + closure methodology
|
||||
# This replaces when statements for fixed values and is much faster
|
||||
string_decoder = lambda { |body| body.pack("C*")[0..-2] }
|
||||
numeric_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
counter_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("Q>")[0] }
|
||||
gauge_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
|
||||
derive_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("q>")[0] }
|
||||
# For Low-Resolution time
|
||||
time_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
retval = Time.at(( ((byte1 << 32) + byte2))).utc
|
||||
when 7,257 #=> Numeric types
|
||||
retval = body.slice!(0..7).pack("C*").unpack("E")[0]
|
||||
when 8 # Time, Hi-Res
|
||||
# Time here, in bit-shifted format. Parse bytes into UTC.
|
||||
Time.at(( ((byte1 << 32) + byte2))).utc
|
||||
end
|
||||
# Hi-Resolution time
|
||||
hirestime_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
retval = Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
|
||||
when 9 # Interval, Hi-Res
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
|
||||
end
|
||||
# Hi resolution intervals
|
||||
hiresinterval_decoder = lambda do |body|
|
||||
byte1, byte2 = body.pack("C*").unpack("NN")
|
||||
retval = (((byte1 << 32) + byte2) * (2**-30)).to_i
|
||||
when 6 # Values
|
||||
val_bytes = body.slice!(0..1)
|
||||
val_count = val_bytes.pack("C*").unpack("n")
|
||||
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
|
||||
end
|
||||
# Values decoder
|
||||
values_decoder = lambda do |body|
|
||||
remove_header = body.slice!(0..1)
|
||||
if body.length % 9 == 0 # Should be 9 fields
|
||||
count = 0
|
||||
retval = []
|
||||
# Iterate through and take a slice each time
|
||||
types = body.slice!(0..((body.length/9)-1))
|
||||
while body.length > 0
|
||||
# TYPE VALUES:
|
||||
# 0: COUNTER
|
||||
# 1: GAUGE
|
||||
# 2: DERIVE
|
||||
# 3: ABSOLUTE
|
||||
case types[count]
|
||||
when 0, 3; v = body.slice!(0..7).pack("C*").unpack("Q>")[0]
|
||||
when 1; v = body.slice!(0..7).pack("C*").unpack("E")[0]
|
||||
when 2; v = body.slice!(0..7).pack("C*").unpack("q>")[0]
|
||||
else; v = 0
|
||||
end
|
||||
retval << v
|
||||
# Use another hash + closure here...
|
||||
retval << VALUES_DECODER[types[count]].call(body)
|
||||
count += 1
|
||||
end
|
||||
else
|
||||
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
|
||||
end
|
||||
when 512 # signature
|
||||
return retval
|
||||
end
|
||||
# Signature
|
||||
signature_decoder = lambda do |body|
|
||||
if body.length < 32
|
||||
@logger.warning("SHA256 signature too small (got #{body.length} bytes instead of 32)")
|
||||
elsif body.length < 33
|
||||
|
@ -187,15 +221,51 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
# Byte 0 till 31 contain the signature
|
||||
retval << body[0..31].pack('C*')
|
||||
end
|
||||
when 528 # encryption
|
||||
return retval
|
||||
end
|
||||
# Encryption
|
||||
encryption_decoder = lambda do |body|
|
||||
retval = []
|
||||
user_length = (body.slice!(0) << 8) + body.slice!(0)
|
||||
retval << body.slice!(0..user_length-1).pack('C*') # Username
|
||||
retval << body.slice!(0..15).pack('C*') # IV
|
||||
retval << body.pack('C*') # Encrypted content
|
||||
end
|
||||
retval << body.pack('C*')
|
||||
return retval
|
||||
end # def get_values
|
||||
end
|
||||
# Lambda Hashes
|
||||
ID_DECODER = {
|
||||
0 => string_decoder,
|
||||
1 => time_decoder,
|
||||
2 => string_decoder,
|
||||
3 => string_decoder,
|
||||
4 => string_decoder,
|
||||
5 => string_decoder,
|
||||
6 => values_decoder,
|
||||
7 => numeric_decoder,
|
||||
8 => hirestime_decoder,
|
||||
9 => hiresinterval_decoder,
|
||||
256 => string_decoder,
|
||||
257 => numeric_decoder,
|
||||
512 => signature_decoder,
|
||||
528 => encryption_decoder
|
||||
}
|
||||
# TYPE VALUES:
|
||||
# 0: COUNTER
|
||||
# 1: GAUGE
|
||||
# 2: DERIVE
|
||||
# 3: ABSOLUTE
|
||||
VALUES_DECODER = {
|
||||
0 => counter_decoder,
|
||||
1 => gauge_decoder,
|
||||
2 => derive_decoder,
|
||||
3 => counter_decoder
|
||||
}
|
||||
|
||||
public
|
||||
def get_values(id, body)
|
||||
# Use hash + closure/lambda to speed operations
|
||||
ID_DECODER[id].call(body)
|
||||
end
|
||||
|
||||
private
|
||||
def parse_authfile
|
||||
|
@ -207,7 +277,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
end
|
||||
@auth.clear
|
||||
@authmtime = File.stat(@authfile).mtime
|
||||
File.readlines(@authfile).each_line do
|
||||
File.readlines(@authfile).each do |line|
|
||||
#line.chomp!
|
||||
k,v = line.scan(AUTHFILEREGEX).flatten
|
||||
if k && v
|
||||
|
@ -304,43 +374,45 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
|
||||
values = get_values(typenum, body)
|
||||
|
||||
case field
|
||||
when "signature"
|
||||
break if !verify_signature(values[0], values[1], payload)
|
||||
case typenum
|
||||
when SIGNATURE_TYPE
|
||||
raise(EncryptionError) unless verify_signature(values[0], values[1], payload)
|
||||
next
|
||||
when "encryption"
|
||||
when ENCRYPTION_TYPE
|
||||
payload = decrypt_packet(values[0], values[1], values[2])
|
||||
# decrypt_packet returns an empty array if the decryption was
|
||||
# unsuccessful and this inner loop checks the length. So we can safely
|
||||
# set the 'was_encrypted' variable.
|
||||
raise(EncryptionError) if payload.empty?
|
||||
was_encrypted = true
|
||||
next
|
||||
when "plugin"
|
||||
when PLUGIN_TYPE
|
||||
# We've reached a new plugin, delete everything except for the the host
|
||||
# field, because there's only one per packet and the timestamp field,
|
||||
# because that one goes in front of the plugin
|
||||
collectd.each_key do |k|
|
||||
collectd.delete(k) if !['host', '@timestamp'].include?(k)
|
||||
collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
|
||||
end
|
||||
when "collectd_type"
|
||||
when COLLECTD_TYPE
|
||||
# We've reached a new type within the plugin section, delete all fields
|
||||
# that could have something to do with the previous type (if any)
|
||||
collectd.each_key do |k|
|
||||
collectd.delete(k) if !['host', '@timestamp', 'plugin', 'plugin_instance'].include?(k)
|
||||
collectd.delete(k) unless COLLECTD_TYPE_FIELDS.has_key?(k)
|
||||
end
|
||||
end
|
||||
|
||||
break if !was_encrypted and @security_level == SECURITY_ENCR
|
||||
raise(EncryptionError) if !was_encrypted and @security_level == SECURITY_ENCR
|
||||
|
||||
# Fill in the fields.
|
||||
if values.is_a?(Array)
|
||||
if values.length > 1 # Only do this iteration on multi-value arrays
|
||||
values.each_with_index do |value, x|
|
||||
begin
|
||||
type = collectd['collectd_type']
|
||||
key = @types[type]
|
||||
key_x = key[x]
|
||||
# assign
|
||||
collectd[key_x] = value
|
||||
rescue
|
||||
@logger.error("Invalid value for type=#{type.inspect}, key=#{@types[type].inspect}, index=#{x}")
|
||||
end
|
||||
end
|
||||
else # Otherwise it's a single value
|
||||
collectd['value'] = values[0] # So name it 'value' accordingly
|
||||
|
@ -349,8 +421,8 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
collectd[field] = values # Append values to collectd under key field
|
||||
end
|
||||
|
||||
if ["interval", "values"].include?(field)
|
||||
if ((@prune_intervals && ![7,9].include?(typenum)) || !@prune_intervals)
|
||||
if INTERVAL_VALUES_FIELDS.has_key?(field)
|
||||
if ((@prune_intervals && !INTERVAL_TYPES.has_key?(typenum)) || !@prune_intervals)
|
||||
# Prune these *specific* keys if they exist and are empty.
|
||||
# This is better than looping over all keys every time.
|
||||
collectd.delete('type_instance') if collectd['type_instance'] == ""
|
||||
|
@ -361,12 +433,12 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
|
|||
end
|
||||
# Clean up the event
|
||||
collectd.each_key do |k|
|
||||
collectd.delete(k) if !['host','collectd_type', 'plugin', 'plugin_instance', '@timestamp', 'type_instance'].include?(k)
|
||||
collectd.delete(k) if !INTERVAL_BASE_FIELDS.has_key?(k)
|
||||
end
|
||||
# This needs to go here to clean up before the next chunk iteration
|
||||
was_encrypted = false
|
||||
end
|
||||
end # while payload.length > 0 do
|
||||
rescue EncryptionError, ProtocolError
|
||||
# basically do nothing, we just want out
|
||||
end # def decode
|
||||
|
||||
end # class LogStash::Codecs::Collectd
|
||||
|
|
|
@ -55,7 +55,7 @@ describe LogStash::Codecs::Collectd do
|
|||
insist { counter } == 24
|
||||
end # it "should parse a correctly signed packet"
|
||||
|
||||
it "should not parse and incorrectly signed packet" do
|
||||
it "should not parse an incorrectly signed packet" do
|
||||
payload = ["0200002a815d5d7f1e72250eee4d37251bf688fbc06ec87e3cbaf289390ef47ad7c413ce706965746572000000236c6965746572732d6b6c6170746f702e70726f742e706c657869732e6575000008000c14b0aa39ef05b3a80009000c000000028000000000020008697271000004000869727100000500084d4953000006000f00010200000000000000000008000c14b0aa39ef06c381000200096c6f616400000400096c6f616400000500050000060021000301010148e17a14ae47e13f85eb51b81e85db3f52b81e85eb51e03f0008000c14b0aa39ef0a7a150002000b6d656d6f7279000004000b6d656d6f7279000005000975736564000006000f000101000000006ce8dc410008000c14b0aa39ef0a87440005000d6275666665726564000006000f00010100000000c0eaa9410008000c14b0aa39ef0a91850005000b636163686564000006000f000101000000002887c8410008000c14b0aa39ef0a9b2f0005000966726565000006000f00010100000000580ed1410008000c14b0aa39ef1b3b8f0002000e696e74657266616365000003000974756e30000004000e69665f6f63746574730000050005000006001800020202000000000000df5f00000000000060c10008000c14b0aa39ef1b49ea0004000f69665f7061636b6574730000060018000202020000000000000177000000000000017a0008000c14b0aa39ef1b55570004000e69665f6572726f7273000006001800020202000000000000000000000000000000000008000c14b0aa39ef1b7a400003000965746830000004000e69665f6f6374657473000006001800020202000000000000000000000000000000000008000c14b0aa39ef1b85160004000f69665f7061636b657473000006001800020202000000000000000000000000000000000008000c14b0aa39ef1b93bc0004000e69665f6572726f7273000006001800020202000000000000000000000000000000000008000c14b0aa39ef1bb0bc000300076c6f000004000e69665f6f63746574730000060018000202020000000000a92d840000000000a92d840008000c14b0aa39ef1bbbdd0004000f69665f7061636b6574730000060018000202020000000000002c1e0000000000002c1e0008000c14b0aa39ef1bc8760004000e69665f6572726f7273000006001800020202000000000000000000000000000000000008000c14b0aa39ef1be36a0003000a776c616e30000004000e69665f6f6374657473000006001800020202000000001043329b0000000001432a5d0008000c14b0aa39ef1bef6c0004000f69665f7061636b6574730000060018000202020000000000043884000000000002931e0008000c14b0aa39ef1bfa8d0004000e69665f6572726f7273000006001800020202000000000000000000000000000000000008000c14b0aa39ef6e4ff5000200096469736b000003000873646100000400106469736b5f6f637465747300000600180002020200000000357c5000000000010dfb10000008000c14b0aa39ef6e8e5a0004000d6469736b5f6f7073000006001800020202000000000000a6fe0000000000049ee00008000c14b0aa39ef6eae480004000e6469736b5f74696d65000006001800020202000000000000000400000000000000120008000c14b0aa39ef6ecc2a000400106469736b5f6d6572676564000006001800020202000000000000446500000000000002460008000c14b0aa39ef6ef9dc000300097364613100000400106469736b5f6f637465747300000600180002020200000000000bf00000000000000000000008000c14b0aa39ef6f05490004000d6469736b5f6f707300000600180002020200000000000000bf0000000000000000"].pack('H*')
|
||||
counter = 0
|
||||
subject.decode(payload) do |event|
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue