Mirror of https://github.com/elastic/logstash.git
Commit 47239b30c7 (parent 587debd3e6)
13 changed files with 76 additions and 142 deletions

@@ -55,7 +55,6 @@ class LogStash::Agent
     @thread = Thread.current # this var is implicilty used by Stud.stop?
     @logger.info("starting agent")
 
-    start_background_services
     start_pipelines
     start_webserver
 
@@ -114,8 +113,13 @@ class LogStash::Agent
     ((Time.now.to_f - STARTED_AT.to_f) * 1000.0).to_i
   end
 
+  def stop_collecting_metric
+    @collector.stop
+    @periodic_pollers.stop
+  end
+
   def shutdown
-    stop_background_services
+    stop_collecting_metric
     stop_webserver
     shutdown_pipelines
   end
@@ -133,7 +137,7 @@ class LogStash::Agent
   private
   def start_webserver
     options = {:http_host => @http_host, :http_port => @http_port, :http_environment => @http_environment }
-    @webserver = LogStash::WebServer.new(@logger, options)
+    @webserver = LogStash::WebServer.new(@logger, self, options)
     Thread.new(@webserver) do |webserver|
       LogStash::Util.set_thread_name("Api Webserver")
       webserver.run
@@ -144,20 +148,6 @@ class LogStash::Agent
     @webserver.stop if @webserver
   end
 
-  def start_background_services
-    if collect_metrics?
-      @logger.debug("Agent: Starting metric periodic pollers")
-      @periodic_pollers.start
-    end
-  end
-
-  def stop_background_services
-    if collect_metrics?
-      @logger.debug("Agent: Stopping metric periodic pollers")
-      @periodic_pollers.stop
-    end
-  end
-
   def configure_metrics_collectors
     @collector = LogStash::Instrument::Collector.new
 
@@ -170,13 +160,12 @@ class LogStash::Agent
 
 
     @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric)
+    @periodic_pollers.start
   end
 
   def reset_metrics_collectors
-    @periodic_pollers.stop
-    @collector.stop
+    stop_collecting_metric
     configure_metrics_collectors
-    @periodic_pollers.start
   end
 
   def collect_metrics?
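
Taken together, the agent hunks above fold the old start_background_services/stop_background_services pair into the metric lifecycle itself: configure_metrics_collectors now starts the periodic pollers, the new stop_collecting_metric tears the collector and pollers down, and shutdown calls it directly. The condensed sketch below is only a reading aid, not the full file; the name `execute` for the method that calls start_pipelines, and the helpers it references, are assumptions since this diff does not show them.

    # Condensed sketch of the agent lifecycle after this commit (reading aid only).
    module LogStash
      class Agent
        def execute                       # assumed name for the startup method
          @logger.info("starting agent")
          start_pipelines
          start_webserver                 # the webserver now receives `self`
        end

        def shutdown
          stop_collecting_metric          # replaces stop_background_services
          stop_webserver
          shutdown_pipelines
        end

        def stop_collecting_metric
          @collector.stop
          @periodic_pollers.stop
        end

        def reset_metrics_collectors
          stop_collecting_metric
          configure_metrics_collectors    # now also starts the periodic pollers
        end
      end
    end
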
@@ -21,9 +21,9 @@ module LogStash
 
     helpers AppHelpers
 
-    def initialize(app=nil)
+    def initialize(app=nil, agent)
       super(app)
-      @factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.instance)
+      @factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.new(agent))
     end
 
     not_found do
@@ -19,7 +19,6 @@ module LogStash
         as = options[:human] ? :string : :json
         respond_with(stats.hot_threads(options), {:as => as})
       end
-
     end
   end
 end
@@ -73,8 +73,9 @@ module LogStash
       end
     end
 
-    def self.app(logger, environment)
-      namespaces = rack_namespaces
+    def self.app(logger, agent, environment)
+      namespaces = rack_namespaces(agent)
+
       Rack::Builder.new do
         # Custom logger object. Rack CommonLogger does not work with cabin
         use ApiLogger, logger
@@ -87,21 +88,23 @@ module LogStash
           use ApiErrorHandler, logger
         end
 
-        run LogStash::Api::Modules::Root
+        run LogStash::Api::Modules::Root.new(nil, agent)
         namespaces.each_pair do |namespace, app|
           map(namespace) do
-            run app
+            # Pass down a reference to the current agent
+            # This allow the API to have direct access to the collector
+            run app.new(nil, agent)
          end
        end
      end
    end
 
-    def self.rack_namespaces
+    def self.rack_namespaces(agent)
      {
        "/_node" => LogStash::Api::Modules::Node,
        "/_stats" => LogStash::Api::Modules::Stats,
        "/_node/stats" => LogStash::Api::Modules::NodeStats,
-       "/_plugins" => LogStash::Api::Modules::Plugins
+       "/_plugins" => LogStash::Api::Modules::Plugins,
      }
    end
  end
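
The rack_app hunks above are the heart of the wiring change: instead of mounting each Sinatra module class directly (which pushed the modules toward the Service singleton), RackApp.app now instantiates every module with the agent so request handlers can reach the agent's collector. The pattern itself is plain modular Sinatra plus Rack::Builder; the standalone sketch below illustrates it with made-up names (StatsApp and FakeAgent are not LogStash classes) and assumes only the sinatra and rack gems.

    # Standalone illustration of injecting a dependency into modular Sinatra apps,
    # the same shape as `run app.new(nil, agent)` in the hunk above.
    require "sinatra/base"
    require "rack"

    FakeAgent = Struct.new(:name) # stand-in for the object being passed down

    class StatsApp < Sinatra::Base
      def initialize(app = nil, agent)
        super(app)       # keep Sinatra's middleware chaining intact
        @agent = agent   # request handlers on this instance can see the dependency
      end

      get "/" do
        "agent: #{@agent.name}"
      end
    end

    agent = FakeAgent.new("example")

    app = Rack::Builder.new do
      map("/_stats") do
        run StatsApp.new(nil, agent)  # an instance, not the class, so the agent rides along
      end
    end
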
@@ -5,40 +5,21 @@ require "logstash/util/loggable"
 module LogStash
   module Api
     class Service
 
-      include Singleton
       include LogStash::Util::Loggable
 
-      def initialize
-        @snapshot_rotation_mutex = Mutex.new
-        @snapshot = nil
+      attr_reader :agent
+
+      def initialize(agent)
+        @agent = agent
         logger.debug("[api-service] start") if logger.debug?
-        LogStash::Instrument::Collector.instance.add_observer(self)
       end
 
-      def stop
-        logger.debug("[api-service] stop") if logger.debug?
-        LogStash::Instrument::Collector.instance.delete_observer(self)
-      end
-
-      def agent
-        LogStash::Instrument::Collector.instance.agent
-      end
-
-      def started?
-        !@snapshot.nil? && has_counters?
-      end
-
-      def update(snapshot)
-        logger.debug("[api-service] snapshot received", :snapshot_time => snapshot.created_at) if logger.debug?
-
-        @snapshot_rotation_mutex.synchronize do
-          @snapshot = snapshot
-        end
-        true
-      end
-
       def snapshot
-        @snapshot_rotation_mutex.synchronize { @snapshot }
+        agent.metric.collector.snapshot_metric
       end
 
       def get_shallow(*path)
@@ -58,15 +39,7 @@ module LogStash
       private
 
       def has_counters?
-        (["LogStash::Instrument::MetricType::Counter", "LogStash::Instrument::MetricType::Gauge"] - metric_types).empty?
-      end
-
-      def metric_types
-        types = []
-        @snapshot_rotation_mutex.synchronize do
-          types = @snapshot.metric_store.all.map { |t| t.class.to_s }
-        end
-        return types
+        true
       end
     end
   end
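
With these two hunks the Service stops being a process-wide singleton that observes the Collector and rotates snapshots behind a mutex; it becomes a thin per-agent facade whose snapshot method simply asks the agent's collector. A minimal usage sketch follows; the constructor and the metric/collector accessors come from the lines above, while the running `agent` object and the example get_shallow path are assumptions.

    # Sketch, assuming `agent` is an already-running LogStash::Agent.
    require "logstash/api/service"

    service  = LogStash::Api::Service.new(agent)  # no more Service.instance
    snapshot = service.snapshot                   # delegates to agent.metric.collector.snapshot_metric

    # Commands built by the CommandFactory read from that snapshot, e.g. a
    # hypothetical path lookup:
    # service.get_shallow(:stats, :events)
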
@@ -21,7 +21,7 @@ module LogStash module Inputs
     @queue = queue
 
     # we register to the collector after receiving the pipeline queue
-    LogStash::Instrument::Collector.instance.add_observer(self)
+    metric.collector.add_observer(self)
 
     # Keep this plugin thread alive,
     # until we shutdown the metric pipeline
@@ -30,7 +30,7 @@ module LogStash module Inputs
 
   def stop
     @logger.debug("Metrics input: stopped")
-    LogStash::Instrument::Collector.instance.delete_observer(self)
+    metric.collector.delete_observer(self)
   end
 
   def update(snapshot)
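
The metrics input keeps the same observer contract; it only changes where it registers, using the collector reachable through the plugin's own metric object instead of the global Collector.instance. The flow implied by these hunks: add_observer(self) once the pipeline queue arrives, the collector later calls back update(snapshot), and stop deregisters. The schematic below elides the snapshot-to-event body of update, which this diff does not show, and `metric` stands for the accessor the real plugin inherits from its base class.

    # Schematic of the observer flow after this commit; not the full plugin.
    class MetricsInputSketch
      def run(queue)
        @queue = queue
        metric.collector.add_observer(self)   # was Collector.instance.add_observer(self)
      end

      def update(snapshot)
        # called by the collector for each snapshot; the real plugin turns the
        # snapshot into events and pushes them onto @queue
      end

      def stop
        metric.collector.delete_observer(self)
      end
    end
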
@@ -7,7 +7,7 @@ module LogStash
   class WebServer
     extend Forwardable
 
-    attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events, :http_host, :http_port, :http_environment
+    attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events, :http_host, :http_port, :http_environment, :agent
 
     def_delegator :@runner, :stats
 
@@ -15,12 +15,13 @@ module LogStash
     DEFAULT_PORT = 9600.freeze
     DEFAULT_ENVIRONMENT = 'production'.freeze
 
-    def initialize(logger, options={})
-      @logger = logger
-      @http_host = options[:http_host] || DEFAULT_HOST
-      @http_port = options[:http_port] || DEFAULT_PORT
+    def initialize(logger, agent, options={})
+      @logger = logger
+      @agent = agent
+      @http_host = options[:http_host] || DEFAULT_HOST
+      @http_port = options[:http_port] || DEFAULT_PORT
       @http_environment = options[:http_environment] || DEFAULT_ENVIRONMENT
       @options = {}
       @cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru"),
                                      :binds => ["tcp://#{http_host}:#{http_port}"],
                                      :debug => logger.debug?,
@@ -37,7 +38,7 @@ module LogStash
 
       stop # Just in case
 
-      app = LogStash::Api::RackApp.app(logger, http_environment)
+      app = LogStash::Api::RackApp.app(logger, agent, http_environment)
       @server = ::Puma::Server.new(app)
       @server.add_tcp_listener(http_host, http_port)
 
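
On the webserver side the agent is just one more constructor argument, exposed through attr_reader and forwarded to RackApp.app when run builds the Rack application. Usage mirrors the start_webserver hunk in agent.rb; in the sketch below, `logger` and `agent` are assumed to be an existing Cabin logger and a running LogStash::Agent.

    # Sketch of booting the API webserver after this commit (mirrors agent.rb's start_webserver).
    require "logstash/webserver"

    webserver = LogStash::WebServer.new(logger, agent,
                                        :http_host => "127.0.0.1",
                                        :http_port => 9600,
                                        :http_environment => "production")

    Thread.new(webserver) do |server|
      LogStash::Util.set_thread_name("Api Webserver")
      server.run
    end
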
@@ -5,12 +5,7 @@ require "logstash/api/modules/node"
 require "logstash/json"
 
 describe LogStash::Api::Modules::Node do
-
-  include Rack::Test::Methods
-
-  def app()
-    described_class
-  end
+  include_context "api setup"
 
   describe "#hot threads" do
 
@@ -5,12 +5,9 @@ require "logstash/api/modules/node_stats"
 require "logstash/json"
 
 describe LogStash::Api::Modules::NodeStats do
-  include Rack::Test::Methods
-  extend ResourceDSLMethods
+  include_context "api setup"
 
-  def app() # Used by Rack::Test::Methods
-    described_class
-  end
+  extend ResourceDSLMethods
 
   # DSL describing response structure
   root_structure = {
@@ -5,12 +5,7 @@ require "logstash/api/modules/plugins"
 require "logstash/json"
 
 describe LogStash::Api::Modules::Plugins do
-
-  include Rack::Test::Methods
-
-  def app()
-    described_class
-  end
+  include_context "api setup"
 
   before(:all) do
     get "/"
@@ -52,6 +47,5 @@ describe LogStash::Api::Modules::Plugins do
         expect(plugin["version"]).not_to be_empty
       end
     end
-
   end
 end
@@ -5,16 +5,11 @@ require "logstash/api/modules/root"
 require "logstash/json"
 
 describe LogStash::Api::Modules::Root do
-
-  include Rack::Test::Methods
-
-  def app()
-    described_class
-  end
+  include_context "api setup"
 
   it "should respond to root resource" do
     do_request { get "/" }
     expect(last_response).to be_ok
   end
 
 end
@@ -1,13 +1,10 @@
 # encoding: utf-8
 API_ROOT = File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "lib", "logstash", "api"))
 
 require "logstash/devutils/rspec/spec_helper"
 
 $LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__)))
 require "lib/api/support/resource_dsl_methods"
 
 require 'rspec/expectations'
 require "logstash/settings"
 require 'rack/test'
 require 'rspec'
@@ -74,14 +71,13 @@ class LogStashRunner
   private
 
   def wait_until_ready
-    # Wait until the service and pipeline have started
-    while !(LogStash::Api::Service.instance.started? && agent.pipelines["main"].running?) do
-      sleep 0.5
-    end
+    # # Wait until the service and pipeline have started
+    # while !(LogStash::Api::Service.instance.started? && agent.pipelines["main"].running?) do
+    #   sleep 0.5
+    # end
   end
 end
 
 ##
 # Method used to wrap up a request in between of a running
 # pipeline, this makes the whole execution model easier and
@@ -95,30 +91,6 @@ def do_request(&block)
   ret_val
 end
 
-##
-# Helper module that setups necessary mocks when doing the requests,
-# this could be just included in the test and the runner will be
-# started managed for all tests.
-##
-module LogStash; module RSpec; module RunnerConfig
-  def self.included(klass)
-    klass.before(:all) do
-      LogStashRunner.instance.start
-    end
-
-    klass.before(:each) do
-      runner = LogStashRunner.instance
-      allow(LogStash::Instrument::Collector.instance).to receive(:agent).and_return(runner.agent)
-    end
-
-    klass.after(:all) do
-      LogStashRunner.instance.stop
-    end
-  end
-end; end; end
-
-require 'rspec/expectations'
-
 RSpec::Matchers.define :be_available? do
   match do |plugin|
     begin
@@ -129,3 +101,20 @@ RSpec::Matchers.define :be_available? do
     end
   end
 end
+
+shared_context "api setup" do
+  before :all do
+    @runner = LogStashRunner.new
+    @runner.start
+  end
+
+  after :all do
+    @runner.stop
+  end
+
+  include Rack::Test::Methods
+
+  def app()
+    described_class.new(nil, @runner.agent)
+  end
+end
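
The new "api setup" shared context is what lets the module specs above drop their hand-rolled app helpers: it boots a LogStashRunner around each example group and defines app() as described_class.new(nil, @runner.agent), matching the initialize(app=nil, agent) signature the API modules gained in this commit. A spec then only needs to pull the context in, as the root spec above does; a minimal sketch, assuming the spec helper shown here is already loaded:

    # Minimal module spec using the shared context (spec helper assumed loaded).
    describe LogStash::Api::Modules::Root do
      include_context "api setup"   # starts a LogStashRunner, provides app() and Rack::Test

      it "should respond to root resource" do
        do_request { get "/" }      # do_request is the wrapper defined in the helper
        expect(last_response).to be_ok
      end
    end
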
@@ -3,15 +3,17 @@ require "logstash/inputs/metrics"
 require "spec_helper"
 
 describe LogStash::Inputs::Metrics do
-  before :each do
-    LogStash::Instrument::Collector.instance.clear
-  end
-
+  let(:collector) { LogStash::Instrument::Collector.new }
+  let(:metric) { LogStash::Instrument::Metric.new(collector) }
   let(:queue) { [] }
 
+  before :each do
+    allow(subject).to receive(:metric).and_return(metric)
+  end
+
   describe "#run" do
     it "should register itself to the collector observer" do
-      expect(LogStash::Instrument::Collector.instance).to receive(:add_observer).with(subject)
+      expect(collector).to receive(:add_observer).with(subject)
       t = Thread.new { subject.run(queue) }
       sleep(0.1) # give a bit of time to the thread to start
       subject.stop
@@ -19,24 +21,21 @@ describe LogStash::Inputs::Metrics do
   end
 
   describe "#update" do
-    let(:namespaces) { [:root, :base] }
-    let(:key) { :foo }
-    let(:metric_store) { LogStash::Instrument::MetricStore.new }
-
     it "should fill up the queue with received events" do
       Thread.new { subject.run(queue) }
       sleep(0.1)
       subject.stop
 
-      metric_store.fetch_or_store(namespaces, key, LogStash::Instrument::MetricType::Counter.new(namespaces, key))
-      subject.update(LogStash::Instrument::Snapshot.new(metric_store))
+      metric.increment([:root, :test], :plugin)
+
+      subject.update(collector.snapshot_metric)
       expect(queue.count).to eq(1)
     end
   end
 
   describe "#stop" do
     it "should remove itself from the the collector observer" do
-      expect(LogStash::Instrument::Collector.instance).to receive(:delete_observer).with(subject)
+      expect(collector).to receive(:delete_observer).with(subject)
       t = Thread.new { subject.run(queue) }
       sleep(0.1) # give a bit of time to the thread to start
       subject.stop