diff --git a/lib/logstash/outputs/graphite.rb b/lib/logstash/outputs/graphite.rb index e09248d36..0d804f23e 100644 --- a/lib/logstash/outputs/graphite.rb +++ b/lib/logstash/outputs/graphite.rb @@ -12,6 +12,9 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base config_name "graphite" plugin_status "beta" + DEFAULT_METRICS_FORMAT = "*" + METRIC_PLACEHOLDER = "*" + # The address of the graphite server. config :host, :validate => :string, :default => "localhost" @@ -46,9 +49,24 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base # Enable debug output config :debug, :validate => :boolean, :default => false + # Defines format of the metric string. The placeholder '*' will be + # replaced with the name of the actual metric. + # + # metrics_format => "foo.bar.*.sum" + # + # NOTE: If no metrics_format is defined the name of the metric will be used as fallback. + config :metrics_format, :validate => :string, :default => DEFAULT_METRICS_FORMAT + def register @include_metrics.collect!{|regexp| Regexp.new(regexp)} @exclude_metrics.collect!{|regexp| Regexp.new(regexp)} + + if @metrics_format && !@metrics_format.include?(METRIC_PLACEHOLDER) + @logger.warn("metrics_format does not include placeholder #{METRIC_PLACEHOLDER} .. falling back to default format: #{DEFAULT_METRICS_FORMAT.inspect}") + + @metrics_format = DEFAULT_METRICS_FORMAT + end + connect end # def register @@ -64,10 +82,18 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base end end # def connect + def construct_metric_name(metric) + if @metrics_format + return @metrics_format.gsub(METRIC_PLACEHOLDER, metric) + end + + metric + end + public def receive(event) return unless output?(event) - + # Graphite message format: metric value timestamp\n messages = [] @@ -78,7 +104,7 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base event.fields.each do |metric,value| next unless @include_metrics.any? {|regexp| metric.match(regexp)} next if @exclude_metrics.any? {|regexp| metric.match(regexp)} - messages << "#{metric} #{value.to_f} #{timestamp}" + messages << "#{construct_metric_name(metric)} #{event.sprintf(value.to_s).to_f} #{timestamp}" end else @metrics.each do |metric, value| @@ -86,11 +112,13 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base metric = event.sprintf(metric) next unless @include_metrics.any? {|regexp| metric.match(regexp)} next if @exclude_metrics.any? {|regexp| metric.match(regexp)} - messages << "#{event.sprintf(metric)} #{event.sprintf(value).to_f} #{timestamp}" + messages << "#{construct_metric_name(event.sprintf(metric))} #{event.sprintf(value).to_f} #{timestamp}" end end - unless messages.empty? + if messages.empty? + @logger.debug("Message is empty, not sending anything to graphite", :messages => messages, :host => @host, :port => @port) + else message = messages.join("\n") @logger.debug("Sending carbon messages", :messages => messages, :host => @host, :port => @port) @@ -106,6 +134,6 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base retry if @resend_on_failure end end - + end # def receive -end # class LogStash::Outputs::Statsd \ No newline at end of file +end # class LogStash::Outputs::Graphite diff --git a/spec/outputs/graphite.rb b/spec/outputs/graphite.rb new file mode 100644 index 000000000..923299e51 --- /dev/null +++ b/spec/outputs/graphite.rb @@ -0,0 +1,213 @@ +require "test_utils" +require "logstash/outputs/graphite" + +require "mocha" + +describe LogStash::Outputs::Graphite do + extend LogStash::RSpec + + def self.run_agent(config_str) + agent = LogStash::Agent.new + agent.run(["-e", config_str]) + agent.wait + end + + describe "fields are metrics = true" do + describe "metrics_format set" do + describe "match one key" do + config_str = <<-CONFIG + input { + generator { + message => "foo=123" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + graphite { + host => "localhost" + port => 2003 + fields_are_metrics => true + include_metrics => ["foo"] + metrics_format => "foo.bar.sys.data.*" + debug => true + } + } + CONFIG + + mock = StringIO.new + TCPSocket.expects(:new).with("localhost", 2003).returns(mock) + + run_agent(config_str) + + mock.rewind + lines = mock.readlines + + insist { lines.size } == 1 + insist { lines[0] } =~ /^foo.bar.sys.data.foo 123.0 \d{10,}\n$/ + end + + describe "match all keys" do + config_str = <<-CONFIG + input { + generator { + message => "foo=123 bar=42" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + graphite { + host => "localhost" + port => 2003 + fields_are_metrics => true + include_metrics => [".*"] + metrics_format => "foo.bar.sys.data.*" + debug => true + } + } + CONFIG + + mock = StringIO.new + TCPSocket.expects(:new).with("localhost", 2003).returns(mock) + + run_agent(config_str) + + mock.rewind + lines = mock.readlines.delete_if { |l| l =~ /\.sequence \d+/ } + + insist { lines.size } == 2 + insist { lines.any? { |l| l =~ /^foo.bar.sys.data.foo 123.0 \d{10,}\n$/ } } + insist { lines.any? { |l| l =~ /^foo.bar.sys.data.bar 42.0 \d{10,}\n$/ } } + end + + describe "no match" do + config_str = <<-CONFIG + input { + generator { + message => "foo=123 bar=42" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + graphite { + host => "localhost" + port => 2003 + fields_are_metrics => true + include_metrics => ["notmatchinganything"] + metrics_format => "foo.bar.sys.data.*" + debug => true + } + } + CONFIG + + mock = StringIO.new + TCPSocket.expects(:new).with("localhost", 2003).returns(mock) + + run_agent(config_str) + + mock.rewind + lines = mock.readlines + + insist { lines.size } == 0 + end + + describe "match one key with invalid metric_format" do + config_str = <<-CONFIG + input { + generator { + message => "foo=123" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + graphite { + host => "localhost" + port => 2003 + fields_are_metrics => true + include_metrics => ["foo"] + metrics_format => "invalidformat" + debug => true + } + } + CONFIG + + mock = StringIO.new + TCPSocket.expects(:new).with("localhost", 2003).returns(mock) + + run_agent(config_str) + + mock.rewind + lines = mock.readlines + + insist { lines.size } == 1 + insist { lines[0] } =~ /^foo 123.0 \d{10,}\n$/ + end + end + end + + describe "fields are metrics = false" do + describe "metrics_format not set" do + describe "match one key with metrics list" do + config_str = <<-CONFIG + input { + generator { + message => "foo=123" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + graphite { + host => "localhost" + port => 2003 + fields_are_metrics => false + include_metrics => ["foo"] + metrics => [ "custom.foo", "%{foo}" ] + debug => true + } + } + CONFIG + + mock = StringIO.new + TCPSocket.expects(:new).with("localhost", 2003).returns(mock) + + run_agent(config_str) + + mock.rewind + lines = mock.readlines + + insist { lines.size } == 1 + insist { lines[0] } =~ /^custom.foo 123.0 \d{10,}\n$/ + end + + end + end +end