New Graphite Codec

This codec will soon replace the need for a graphite input and output.
The tcp inputs/outputs can be used in conjunction with the graphite codec.
This lets us consolidate the many different ways various tcp based plugins
handle timeouts, exceptions, etc.
This commit is contained in:
Nick Ethier 2013-09-17 14:02:44 -06:00
parent 1ec66fe38e
commit ab23fc2676
2 changed files with 156 additions and 0 deletions

View file

@ -0,0 +1,121 @@
require "logstash/codecs/base"
require "logstash/codecs/line"
require "json"
# This codec will encode and decode Graphite formated lines.
class LogStash::Codecs::Graphite < LogStash::Codecs::Base
config_name "graphite"
milestone 1
# The character encoding used in this codec. Examples include "UTF-8" and
# "CP1252"
#
# JSON requires valid UTF-8 strings, but in some cases, software that
# emits JSON does so in another encoding (nxlog, for example). In
# weird cases like this, you can set the charset setting to the
# actual encoding of the text and logstash will convert it for you.
#
# For nxlog users, you'll want to set this to "CP1252"
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"
EXCLUDE_ALWAYS = [ "@timestamp", "@version" ]
DEFAULT_METRICS_FORMAT = "*"
METRIC_PLACEHOLDER = "*"
# The metric(s) to use. This supports dynamic strings like %{host}
# for metric names and also for values. This is a hash field with key
# of the metric name, value of the metric value. Example:
#
# [ "%{host}/uptime", "%{uptime_1m}" ]
#
# The value will be coerced to a floating point value. Values which cannot be
# coerced will zero (0)
config :metrics, :validate => :hash, :default => {}
# Indicate that the event @fields should be treated as metrics and will be sent as is to graphite
config :fields_are_metrics, :validate => :boolean, :default => false
# Include only regex matched metric names
config :include_metrics, :validate => :array, :default => [ ".*" ]
# Exclude regex matched metric names, by default exclude unresolved %{field} strings
config :exclude_metrics, :validate => :array, :default => [ "%\{[^}]+\}" ]
# Defines format of the metric string. The placeholder '*' will be
# replaced with the name of the actual metric.
#
# metrics_format => "foo.bar.*.sum"
#
# NOTE: If no metrics_format is defined the name of the metric will be used as fallback.
config :metrics_format, :validate => :string, :default => DEFAULT_METRICS_FORMAT
public
def initialize(params={})
super(params)
@lines = LogStash::Codecs::Line.new
end
public
def decode(data)
#data.force_encoding(@charset)
#if @charset != "UTF-8"
# The user has declared the character encoding of this data is
# something other than UTF-8. Let's convert it (as cleanly as possible)
# into UTF-8 so we can use it with JSON, etc.
# data = data.encode("UTF-8", :invalid => :replace, :undef => :replace)
#end
@lines.decode(data) do |event|
name, value, time = event["message"].split(" ")
yield LogStash::Event.new(name => value.to_f, "@timestamp" => Time.at(time.to_i).gmtime)
end # @lines.decode
end # def decode
private
def construct_metric_name(metric)
if @metrics_format
return @metrics_format.gsub(METRIC_PLACEHOLDER, metric)
end
return metric
end
public
def encode(event)
# Graphite message format: metric value timestamp\n
messages = []
timestamp = event.sprintf("%{+%s}")
if @fields_are_metrics
@logger.debug("got metrics event", :metrics => event.to_hash)
event.to_hash.each do |metric,value|
next if EXCLUDE_ALWAYS.include?(metric)
next unless @include_metrics.empty? || @include_metrics.any? { |regexp| metric.match(regexp) }
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
messages << "#{construct_metric_name(metric)} #{event.sprintf(value.to_s).to_f} #{timestamp}"
end # data.to_hash.each
else
@metrics.each do |metric, value|
@logger.debug("processing", :metric => metric, :value => value)
metric = event.sprintf(metric)
next unless @include_metrics.any? {|regexp| metric.match(regexp)}
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
messages << "#{construct_metric_name(event.sprintf(metric))} #{event.sprintf(value).to_f} #{timestamp}"
end # @metrics.each
end # if @fields_are_metrics
if messages.empty?
@logger.debug("Message is empty, not emiting anything.", :messages => messages)
else
message = messages.join("\n") + "\n"
@logger.debug("Emiting carbon messages", :messages => messages)
@on_event.call(message)
end # if messages.empty?
end # def encode
end # class LogStash::Codecs::JSON

35
spec/codecs/graphite.rb Normal file
View file

@ -0,0 +1,35 @@
require "logstash/codecs/graphite"
require "logstash/event"
require "insist"
describe LogStash::Codecs::Graphite do
subject do
next LogStash::Codecs::Graphite.new
end
context "#decode" do
it "should return an event from single full graphite line" do
name = Random.srand.to_s(36)
value = Random.rand*1000
timestamp = Time.now.gmtime.to_i
subject.decode("#{name} #{value} #{timestamp}\n") do |event|
insist { event.is_a? LogStash::Event }
insist { event[name] } == value
end
end
end
context "#encode" do
it "should emit an graphite formatted line" do
name = Random.srand.to_s(36)
value = Random.rand*1000
timestamp = Time.now.gmtime
subject.metrics = {name => value}
subject.on_event do |event|
insist { event.is_a? String }
insist { event } == "#{name} #{value} #{timestamp.to_i}\n"
end
subject.encode(LogStash::Event.new)
end
end
end