- make 'metrics' a hash config option

- reconnect on write failure
- reconnect on connect failure
This commit is contained in:
Jordan Sissel 2011-08-17 01:35:08 -07:00
parent d038cb76e0
commit d49f81fb16

View file

@ -8,9 +8,6 @@ require "socket"
# An example use case: At loggly, some of our applications emit aggregated # An example use case: At loggly, some of our applications emit aggregated
# stats in the logs every 10 seconds. Using the grok filter and this output, # stats in the logs every 10 seconds. Using the grok filter and this output,
# I can capture the metric values from the logs and emit them to graphite. # I can capture the metric values from the logs and emit them to graphite.
#
# TODO(sissel): Figure out how to manage multiple metrics coming from the same
# event.
class LogStash::Outputs::Graphite < LogStash::Outputs::Base class LogStash::Outputs::Graphite < LogStash::Outputs::Base
config_name "graphite" config_name "graphite"
@ -20,34 +17,52 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
# The port to connect on your graphite server. # The port to connect on your graphite server.
config :port, :validate => :number, :default => 2003 config :port, :validate => :number, :default => 2003
# The metric to use. This supports dynamic strings like %{@source_host} # The metric(s) to use. This supports dynamic strings like %{@source_host}
# TODO(sissel): This should probably be an array. # for metric names and also for values. This is a hash field with key
config :metric, :validate => :string, :required => true # of the metric name, value of the metric value. Example:
# [ "%{@source_host}/uptime", %{uptime_1m} " ]
# The value to use. This supports dynamic strings like %{bytes} #
# It will be coerced to a floating point value. Values which cannot be # The value will be coerced to a floating point value. Values which cannot be
# coerced will zero (0) # coerced will zero (0)
config :value, :validate => :string, :required => true config :metrics, :validate => :hash, :required => true
def register def register
# TODO(sissel): Retry on failure. connect
@socket = connect
end # def register end # def register
def connect def connect
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory. # TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
socket = TCPSocket.new(@host, @port) begin
@socket = TCPSocket.new(@host, @port)
rescue Errno::ECONNREFUSED => e
@logger.warn(["Connection refused to graphite server, sleeping...",
{ :host => @host, :port => @port }])
sleep(2)
retry
end
end # def connect end # def connect
public public
def receive(event) def receive(event)
# Graphite message format: metric value timestamp\n # Graphite message format: metric value timestamp\n
message = [event.sprintf(@metric), event.sprintf(@value).to_f,
# Catch exceptions like ECONNRESET and friends, reconnect on failure.
@metrics.each do |metric, value|
message = [event.sprintf(metric), event.sprintf(value).to_f,
event.sprintf("%{+%s}")].join(" ") event.sprintf("%{+%s}")].join(" ")
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory. # TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket.puts(message) @socket.puts(message)
rescue Errno::EPIPE, Errno::ECONNRESET => e
@logger.warn(["Connection to graphite server died",
{ :exception => e, :host => @host, :port => @port }])
sleep(2)
connect
end
# TODO(sissel): retry on failure TODO(sissel): Make 'retry on failure' # TODO(sissel): resend on failure
# tunable; sometimes it's OK to drop metrics. # TODO(sissel): Make 'resend on failure' tunable; sometimes it's OK to
# drop metrics.
end # @metrics.each
end # def receive end # def receive
end # class LogStash::Outputs::Statsd end # class LogStash::Outputs::Statsd