mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge pull request #686 from bodgit/netflow
Small enhancements to netflow codec
This commit is contained in:
commit
6ae99fcec9
1 changed files with 21 additions and 9 deletions
|
@ -1,6 +1,7 @@
|
||||||
require "logstash/filters/base"
|
require "logstash/filters/base"
|
||||||
require "logstash/namespace"
|
require "logstash/namespace"
|
||||||
|
|
||||||
|
# The "netflow" codec is for decoding Netflow v5/v9 flows.
|
||||||
class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
config_name "netflow"
|
config_name "netflow"
|
||||||
milestone 1
|
milestone 1
|
||||||
|
@ -8,6 +9,12 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
# Netflow v9 template cache TTL (minutes)
|
# Netflow v9 template cache TTL (minutes)
|
||||||
config :cache_ttl, :validate => :number, :default => 4000
|
config :cache_ttl, :validate => :number, :default => 4000
|
||||||
|
|
||||||
|
# Specify into what field you want the Netflow data.
|
||||||
|
config :target, :validate => :string, :default => "netflow"
|
||||||
|
|
||||||
|
# Specify which Netflow versions you will accept.
|
||||||
|
config :versions, :validate => :array, :default => [5, 9]
|
||||||
|
|
||||||
public
|
public
|
||||||
def initialize(params={})
|
def initialize(params={})
|
||||||
super(params)
|
super(params)
|
||||||
|
@ -24,6 +31,11 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
def decode(payload, &block)
|
def decode(payload, &block)
|
||||||
header = Header.read(payload)
|
header = Header.read(payload)
|
||||||
|
|
||||||
|
unless @versions.include?(header.version)
|
||||||
|
@logger.warn("Ignoring Netflow version v#{header.version}")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
if header.version == 5
|
if header.version == 5
|
||||||
flowset = Netflow5PDU.read(payload)
|
flowset = Netflow5PDU.read(payload)
|
||||||
elsif header.version == 9
|
elsif header.version == 9
|
||||||
|
@ -42,11 +54,11 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
# The flowset header gives us the UTC epoch seconds along with
|
# The flowset header gives us the UTC epoch seconds along with
|
||||||
# residual nanoseconds so we can set @timestamp to that easily
|
# residual nanoseconds so we can set @timestamp to that easily
|
||||||
event["@timestamp"] = Time.at(flowset.unix_sec, flowset.unix_nsec / 1000).utc
|
event["@timestamp"] = Time.at(flowset.unix_sec, flowset.unix_nsec / 1000).utc
|
||||||
event['netflow'] = {}
|
event[@target] = {}
|
||||||
|
|
||||||
# Copy some of the pertinent fields in the header to the event
|
# Copy some of the pertinent fields in the header to the event
|
||||||
['version', 'flow_seq_num', 'engine_type', 'engine_id', 'sampling_algorithm', 'sampling_interval', 'flow_records'].each do |f|
|
['version', 'flow_seq_num', 'engine_type', 'engine_id', 'sampling_algorithm', 'sampling_interval', 'flow_records'].each do |f|
|
||||||
event['netflow'][f] = flowset[f]
|
event[@target][f] = flowset[f]
|
||||||
end
|
end
|
||||||
|
|
||||||
# Create fields in the event from each field in the flow record
|
# Create fields in the event from each field in the flow record
|
||||||
|
@ -65,9 +77,9 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
micros += 1000000
|
micros += 1000000
|
||||||
end
|
end
|
||||||
# FIXME Again, probably doing this wrong WRT JRuby?
|
# FIXME Again, probably doing this wrong WRT JRuby?
|
||||||
event['netflow'][k.to_s] = Time.at(seconds, micros).utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
|
event[@target][k.to_s] = Time.at(seconds, micros).utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
|
||||||
else
|
else
|
||||||
event['netflow'][k.to_s] = v
|
event[@target][k.to_s] = v
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -142,15 +154,15 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
records.each do |r|
|
records.each do |r|
|
||||||
event = LogStash::Event.new(
|
event = LogStash::Event.new(
|
||||||
"@timestamp" => Time.at(flowset.unix_sec).utc,
|
"@timestamp" => Time.at(flowset.unix_sec).utc,
|
||||||
"netflow" => {}
|
@target => {}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fewer fields in the v9 header
|
# Fewer fields in the v9 header
|
||||||
['version', 'flow_seq_num'].each do |f|
|
['version', 'flow_seq_num'].each do |f|
|
||||||
event['netflow'][f] = flowset[f]
|
event[@target][f] = flowset[f]
|
||||||
end
|
end
|
||||||
|
|
||||||
event['netflow']['flowset_id'] = record.flowset_id
|
event[@target]['flowset_id'] = record.flowset_id
|
||||||
|
|
||||||
r.each_pair do |k,v|
|
r.each_pair do |k,v|
|
||||||
case k.to_s
|
case k.to_s
|
||||||
|
@ -159,9 +171,9 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
|
||||||
seconds = flowset.unix_sec - (millis / 1000)
|
seconds = flowset.unix_sec - (millis / 1000)
|
||||||
# v9 did away with the nanosecs field
|
# v9 did away with the nanosecs field
|
||||||
micros = 1000000 - (millis % 1000)
|
micros = 1000000 - (millis % 1000)
|
||||||
event['netflow'][k.to_s] = Time.at(seconds, micros).utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
|
event[@target][k.to_s] = Time.at(seconds, micros).utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
|
||||||
else
|
else
|
||||||
event['netflow'][k.to_s] = v
|
event[@target][k.to_s] = v
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue