Fix the ApiTest and the Metric inputs tests

Fixes #5446
This commit is contained in:
Pier-Hugues Pellerin 2016-06-03 09:27:55 -04:00
parent 20cede5b0a
commit 420abaef0b
13 changed files with 76 additions and 142 deletions

View file

@ -55,7 +55,6 @@ class LogStash::Agent
@thread = Thread.current # this var is implicilty used by Stud.stop?
@logger.info("starting agent")
start_background_services
start_pipelines
start_webserver
@ -114,8 +113,13 @@ class LogStash::Agent
((Time.now.to_f - STARTED_AT.to_f) * 1000.0).to_i
end
def stop_collecting_metric
@collector.stop
@periodic_pollers.stop
end
def shutdown
stop_background_services
stop_collecting_metric
stop_webserver
shutdown_pipelines
end
@ -133,7 +137,7 @@ class LogStash::Agent
private
def start_webserver
options = {:http_host => @http_host, :http_port => @http_port, :http_environment => @http_environment }
@webserver = LogStash::WebServer.new(@logger, options)
@webserver = LogStash::WebServer.new(@logger, self, options)
Thread.new(@webserver) do |webserver|
LogStash::Util.set_thread_name("Api Webserver")
webserver.run
@ -144,20 +148,6 @@ class LogStash::Agent
@webserver.stop if @webserver
end
def start_background_services
if collect_metrics?
@logger.debug("Agent: Starting metric periodic pollers")
@periodic_pollers.start
end
end
def stop_background_services
if collect_metrics?
@logger.debug("Agent: Stopping metric periodic pollers")
@periodic_pollers.stop
end
end
def configure_metrics_collectors
@collector = LogStash::Instrument::Collector.new
@ -170,13 +160,12 @@ class LogStash::Agent
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric)
@periodic_pollers.start
end
def reset_metrics_collectors
@periodic_pollers.stop
@collector.stop
stop_collecting_metric
configure_metrics_collectors
@periodic_pollers.start
end
def collect_metrics?

View file

@ -21,9 +21,9 @@ module LogStash
helpers AppHelpers
def initialize(app=nil)
def initialize(app=nil, agent)
super(app)
@factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.instance)
@factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.new(agent))
end
not_found do

View file

@ -19,7 +19,6 @@ module LogStash
as = options[:human] ? :string : :json
respond_with(stats.hot_threads(options), {:as => as})
end
end
end
end

View file

@ -73,8 +73,9 @@ module LogStash
end
end
def self.app(logger, environment)
namespaces = rack_namespaces
def self.app(logger, agent, environment)
namespaces = rack_namespaces(agent)
Rack::Builder.new do
# Custom logger object. Rack CommonLogger does not work with cabin
use ApiLogger, logger
@ -87,21 +88,23 @@ module LogStash
use ApiErrorHandler, logger
end
run LogStash::Api::Modules::Root
run LogStash::Api::Modules::Root.new(nil, agent)
namespaces.each_pair do |namespace, app|
map(namespace) do
run app
# Pass down a reference to the current agent
# This allow the API to have direct access to the collector
run app.new(nil, agent)
end
end
end
end
def self.rack_namespaces
def self.rack_namespaces(agent)
{
"/_node" => LogStash::Api::Modules::Node,
"/_stats" => LogStash::Api::Modules::Stats,
"/_node/stats" => LogStash::Api::Modules::NodeStats,
"/_plugins" => LogStash::Api::Modules::Plugins
"/_plugins" => LogStash::Api::Modules::Plugins,
}
end
end

View file

@ -5,40 +5,21 @@ require "logstash/util/loggable"
module LogStash
module Api
class Service
include Singleton
include LogStash::Util::Loggable
def initialize
@snapshot_rotation_mutex = Mutex.new
@snapshot = nil
attr_reader :agent
def initialize(agent)
@agent = agent
logger.debug("[api-service] start") if logger.debug?
LogStash::Instrument::Collector.instance.add_observer(self)
end
def stop
logger.debug("[api-service] stop") if logger.debug?
LogStash::Instrument::Collector.instance.delete_observer(self)
end
def agent
LogStash::Instrument::Collector.instance.agent
end
def started?
!@snapshot.nil? && has_counters?
end
def update(snapshot)
logger.debug("[api-service] snapshot received", :snapshot_time => snapshot.created_at) if logger.debug?
@snapshot_rotation_mutex.synchronize do
@snapshot = snapshot
end
true
end
def snapshot
@snapshot_rotation_mutex.synchronize { @snapshot }
agent.metric.collector.snapshot_metric
end
def get_shallow(*path)
@ -58,15 +39,7 @@ module LogStash
private
def has_counters?
(["LogStash::Instrument::MetricType::Counter", "LogStash::Instrument::MetricType::Gauge"] - metric_types).empty?
end
def metric_types
types = []
@snapshot_rotation_mutex.synchronize do
types = @snapshot.metric_store.all.map { |t| t.class.to_s }
end
return types
true
end
end
end

View file

@ -21,7 +21,7 @@ module LogStash module Inputs
@queue = queue
# we register to the collector after receiving the pipeline queue
LogStash::Instrument::Collector.instance.add_observer(self)
metric.collector.add_observer(self)
# Keep this plugin thread alive,
# until we shutdown the metric pipeline
@ -30,7 +30,7 @@ module LogStash module Inputs
def stop
@logger.debug("Metrics input: stopped")
LogStash::Instrument::Collector.instance.delete_observer(self)
metric.collector.delete_observer(self)
end
def update(snapshot)

View file

@ -7,7 +7,7 @@ module LogStash
class WebServer
extend Forwardable
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events, :http_host, :http_port, :http_environment
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events, :http_host, :http_port, :http_environment, :agent
def_delegator :@runner, :stats
@ -15,12 +15,13 @@ module LogStash
DEFAULT_PORT = 9600.freeze
DEFAULT_ENVIRONMENT = 'production'.freeze
def initialize(logger, options={})
@logger = logger
@http_host = options[:http_host] || DEFAULT_HOST
@http_port = options[:http_port] || DEFAULT_PORT
def initialize(logger, agent, options={})
@logger = logger
@agent = agent
@http_host = options[:http_host] || DEFAULT_HOST
@http_port = options[:http_port] || DEFAULT_PORT
@http_environment = options[:http_environment] || DEFAULT_ENVIRONMENT
@options = {}
@options = {}
@cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru"),
:binds => ["tcp://#{http_host}:#{http_port}"],
:debug => logger.debug?,
@ -37,7 +38,7 @@ module LogStash
stop # Just in case
app = LogStash::Api::RackApp.app(logger, http_environment)
app = LogStash::Api::RackApp.app(logger, agent, http_environment)
@server = ::Puma::Server.new(app)
@server.add_tcp_listener(http_host, http_port)

View file

@ -5,12 +5,7 @@ require "logstash/api/modules/node"
require "logstash/json"
describe LogStash::Api::Modules::Node do
include Rack::Test::Methods
def app()
described_class
end
include_context "api setup"
describe "#hot threads" do

View file

@ -5,12 +5,9 @@ require "logstash/api/modules/node_stats"
require "logstash/json"
describe LogStash::Api::Modules::NodeStats do
include Rack::Test::Methods
extend ResourceDSLMethods
include_context "api setup"
def app() # Used by Rack::Test::Methods
described_class
end
extend ResourceDSLMethods
# DSL describing response structure
root_structure = {

View file

@ -5,12 +5,7 @@ require "logstash/api/modules/plugins"
require "logstash/json"
describe LogStash::Api::Modules::Plugins do
include Rack::Test::Methods
def app()
described_class
end
include_context "api setup"
before(:all) do
get "/"
@ -52,6 +47,5 @@ describe LogStash::Api::Modules::Plugins do
expect(plugin["version"]).not_to be_empty
end
end
end
end

View file

@ -5,16 +5,11 @@ require "logstash/api/modules/root"
require "logstash/json"
describe LogStash::Api::Modules::Root do
include Rack::Test::Methods
def app()
described_class
end
include_context "api setup"
it "should respond to root resource" do
do_request { get "/" }
expect(last_response).to be_ok
end
end

View file

@ -1,13 +1,10 @@
# encoding: utf-8
API_ROOT = File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "lib", "logstash", "api"))
require "logstash/devutils/rspec/spec_helper"
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__)))
require "lib/api/support/resource_dsl_methods"
require 'rspec/expectations'
require "logstash/settings"
require 'rack/test'
require 'rspec'
@ -74,14 +71,13 @@ class LogStashRunner
private
def wait_until_ready
# Wait until the service and pipeline have started
while !(LogStash::Api::Service.instance.started? && agent.pipelines["main"].running?) do
sleep 0.5
end
# # Wait until the service and pipeline have started
# while !(LogStash::Api::Service.instance.started? && agent.pipelines["main"].running?) do
# sleep 0.5
# end
end
end
##
# Method used to wrap up a request in between of a running
# pipeline, this makes the whole execution model easier and
@ -95,30 +91,6 @@ def do_request(&block)
ret_val
end
##
# Helper module that setups necessary mocks when doing the requests,
# this could be just included in the test and the runner will be
# started managed for all tests.
##
module LogStash; module RSpec; module RunnerConfig
def self.included(klass)
klass.before(:all) do
LogStashRunner.instance.start
end
klass.before(:each) do
runner = LogStashRunner.instance
allow(LogStash::Instrument::Collector.instance).to receive(:agent).and_return(runner.agent)
end
klass.after(:all) do
LogStashRunner.instance.stop
end
end
end; end; end
require 'rspec/expectations'
RSpec::Matchers.define :be_available? do
match do |plugin|
begin
@ -129,3 +101,20 @@ RSpec::Matchers.define :be_available? do
end
end
end
shared_context "api setup" do
before :all do
@runner = LogStashRunner.new
@runner.start
end
after :all do
@runner.stop
end
include Rack::Test::Methods
def app()
described_class.new(nil, @runner.agent)
end
end

View file

@ -3,15 +3,17 @@ require "logstash/inputs/metrics"
require "spec_helper"
describe LogStash::Inputs::Metrics do
before :each do
LogStash::Instrument::Collector.instance.clear
end
let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
let(:queue) { [] }
before :each do
allow(subject).to receive(:metric).and_return(metric)
end
describe "#run" do
it "should register itself to the collector observer" do
expect(LogStash::Instrument::Collector.instance).to receive(:add_observer).with(subject)
expect(collector).to receive(:add_observer).with(subject)
t = Thread.new { subject.run(queue) }
sleep(0.1) # give a bit of time to the thread to start
subject.stop
@ -19,24 +21,21 @@ describe LogStash::Inputs::Metrics do
end
describe "#update" do
let(:namespaces) { [:root, :base] }
let(:key) { :foo }
let(:metric_store) { LogStash::Instrument::MetricStore.new }
it "should fill up the queue with received events" do
Thread.new { subject.run(queue) }
sleep(0.1)
subject.stop
metric_store.fetch_or_store(namespaces, key, LogStash::Instrument::MetricType::Counter.new(namespaces, key))
subject.update(LogStash::Instrument::Snapshot.new(metric_store))
metric.increment([:root, :test], :plugin)
subject.update(collector.snapshot_metric)
expect(queue.count).to eq(1)
end
end
describe "#stop" do
it "should remove itself from the the collector observer" do
expect(LogStash::Instrument::Collector.instance).to receive(:delete_observer).with(subject)
expect(collector).to receive(:delete_observer).with(subject)
t = Thread.new { subject.run(queue) }
sleep(0.1) # give a bit of time to the thread to start
subject.stop