mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
Minor changes on graphite plugin.
- Added metrics_format option to define the resulting format of the metric string that will be sent to graphite. example: graphite { host => "graphite.local" port => 2003 fields_are_metrics => true include_metrics => ["foo"] metrics_format => "my.system.%{metric}" } - added test suite for graphite plugin
This commit is contained in:
parent
5cd155c2a0
commit
24eb0b2223
2 changed files with 249 additions and 6 deletions
|
@ -12,6 +12,9 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
config_name "graphite"
|
config_name "graphite"
|
||||||
plugin_status "beta"
|
plugin_status "beta"
|
||||||
|
|
||||||
|
DEFAULT_METRICS_FORMAT = "%{metric}"
|
||||||
|
METRIC_PLACEHOLDER = "%{metric}"
|
||||||
|
|
||||||
# The address of the graphite server.
|
# The address of the graphite server.
|
||||||
config :host, :validate => :string, :default => "localhost"
|
config :host, :validate => :string, :default => "localhost"
|
||||||
|
|
||||||
|
@ -46,9 +49,24 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
# Enable debug output
|
# Enable debug output
|
||||||
config :debug, :validate => :boolean, :default => false
|
config :debug, :validate => :boolean, :default => false
|
||||||
|
|
||||||
|
# Defines format of the metric string. The placeholder %{metrics} will be
|
||||||
|
# replaced with the name of the actual metric.
|
||||||
|
#
|
||||||
|
# metrics_format => "foo.bar.%{metric}.sum"
|
||||||
|
#
|
||||||
|
# NOTE: If no metrics_format is defined the name of the metric will be used as fallback.
|
||||||
|
config :metrics_format, :validate => :string, :default => DEFAULT_METRICS_FORMAT
|
||||||
|
|
||||||
def register
|
def register
|
||||||
@include_metrics.collect!{|regexp| Regexp.new(regexp)}
|
@include_metrics.collect!{|regexp| Regexp.new(regexp)}
|
||||||
@exclude_metrics.collect!{|regexp| Regexp.new(regexp)}
|
@exclude_metrics.collect!{|regexp| Regexp.new(regexp)}
|
||||||
|
|
||||||
|
if @metrics_format && !@metrics_format.include?(METRIC_PLACEHOLDER)
|
||||||
|
@logger.warn("metrics_format does not include placeholder #{METRIC_PLACEHOLDER} .. falling back to default format: #{DEFAULT_METRICS_FORMAT.inspect}")
|
||||||
|
|
||||||
|
@metrics_format = DEFAULT_METRICS_FORMAT
|
||||||
|
end
|
||||||
|
|
||||||
connect
|
connect
|
||||||
end # def register
|
end # def register
|
||||||
|
|
||||||
|
@ -64,6 +82,14 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
end
|
end
|
||||||
end # def connect
|
end # def connect
|
||||||
|
|
||||||
|
def construct_metric_name(metric)
|
||||||
|
if @metrics_format
|
||||||
|
return @metrics_format.gsub(METRIC_PLACEHOLDER, metric)
|
||||||
|
end
|
||||||
|
|
||||||
|
metric
|
||||||
|
end
|
||||||
|
|
||||||
public
|
public
|
||||||
def receive(event)
|
def receive(event)
|
||||||
return unless output?(event)
|
return unless output?(event)
|
||||||
|
@ -78,7 +104,7 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
event.fields.each do |metric,value|
|
event.fields.each do |metric,value|
|
||||||
next unless @include_metrics.any? {|regexp| metric.match(regexp)}
|
next unless @include_metrics.any? {|regexp| metric.match(regexp)}
|
||||||
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
|
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
|
||||||
messages << "#{metric} #{value.to_f} #{timestamp}"
|
messages << "#{construct_metric_name(metric)} #{event.sprintf(value.to_s).to_f} #{timestamp}"
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
@metrics.each do |metric, value|
|
@metrics.each do |metric, value|
|
||||||
|
@ -86,11 +112,13 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
metric = event.sprintf(metric)
|
metric = event.sprintf(metric)
|
||||||
next unless @include_metrics.any? {|regexp| metric.match(regexp)}
|
next unless @include_metrics.any? {|regexp| metric.match(regexp)}
|
||||||
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
|
next if @exclude_metrics.any? {|regexp| metric.match(regexp)}
|
||||||
messages << "#{event.sprintf(metric)} #{event.sprintf(value).to_f} #{timestamp}"
|
messages << "#{construct_metric_name(event.sprintf(metric))} #{event.sprintf(value).to_f} #{timestamp}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
unless messages.empty?
|
if messages.empty?
|
||||||
|
@logger.debug("Message is empty, not sending anything to graphite", :messages => messages, :host => @host, :port => @port)
|
||||||
|
else
|
||||||
message = messages.join("\n")
|
message = messages.join("\n")
|
||||||
@logger.debug("Sending carbon messages", :messages => messages, :host => @host, :port => @port)
|
@logger.debug("Sending carbon messages", :messages => messages, :host => @host, :port => @port)
|
||||||
|
|
||||||
|
@ -108,4 +136,4 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
||||||
end
|
end
|
||||||
|
|
||||||
end # def receive
|
end # def receive
|
||||||
end # class LogStash::Outputs::Statsd
|
end # class LogStash::Outputs::Graphite
|
||||||
|
|
215
spec/outputs/graphite.rb
Normal file
215
spec/outputs/graphite.rb
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
require "test_utils"
|
||||||
|
require "logstash/outputs/graphite"
|
||||||
|
|
||||||
|
require "mocha"
|
||||||
|
|
||||||
|
describe LogStash::Outputs::Graphite do
|
||||||
|
extend LogStash::RSpec
|
||||||
|
|
||||||
|
def self.run_agent(config_str)
|
||||||
|
agent = LogStash::Agent.new
|
||||||
|
agent.run(["-e", config_str])
|
||||||
|
agent.wait
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "fields are metrics = true" do
|
||||||
|
describe "metrics_format set" do
|
||||||
|
describe "match one key" do
|
||||||
|
config_str = <<-CONFIG
|
||||||
|
input {
|
||||||
|
generator {
|
||||||
|
message => "foo=123"
|
||||||
|
count => 1
|
||||||
|
type => "generator"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter {
|
||||||
|
kv { }
|
||||||
|
}
|
||||||
|
|
||||||
|
output {
|
||||||
|
graphite {
|
||||||
|
host => "localhost"
|
||||||
|
port => 2003
|
||||||
|
fields_are_metrics => true
|
||||||
|
include_metrics => ["foo"]
|
||||||
|
metrics_format => "foo.bar.sys.data.%{metric}"
|
||||||
|
debug => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
mock = StringIO.new
|
||||||
|
TCPSocket.expects(:new).with("localhost", 2003).returns(mock)
|
||||||
|
|
||||||
|
run_agent(config_str)
|
||||||
|
|
||||||
|
mock.rewind
|
||||||
|
lines = mock.readlines
|
||||||
|
|
||||||
|
insist { lines.size } == 1
|
||||||
|
insist { lines[0] } =~ /^foo.bar.sys.data.foo 123.0 \d{10,}\n$/
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "match all keys" do
|
||||||
|
config_str = <<-CONFIG
|
||||||
|
input {
|
||||||
|
generator {
|
||||||
|
message => "foo=123 bar=42"
|
||||||
|
count => 1
|
||||||
|
type => "generator"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter {
|
||||||
|
kv { }
|
||||||
|
}
|
||||||
|
|
||||||
|
output {
|
||||||
|
graphite {
|
||||||
|
host => "localhost"
|
||||||
|
port => 2003
|
||||||
|
fields_are_metrics => true
|
||||||
|
include_metrics => [".*"]
|
||||||
|
metrics_format => "foo.bar.sys.data.%{metric}"
|
||||||
|
debug => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
mock = StringIO.new
|
||||||
|
TCPSocket.expects(:new).with("localhost", 2003).returns(mock)
|
||||||
|
|
||||||
|
run_agent(config_str)
|
||||||
|
|
||||||
|
mock.rewind
|
||||||
|
lines = mock.readlines.delete_if { |l| l =~ /\.sequence \d+/ }
|
||||||
|
|
||||||
|
insist { lines.size } == 2
|
||||||
|
insist { lines.any? { |l| l =~ /^foo.bar.sys.data.foo 123.0 \d{10,}\n$/ } }
|
||||||
|
insist { lines.any? { |l| l =~ /^foo.bar.sys.data.bar 42.0 \d{10,}\n$/ } }
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "no match" do
|
||||||
|
config_str = <<-CONFIG
|
||||||
|
input {
|
||||||
|
generator {
|
||||||
|
message => "foo=123 bar=42"
|
||||||
|
count => 1
|
||||||
|
type => "generator"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter {
|
||||||
|
kv { }
|
||||||
|
}
|
||||||
|
|
||||||
|
output {
|
||||||
|
graphite {
|
||||||
|
host => "localhost"
|
||||||
|
port => 2003
|
||||||
|
fields_are_metrics => true
|
||||||
|
include_metrics => ["notmatchinganything"]
|
||||||
|
metrics_format => "foo.bar.sys.data.%{metric}"
|
||||||
|
debug => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
mock = StringIO.new
|
||||||
|
TCPSocket.expects(:new).with("localhost", 2003).returns(mock)
|
||||||
|
|
||||||
|
run_agent(config_str)
|
||||||
|
|
||||||
|
mock.rewind
|
||||||
|
lines = mock.readlines
|
||||||
|
|
||||||
|
insist { lines.size } == 0
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "match one key with invalid metric_format" do
|
||||||
|
config_str = <<-CONFIG
|
||||||
|
input {
|
||||||
|
generator {
|
||||||
|
message => "foo=123"
|
||||||
|
count => 1
|
||||||
|
type => "generator"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter {
|
||||||
|
kv { }
|
||||||
|
}
|
||||||
|
|
||||||
|
output {
|
||||||
|
graphite {
|
||||||
|
host => "localhost"
|
||||||
|
port => 2003
|
||||||
|
fields_are_metrics => true
|
||||||
|
include_metrics => ["foo"]
|
||||||
|
metrics_format => "invalidformat"
|
||||||
|
debug => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
mock = StringIO.new
|
||||||
|
TCPSocket.expects(:new).with("localhost", 2003).returns(mock)
|
||||||
|
|
||||||
|
run_agent(config_str)
|
||||||
|
|
||||||
|
mock.rewind
|
||||||
|
lines = mock.readlines
|
||||||
|
|
||||||
|
insist { lines.size } == 1
|
||||||
|
insist { lines[0] } =~ /^foo 123.0 \d{10,}\n$/
|
||||||
|
|
||||||
|
puts "END - match one key with invalid metric_format"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "fields are metrics = false" do
|
||||||
|
describe "metrics_format not set" do
|
||||||
|
describe "match one key with metrics list" do
|
||||||
|
config_str = <<-CONFIG
|
||||||
|
input {
|
||||||
|
generator {
|
||||||
|
message => "foo=123"
|
||||||
|
count => 1
|
||||||
|
type => "generator"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filter {
|
||||||
|
kv { }
|
||||||
|
}
|
||||||
|
|
||||||
|
output {
|
||||||
|
graphite {
|
||||||
|
host => "localhost"
|
||||||
|
port => 2003
|
||||||
|
fields_are_metrics => false
|
||||||
|
include_metrics => ["foo"]
|
||||||
|
metrics => [ "custom.foo", "%{foo}" ]
|
||||||
|
debug => true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
mock = StringIO.new
|
||||||
|
TCPSocket.expects(:new).with("localhost", 2003).returns(mock)
|
||||||
|
|
||||||
|
run_agent(config_str)
|
||||||
|
|
||||||
|
mock.rewind
|
||||||
|
lines = mock.readlines
|
||||||
|
|
||||||
|
insist { lines.size } == 1
|
||||||
|
insist { lines[0] } =~ /^custom.foo 123.0 \d{10,}\n$/
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue