mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Clear the collector when logstash reload configuration
When logstash reload a configuration the collector should remove all the collected metrics from the store since it wont make any sense with a new configuration. You should have the same behavior as when you restart logstash. Fixes #4801
This commit is contained in:
parent
7123215982
commit
62c609f135
4 changed files with 100 additions and 26 deletions
|
@ -211,6 +211,12 @@ class LogStash::Agent
|
|||
return unless pipeline.is_a?(LogStash::Pipeline)
|
||||
return if pipeline.ready?
|
||||
@logger.info("starting pipeline", :id => id)
|
||||
|
||||
# Reset the current collected stats,
|
||||
# starting a pipeline with a new configuration should be the same as restarting
|
||||
# logstash.
|
||||
reset_collector
|
||||
|
||||
Thread.new do
|
||||
LogStash::Util.set_thread_name("pipeline.#{id}")
|
||||
begin
|
||||
|
@ -252,4 +258,8 @@ class LogStash::Agent
|
|||
def clean_state?
|
||||
@pipelines.empty?
|
||||
end
|
||||
|
||||
def reset_collector
|
||||
LogStash::Instrument::Collector.instance.clear
|
||||
end
|
||||
end # class LogStash::Agent
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
# encoding: utf-8
|
||||
require 'spec_helper'
|
||||
require 'stud/temporary'
|
||||
require "spec_helper"
|
||||
require "stud/temporary"
|
||||
require "logstash/inputs/generator"
|
||||
require_relative "../support/mocks_classes"
|
||||
|
||||
describe LogStash::Agent do
|
||||
|
||||
|
@ -313,4 +315,63 @@ describe LogStash::Agent do
|
|||
expect(subject.uptime).to be >= 0
|
||||
end
|
||||
end
|
||||
|
||||
context "metrics after config reloading" do
|
||||
let(:dummy_output) { DummyOutput.new }
|
||||
let(:config) { "input { generator { } } output { dummyoutput { } }" }
|
||||
let(:new_config_generator_counter) { 50 }
|
||||
let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput {} }" }
|
||||
let(:config_path) do
|
||||
f = Stud::Temporary.file
|
||||
f.write(config)
|
||||
f.close
|
||||
f.path
|
||||
end
|
||||
let(:interval) { 0.2 }
|
||||
let(:pipeline_settings) { { :pipeline_workers => 4,
|
||||
:config_path => config_path } }
|
||||
|
||||
let(:agent_args) do
|
||||
super.merge({ :auto_reload => true,
|
||||
:reload_interval => interval,
|
||||
:collect_metric => true })
|
||||
end
|
||||
|
||||
before :each do
|
||||
allow(DummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output)
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
|
||||
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
|
||||
|
||||
@t = Thread.new do
|
||||
subject.register_pipeline("main", pipeline_settings)
|
||||
subject.execute
|
||||
end
|
||||
|
||||
sleep(2)
|
||||
end
|
||||
|
||||
after :each do
|
||||
Stud.stop!(@t)
|
||||
@t.join
|
||||
end
|
||||
|
||||
it "resets the metric collector" do
|
||||
# We know that the store has more events that the next expect
|
||||
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
|
||||
snapshot = LogStash::Instrument::Collector.instance.snapshot_metric
|
||||
expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to be > new_config_generator_counter
|
||||
|
||||
# update the configuration and give some time to logstash to pick it up and do the work
|
||||
IO.write(config_path, new_config)
|
||||
|
||||
sleep(interval * 3) # Give time to reload the config
|
||||
|
||||
# Since thre is multiple threads involved and with the configuration reload,
|
||||
# It can take some time to the states be visible in the store
|
||||
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
|
||||
snapshot = LogStash::Instrument::Collector.instance.snapshot_metric
|
||||
expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to eq(new_config_generator_counter)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
require "spec_helper"
|
||||
require "logstash/inputs/generator"
|
||||
require "logstash/filters/multiline"
|
||||
require_relative "../support/mocks_classes"
|
||||
|
||||
class DummyInput < LogStash::Inputs::Base
|
||||
config_name "dummyinput"
|
||||
|
@ -48,30 +49,6 @@ class DummyCodec < LogStash::Codecs::Base
|
|||
end
|
||||
end
|
||||
|
||||
class DummyOutput < LogStash::Outputs::Base
|
||||
config_name "dummyoutput"
|
||||
milestone 2
|
||||
|
||||
attr_reader :num_closes, :events
|
||||
|
||||
def initialize(params={})
|
||||
super
|
||||
@num_closes = 0
|
||||
@events = []
|
||||
end
|
||||
|
||||
def register
|
||||
end
|
||||
|
||||
def receive(event)
|
||||
@events << event
|
||||
end
|
||||
|
||||
def close
|
||||
@num_closes = 1
|
||||
end
|
||||
end
|
||||
|
||||
class DummyOutputMore < DummyOutput
|
||||
config_name "dummyoutputmore"
|
||||
end
|
||||
|
|
26
logstash-core/spec/support/mocks_classes.rb
Normal file
26
logstash-core/spec/support/mocks_classes.rb
Normal file
|
@ -0,0 +1,26 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/outputs/base"
|
||||
|
||||
class DummyOutput < LogStash::Outputs::Base
|
||||
config_name "dummyoutput"
|
||||
milestone 2
|
||||
|
||||
attr_reader :num_closes, :events
|
||||
|
||||
def initialize(params={})
|
||||
super
|
||||
@num_closes = 0
|
||||
@events = []
|
||||
end
|
||||
|
||||
def register
|
||||
end
|
||||
|
||||
def receive(event)
|
||||
@events << event
|
||||
end
|
||||
|
||||
def close
|
||||
@num_closes = 1
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue