ensure metric collection is disabled when metric.collect is false

Fixes #6445
This commit is contained in:
Joao Duarte 2016-12-20 15:12:38 +00:00 committed by João Duarte
parent 1bff586c8a
commit 90c364e903
4 changed files with 105 additions and 66 deletions

View file

@ -70,7 +70,11 @@ module LogStash; class Pipeline
# This needs to be configured before we evaluate the code to make
# sure the metric instance is correctly send to the plugins to make the namespace scoping work
@metric = namespaced_metric.nil? ? Instrument::NullMetric.new : namespaced_metric
@metric = if namespaced_metric
settings.get("metric.collect") ? namespaced_metric : Instrument::NullMetric.new(namespaced_metric.collector)
else
Instrument::NullMetric.new
end
grammar = LogStashConfigParser.new
@config = grammar.parse(config_str)

View file

@ -2,31 +2,7 @@
require "spec_helper"
require "logstash/pipeline"
require "logstash/pipeline_reporter"
class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
milestone 2
attr_reader :num_closes, :events
def initialize(params={})
super
@num_closes = 0
@events = []
end
def register
end
def receive(event)
@events << event
end
def close
@num_closes += 1
end
end
require_relative "../support/mocks_classes"
#TODO: Figure out how to add more tests that actually cover inflight events
#This will require some janky multithreading stuff
@ -39,7 +15,7 @@ describe LogStash::PipelineReporter do
let(:reporter) { pipeline.reporter }
before do
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original

View file

@ -49,7 +49,7 @@ class DummyCodec < LogStash::Codecs::Base
end
end
class DummyOutputMore < DummyOutput
class DummyOutputMore < ::LogStash::Outputs::DummyOutput
config_name "dummyoutputmore"
end
@ -158,7 +158,7 @@ describe LogStash::Pipeline do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter)
end
@ -258,7 +258,7 @@ describe LogStash::Pipeline do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
@ -313,7 +313,7 @@ describe LogStash::Pipeline do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:config) { "input { dummyinput {} } output { dummyoutput {} }"}
@ -378,12 +378,12 @@ describe LogStash::Pipeline do
let(:pipeline_settings) { { "pipeline.batch.size" => batch_size, "pipeline.workers" => 1 } }
let(:pipeline) { LogStash::Pipeline.new(config, pipeline_settings_obj) }
let(:logger) { pipeline.logger }
let(:warning_prefix) { /CAUTION: Recommended inflight events max exceeded!/ }
let(:warning_prefix) { Regexp.new("CAUTION: Recommended inflight events max exceeded!") }
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(logger).to receive(:warn)
thread = Thread.new { pipeline.run }
pipeline.shutdown
@ -435,28 +435,87 @@ describe LogStash::Pipeline do
end
context "metrics" do
config <<-CONFIG
input { }
filter { }
output { }
CONFIG
config = "input { } filter { } output { }"
it "uses a `NullMetric` object if `metric.collect` is set to false" do
settings = double("LogStash::SETTINGS")
let(:settings) { LogStash::SETTINGS.clone }
subject { LogStash::Pipeline.new(config, settings, metric) }
allow(settings).to receive(:get_value).with("pipeline.id").and_return("main")
allow(settings).to receive(:get_value).with("metric.collect").and_return(false)
allow(settings).to receive(:get_value).with("config.debug").and_return(false)
allow(settings).to receive(:get).with("queue.type").and_return("memory")
allow(settings).to receive(:get).with("queue.page_capacity").and_return(1024 * 1024)
allow(settings).to receive(:get).with("queue.max_events").and_return(250)
allow(settings).to receive(:get).with("queue.max_bytes").and_return(1024 * 1024 * 1024)
allow(settings).to receive(:get).with("queue.checkpoint.acks").and_return(1024)
allow(settings).to receive(:get).with("queue.checkpoint.writes").and_return(1024)
allow(settings).to receive(:get).with("queue.checkpoint.interval").and_return(1000)
context "when metric.collect is disabled" do
before :each do
settings.set("metric.collect", false)
end
pipeline = LogStash::Pipeline.new(config, settings)
expect(pipeline.metric).to be_kind_of(LogStash::Instrument::NullMetric)
context "if namespaced_metric is nil" do
let(:metric) { nil }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
end
context "if namespaced_metric is a Metric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::Metric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
context "if namespaced_metric is a NullMetric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::NullMetric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(::LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
end
context "when metric.collect is enabled" do
before :each do
settings.set("metric.collect", true)
end
context "if namespaced_metric is nil" do
let(:metric) { nil }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
end
context "if namespaced_metric is a Metric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::Metric.new(collector) }
it "uses a `Metric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::Metric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
context "if namespaced_metric is a NullMetric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::NullMetric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
end
end
@ -465,7 +524,7 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinputgenerator").and_return(DummyInputGenerator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore)
end
@ -501,14 +560,14 @@ describe LogStash::Pipeline do
}
EOS
end
let(:output) { DummyOutput.new }
let(:output) { ::LogStash::Outputs::DummyOutput.new }
before do
allow(DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
it "flushes the buffered contents of the filter" do
@ -531,7 +590,7 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
@ -643,22 +702,22 @@ describe LogStash::Pipeline do
}
EOS
end
let(:dummyoutput) { DummyOutput.new({ "id" => dummy_output_id }) }
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
before :each do
allow(DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
Thread.new { subject.run }
# make sure we have received all the generated events
wait(3).for do
# give us a bit of time to flush the events
dummyoutput.events.size < number_of_events
end.to be_falsey
dummyoutput.events.size >= number_of_events
end.to be_truthy
end
after :each do
@ -702,7 +761,7 @@ describe LogStash::Pipeline do
it "populates the name of the output plugin" do
plugin_name = dummy_output_id.to_sym
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:name].value).to eq(DummyOutput.config_name)
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:name].value).to eq(::LogStash::Outputs::DummyOutput.config_name)
end
it "populates the name of the filter plugin" do
@ -719,7 +778,7 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }

View file

@ -12,7 +12,7 @@ module LogStash module Outputs
def initialize(params={})
super
@num_closes = 0
@events = Queue.new
@events = []
end
def register
@ -23,7 +23,7 @@ module LogStash module Outputs
end
def close
@num_closes = 1
@num_closes += 1
end
end