mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
This make all designed resources for the metrics api available, this follows the initial sketches created in #4446.
As discussed in this adds: / /_node/hot_threads /_node/stats/ /_node/stats/events /_node/stats/jvm /_stats/jvm introduces also small refactors and cleanup necessary to improve the webapi code quality. adds also a way to handle references from the webapi to the internals of logstash by passing the agent refrences to the collector, this is not perfect, but for now it solve the communication situation until furder refactor. Fixes #4652 Fixes #4653
This commit is contained in:
parent
81f83a231c
commit
23dcf467e4
29 changed files with 446 additions and 290 deletions
11
Gemfile
11
Gemfile
|
@ -22,14 +22,3 @@ gem "rubyzip", "~> 1.1.7", :group => :build
|
|||
gem "gems", "~> 0.8.3", :group => :build
|
||||
gem "rack-test", :require => "rack/test", :group => :development
|
||||
gem "flores", "~> 0.0.6", :group => :development
|
||||
gem "logstash-output-elasticsearch"
|
||||
gem "logstash-codec-plain", ">= 0"
|
||||
gem "logstash-filter-clone"
|
||||
gem "logstash-filter-mutate", ">= 0"
|
||||
gem "logstash-filter-multiline"
|
||||
gem "logstash-input-generator"
|
||||
gem "logstash-input-stdin"
|
||||
gem "logstash-input-tcp"
|
||||
gem "logstash-output-stdout"
|
||||
gem "ftw", "~> 0.0.42"
|
||||
gem "longshoreman", ">= 0"
|
||||
|
|
|
@ -3,25 +3,31 @@ PATH
|
|||
specs:
|
||||
logstash-core (3.0.0.dev-java)
|
||||
cabin (~> 0.8.0)
|
||||
chronic_duration (= 0.10.6)
|
||||
clamp (~> 0.6.5)
|
||||
concurrent-ruby (= 0.9.2)
|
||||
concurrent-ruby (= 1.0.0)
|
||||
filesize (= 0.0.4)
|
||||
gems (~> 0.8.3)
|
||||
i18n (= 0.6.9)
|
||||
jrjackson (~> 0.3.7)
|
||||
jruby-monitoring (~> 0.1)
|
||||
jruby-openssl (= 0.9.13)
|
||||
logstash-core-event (~> 3.0.0.dev)
|
||||
logstash-core-event-java (~> 3.0.0.dev)
|
||||
minitar (~> 0.5.4)
|
||||
pry (~> 0.10.1)
|
||||
puma (~> 2.15, >= 2.15.3)
|
||||
rubyzip (~> 1.1.7)
|
||||
sinatra (~> 1.4, >= 1.4.6)
|
||||
stud (~> 0.0.19)
|
||||
thread_safe (~> 0.3.5)
|
||||
treetop (< 1.5.0)
|
||||
|
||||
PATH
|
||||
remote: ./logstash-core-event
|
||||
remote: ./logstash-core-event-java
|
||||
specs:
|
||||
logstash-core-event (3.0.0.dev-java)
|
||||
logstash-core-event-java (3.0.0.dev-java)
|
||||
jar-dependencies
|
||||
ruby-maven (~> 3.3.9)
|
||||
|
||||
GEM
|
||||
remote: https://rubygems.org/
|
||||
|
@ -35,6 +41,8 @@ GEM
|
|||
cabin (0.8.1)
|
||||
childprocess (0.5.9)
|
||||
ffi (~> 1.0, >= 1.0.11)
|
||||
chronic_duration (0.10.6)
|
||||
numerizer (~> 0.1.1)
|
||||
ci_reporter (2.0.0)
|
||||
builder (>= 2.1.2)
|
||||
ci_reporter_rspec (1.0.0)
|
||||
|
@ -42,7 +50,7 @@ GEM
|
|||
rspec (>= 2.14, < 4)
|
||||
clamp (0.6.5)
|
||||
coderay (1.1.0)
|
||||
concurrent-ruby (0.9.2-java)
|
||||
concurrent-ruby (1.0.0-java)
|
||||
coveralls (0.8.10)
|
||||
json (~> 1.8)
|
||||
rest-client (>= 1.6.8, < 2)
|
||||
|
@ -52,7 +60,7 @@ GEM
|
|||
tins (~> 1.6.0)
|
||||
diff-lcs (1.2.5)
|
||||
docile (1.1.5)
|
||||
domain_name (0.5.25)
|
||||
domain_name (0.5.20160128)
|
||||
unf (>= 0.0.5, < 1.0.0)
|
||||
faraday (0.9.2)
|
||||
multipart-post (>= 1.2, < 3)
|
||||
|
@ -75,7 +83,9 @@ GEM
|
|||
domain_name (~> 0.5)
|
||||
i18n (0.6.9)
|
||||
insist (1.0.0)
|
||||
jar-dependencies (0.3.2)
|
||||
jrjackson (0.3.8)
|
||||
jruby-monitoring (0.3.0)
|
||||
jruby-openssl (0.9.13-java)
|
||||
json (1.8.3-java)
|
||||
kramdown (1.9.0)
|
||||
|
@ -93,6 +103,7 @@ GEM
|
|||
minitar (0.5.4)
|
||||
multipart-post (2.0.0)
|
||||
netrc (0.11.0)
|
||||
numerizer (0.1.1)
|
||||
octokit (3.8.0)
|
||||
sawyer (~> 0.6.0, >= 0.5.3)
|
||||
polyglot (0.3.5)
|
||||
|
@ -101,7 +112,13 @@ GEM
|
|||
method_source (~> 0.8.1)
|
||||
slop (~> 3.4)
|
||||
spoon (~> 0.0)
|
||||
rake (10.4.2)
|
||||
puma (2.16.0-java)
|
||||
rack (1.6.4)
|
||||
rack-protection (1.5.3)
|
||||
rack
|
||||
rack-test (0.6.3)
|
||||
rack (>= 1.0)
|
||||
rake (10.5.0)
|
||||
rest-client (1.8.0)
|
||||
http-cookie (>= 1.0.2, < 2.0)
|
||||
mime-types (>= 1.16, < 3.0)
|
||||
|
@ -120,15 +137,22 @@ GEM
|
|||
rspec-support (3.1.2)
|
||||
rspec-wait (0.0.8)
|
||||
rspec (>= 2.11, < 3.5)
|
||||
ruby-maven (3.3.9)
|
||||
ruby-maven-libs (~> 3.3.1)
|
||||
ruby-maven-libs (3.3.3)
|
||||
rubyzip (1.1.7)
|
||||
sawyer (0.6.0)
|
||||
addressable (~> 2.3.5)
|
||||
faraday (~> 0.8, < 0.10)
|
||||
simplecov (0.11.1)
|
||||
simplecov (0.11.2)
|
||||
docile (~> 1.1.0)
|
||||
json (~> 1.8)
|
||||
simplecov-html (~> 0.10.0)
|
||||
simplecov-html (0.10.0)
|
||||
sinatra (1.4.7)
|
||||
rack (~> 1.5)
|
||||
rack-protection (~> 1.4)
|
||||
tilt (>= 1.3, < 3)
|
||||
slop (3.6.0)
|
||||
spoon (0.0.4)
|
||||
ffi
|
||||
|
@ -137,6 +161,7 @@ GEM
|
|||
tins (~> 1.0)
|
||||
thor (0.19.1)
|
||||
thread_safe (0.3.5-java)
|
||||
tilt (2.0.2)
|
||||
tins (1.6.0)
|
||||
treetop (1.4.15)
|
||||
polyglot
|
||||
|
@ -155,9 +180,10 @@ DEPENDENCIES
|
|||
fpm (~> 1.3.3)
|
||||
gems (~> 0.8.3)
|
||||
logstash-core (= 3.0.0.dev)!
|
||||
logstash-core-event (= 3.0.0.dev)!
|
||||
logstash-core-event-java (= 3.0.0.dev)!
|
||||
logstash-devutils (~> 0.0.15)
|
||||
octokit (= 3.8.0)
|
||||
rack-test
|
||||
rspec (~> 3.1.0)
|
||||
rubyzip (~> 1.1.7)
|
||||
simplecov
|
||||
|
|
|
@ -72,7 +72,7 @@ class LogStash::Agent
|
|||
# @param settings [Hash] settings that will be passed when creating the pipeline.
|
||||
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
|
||||
def register_pipeline(pipeline_id, settings)
|
||||
pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id))
|
||||
pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id, :metric => metric))
|
||||
return unless pipeline.is_a?(LogStash::Pipeline)
|
||||
@pipelines[pipeline_id] = pipeline
|
||||
end
|
||||
|
@ -117,7 +117,7 @@ class LogStash::Agent
|
|||
end
|
||||
|
||||
def stop_webserver
|
||||
@webserver.stop
|
||||
@webserver.stop if @webserver
|
||||
end
|
||||
|
||||
def start_background_services
|
||||
|
@ -137,6 +137,7 @@ class LogStash::Agent
|
|||
def configure_metric
|
||||
if collect_metric?
|
||||
@logger.debug("Agent: Configuring metric collection")
|
||||
LogStash::Instrument::Collector.instance.agent = self
|
||||
@metric = LogStash::Instrument::Metric.create
|
||||
else
|
||||
@metric = LogStash::Instrument::NullMetric.new
|
||||
|
|
|
@ -4,7 +4,9 @@ Dir.glob('lib/**').each{ |d| $LOAD_PATH.unshift(File.join(ROOT, d)) }
|
|||
|
||||
require 'sinatra'
|
||||
require 'app/root'
|
||||
require 'app/stats'
|
||||
require 'app/modules/stats'
|
||||
require 'app/modules/node'
|
||||
require 'app/modules/node_stats'
|
||||
|
||||
env = ENV["RACK_ENV"].to_sym
|
||||
set :environment, env
|
||||
|
@ -16,7 +18,9 @@ configure do
|
|||
end
|
||||
run LogStash::Api::Root
|
||||
|
||||
namespaces = { "/_stats" => LogStash::Api::Stats }
|
||||
namespaces = { "/_node" => LogStash::Api::Node,
|
||||
"/_node/stats" => LogStash::Api::NodeStats,
|
||||
"/_stats" => LogStash::Api::Stats }
|
||||
|
||||
namespaces.each_pair do |namespace, app|
|
||||
map(namespace) do
|
||||
|
|
|
@ -14,5 +14,17 @@ module LogStash::Api
|
|||
raise "Not implemented"
|
||||
end
|
||||
|
||||
def hostname
|
||||
service.agent.node_name
|
||||
end
|
||||
|
||||
def uptime
|
||||
service.agent.uptime
|
||||
end
|
||||
|
||||
def started_at
|
||||
(service.agent.started_at.to_f*1000.0).to_i
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
# encoding: utf-8
|
||||
require "app/service"
|
||||
require "app/system/basicinfo_command"
|
||||
require "app/stats/events_command"
|
||||
require "app/stats/hotthreads_command"
|
||||
require "app/stats/memory_command"
|
||||
require "app/commands/system/basicinfo_command"
|
||||
require "app/commands/stats/events_command"
|
||||
require "app/commands/stats/hotthreads_command"
|
||||
require "app/commands/stats/memory_command"
|
||||
|
||||
module LogStash::Api
|
||||
class CommandFactory
|
||||
|
|
|
@ -6,14 +6,8 @@ class LogStash::Api::StatsEventsCommand < LogStash::Api::Command
|
|||
def run
|
||||
#return whatever is comming out of the snapshot event, this obvoiusly
|
||||
#need to be tailored to the right metrics for this command.
|
||||
stats = service.get(:events_stats)
|
||||
{
|
||||
:in => stats[:base][:events_in].value,
|
||||
:out => 0,
|
||||
:dropped => stats[:base][:events_filtered].value
|
||||
}
|
||||
rescue
|
||||
{}
|
||||
stats = LogStash::Json.load(service.get(:events_stats))
|
||||
stats["stats"]["events"]
|
||||
end
|
||||
|
||||
end
|
|
@ -0,0 +1,97 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
require 'monitoring'
|
||||
require "socket"
|
||||
|
||||
class LogStash::Api::HotThreadsCommand < LogStash::Api::Command
|
||||
|
||||
STACK_TRACES_SIZE_DEFAULT = 10.freeze
|
||||
|
||||
def run(options={})
|
||||
filter = { :stacktrace_size => options.fetch(:stacktrace_size, STACK_TRACES_SIZE_DEFAULT) }
|
||||
hash = JRMonitor.threads.generate(filter)
|
||||
ThreadDump.new(hash, self, options)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
class ThreadDump
|
||||
|
||||
SKIPPED_THREADS = [ "Finalizer", "Reference Handler", "Signal Dispatcher" ].freeze
|
||||
THREADS_COUNT_DEFAULT = 10.freeze
|
||||
IGNORE_IDLE_THREADS_DEFAULT = true.freeze
|
||||
|
||||
attr_reader :top_count, :ignore, :dump
|
||||
|
||||
def initialize(dump, cmd, options={})
|
||||
@dump = dump
|
||||
@options = options
|
||||
@top_count = options.fetch(:threads, THREADS_COUNT_DEFAULT)
|
||||
@ignore = options.fetch(:ignore_idle_threads, IGNORE_IDLE_THREADS_DEFAULT)
|
||||
@cmd = cmd
|
||||
end
|
||||
|
||||
def to_s
|
||||
hash = to_hash
|
||||
report = "#{I18n.t("logstash.web_api.hot_threads.title", :hostname => hash[:hostname], :time => hash[:time], :top_count => top_count )} \n"
|
||||
hash[:threads].each do |thread|
|
||||
thread_report = ""
|
||||
thread_report = "\t #{I18n.t("logstash.web_api.hot_threads.thread_title", :percent_of_cpu_time => thread[:percent_of_cpu_time], :thread_state => thread[:state], :thread_name => thread[:name])} \n"
|
||||
thread_report = "\t #{thread[:percent_of_cpu_time]} % of of cpu usage by #{thread[:state]} thread named '#{thread[:name]}'\n"
|
||||
thread_report << "\t\t #{thread[:path]}\n" if thread[:path]
|
||||
thread[:traces].split("\n").each do |trace|
|
||||
thread_report << "#{trace}\n"
|
||||
end
|
||||
report << thread_report
|
||||
end
|
||||
report
|
||||
end
|
||||
|
||||
def to_hash
|
||||
hash = { :hostname => hostname, :time => Time.now.iso8601, :busiest_threads => top_count, :threads => [] }
|
||||
each do |thread_name, _hash|
|
||||
thread_name, thread_path = _hash["thread.name"].split(": ")
|
||||
thread = { :name => thread_name,
|
||||
:percent_of_cpu_time => cpu_time_as_percent(_hash),
|
||||
:state => _hash["thread.state"]
|
||||
}
|
||||
thread[:path] = thread_path if thread_path
|
||||
traces = ""
|
||||
_hash["thread.stacktrace"].each do |trace|
|
||||
traces << "\t\t#{trace}\n"
|
||||
end
|
||||
thread[:traces] = traces unless traces.empty?
|
||||
hash[:threads] << thread
|
||||
end
|
||||
hash
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def each(&block)
|
||||
i=0
|
||||
dump.each_pair do |thread_name, _hash|
|
||||
break if i >= top_count
|
||||
if ignore
|
||||
next if SKIPPED_THREADS.include?(thread_name)
|
||||
next if thread_name.match(/Ruby-\d+-JIT-\d+/)
|
||||
end
|
||||
block.call(thread_name, _hash)
|
||||
i += 1
|
||||
end
|
||||
end
|
||||
|
||||
def hostname
|
||||
@cmd.hostname
|
||||
end
|
||||
|
||||
def cpu_time_as_percent(hash)
|
||||
(((cpu_time(hash) / @cmd.uptime * 1.0)*10000).to_i)/100.0
|
||||
end
|
||||
|
||||
def cpu_time(hash)
|
||||
hash["cpu.time"] / 1000000.0
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -0,0 +1,25 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
require 'monitoring'
|
||||
|
||||
class LogStash::Api::JvmMemoryCommand < LogStash::Api::Command
|
||||
|
||||
def run
|
||||
memory = LogStash::Json.load(service.get(:jvm_memory_stats))
|
||||
{
|
||||
:heap_used_in_bytes => memory["heap"]["used_in_bytes"],
|
||||
:heap_used_percent => memory["heap"]["used_percent"],
|
||||
:heap_committed_in_bytes => memory["heap"]["committed_in_bytes"],
|
||||
:heap_may_in_bytes => memory["heap"]["max_in_bytes"],
|
||||
:heap_used_in_bytes => memory["heap"]["used_in_bytes"],
|
||||
:non_heap_used_in_bytes => memory["non_heap"]["used_in_bytes"],
|
||||
:non_heap_committed_in_bytes => memory["non_heap"]["committed_in_bytes"],
|
||||
:pools => memory["pools"].inject({}) do |acc, (type, hash)|
|
||||
hash.delete("committed_in_bytes")
|
||||
acc[type] = hash
|
||||
acc
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
end
|
|
@ -0,0 +1,15 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
require "logstash/util/duration_formatter"
|
||||
|
||||
class LogStash::Api::SystemBasicInfoCommand < LogStash::Api::Command
|
||||
|
||||
def run
|
||||
{
|
||||
"hostname" => hostname,
|
||||
"version" => {
|
||||
"number" => LOGSTASH_VERSION
|
||||
}
|
||||
}
|
||||
end
|
||||
end
|
23
logstash-core/lib/logstash/api/lib/app/modules/node.rb
Normal file
23
logstash-core/lib/logstash/api/lib/app/modules/node.rb
Normal file
|
@ -0,0 +1,23 @@
|
|||
# encoding: utf-8
|
||||
require "app"
|
||||
|
||||
module LogStash::Api
|
||||
class Node < BaseApp
|
||||
|
||||
helpers AppHelpers
|
||||
|
||||
# return hot threads information
|
||||
get "/hot_threads" do
|
||||
ignore_idle_threads = params["ignore_idle_threads"] || true
|
||||
|
||||
options = {
|
||||
:ignore_idle_threads => as_boolean(ignore_idle_threads),
|
||||
:human => params.has_key?("human")
|
||||
}
|
||||
command = factory.build(:hot_threads_command)
|
||||
type = options[:human] ? :string : :json
|
||||
respond_with(command.run(options), type)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
43
logstash-core/lib/logstash/api/lib/app/modules/node_stats.rb
Normal file
43
logstash-core/lib/logstash/api/lib/app/modules/node_stats.rb
Normal file
|
@ -0,0 +1,43 @@
|
|||
# encoding: utf-8
|
||||
require "app"
|
||||
|
||||
module LogStash::Api
|
||||
class NodeStats < BaseApp
|
||||
|
||||
helpers AppHelpers
|
||||
|
||||
|
||||
# Global _stats resource where all information is
|
||||
# retrieved and show
|
||||
get "/" do
|
||||
events_command = factory.build(:events_command)
|
||||
memory_command = factory.build(:memory_command)
|
||||
payload = {
|
||||
:events => events_command.run,
|
||||
:start_time_in_millis => events_command.started_at,
|
||||
:jvm => { :memory => memory_command.run }
|
||||
}
|
||||
respond_with payload
|
||||
end
|
||||
|
||||
# Show all events stats information
|
||||
# (for ingested, emitted, dropped)
|
||||
# - #events since startup
|
||||
# - #data (bytes) since startup
|
||||
# - events/s
|
||||
# - bytes/s
|
||||
# - dropped events/s
|
||||
# - events in the pipeline
|
||||
get "/events" do
|
||||
command = factory.build(:events_command)
|
||||
respond_with({ :events => command.run })
|
||||
end
|
||||
|
||||
# return hot threads information
|
||||
get "/jvm" do
|
||||
command = factory.build(:memory_command)
|
||||
respond_with({ :memory => command.run })
|
||||
end
|
||||
|
||||
end
|
||||
end
|
21
logstash-core/lib/logstash/api/lib/app/modules/stats.rb
Normal file
21
logstash-core/lib/logstash/api/lib/app/modules/stats.rb
Normal file
|
@ -0,0 +1,21 @@
|
|||
# encoding: utf-8
|
||||
require "app"
|
||||
|
||||
module LogStash::Api
|
||||
class Stats < BaseApp
|
||||
|
||||
helpers AppHelpers
|
||||
|
||||
# return hot threads information
|
||||
get "/jvm" do
|
||||
command = factory.build(:memory_command)
|
||||
jvm_payload = {
|
||||
:timestamp => command.started_at,
|
||||
:uptime_in_millis => command.uptime,
|
||||
:mem => command.run
|
||||
}
|
||||
respond_with({:jvm => jvm_payload})
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -1,6 +1,5 @@
|
|||
# encoding: utf-8
|
||||
require "app"
|
||||
require "app/system/basicinfo_command"
|
||||
|
||||
module LogStash::Api
|
||||
class Root < BaseApp
|
||||
|
|
|
@ -19,6 +19,10 @@ class LogStash::Api::Service
|
|||
LogStash::Instrument::Collector.instance.delete_observer(self)
|
||||
end
|
||||
|
||||
def agent
|
||||
LogStash::Instrument::Collector.instance.agent
|
||||
end
|
||||
|
||||
def update(snapshot)
|
||||
logger.debug("[api-service] snapshot received", :snapshot => snapshot) if logger.debug?
|
||||
if @snapshot_rotation_mutex.try_lock
|
||||
|
@ -30,11 +34,10 @@ class LogStash::Api::Service
|
|||
def get(key)
|
||||
metric_store = @snapshot.metric_store
|
||||
if key == :jvm_memory_stats
|
||||
metric_store.get(:root, :jvm, :memory)
|
||||
data = metric_store.get_with_path("jvm/memory")[:jvm][:memory]
|
||||
else
|
||||
{ :base => metric_store.get(:root, :base) }
|
||||
data = metric_store.get_with_path("stats/events")
|
||||
end
|
||||
rescue
|
||||
{}
|
||||
LogStash::Json.dump(data)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
require 'monitoring'
|
||||
require "socket"
|
||||
|
||||
class LogStash::Api::HotThreadsCommand < LogStash::Api::Command
|
||||
|
||||
SKIPPED_THREADS = [ "Finalizer", "Reference Handler", "Signal Dispatcher" ].freeze
|
||||
|
||||
def run(options={})
|
||||
top_count = options.fetch(:threads, 3)
|
||||
ignore = options.fetch(:ignore_idle_threads, true)
|
||||
hash = JRMonitor.threads.generate
|
||||
report = "::: {#{hostname}} \n Hot threads at #{Time.now}, busiestThreads=#{top_count}:\n"
|
||||
i = 0
|
||||
hash.each_pair do |thread_name, container|
|
||||
break if i >= top_count
|
||||
if ignore
|
||||
next if SKIPPED_THREADS.include?(thread_name)
|
||||
next if thread_name.match(/Ruby-\d+-JIT-\d+/)
|
||||
end
|
||||
report << "#{build_report(container)} \n"
|
||||
i += 1
|
||||
end
|
||||
report
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_report(hash)
|
||||
thread_name, thread_path = hash["thread.name"].split(": ")
|
||||
report = "\t #{cpu_time(hash)} micros of cpu usage by #{hash["thread.state"]} thread named '#{thread_name}'\n"
|
||||
report << "\t\t #{thread_path}\n" if thread_path
|
||||
hash["thread.stacktrace"].each do |trace|
|
||||
report << "\t\t#{trace}\n"
|
||||
end
|
||||
report
|
||||
end
|
||||
|
||||
def hostname
|
||||
Socket.gethostname
|
||||
end
|
||||
|
||||
def cpu_time(hash)
|
||||
hash["cpu.time"] / 1000
|
||||
end
|
||||
|
||||
end
|
|
@ -1,30 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
require 'monitoring'
|
||||
|
||||
class LogStash::Api::JvmMemoryCommand < LogStash::Api::Command
|
||||
|
||||
def run
|
||||
memory = service.get(:jvm_memory_stats)
|
||||
{
|
||||
:heap => dump(memory[:heap].marshal_dump),
|
||||
:non_heap => dump(memory[:non_heap].marshal_dump),
|
||||
:pools => memory[:pools].marshal_dump.inject({}) do |acc, (type, hash)|
|
||||
acc[type] = dump(hash.marshal_dump)
|
||||
acc
|
||||
end
|
||||
}
|
||||
rescue
|
||||
{} # Something happen, so we just return an empty hash.
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def dump(hash)
|
||||
hash.inject({}) do |h, (k,v)|
|
||||
h[k] = v.value
|
||||
h
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -1,24 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "app/command"
|
||||
|
||||
class LogStash::Api::SystemBasicInfoCommand < LogStash::Api::Command
|
||||
|
||||
def run
|
||||
{
|
||||
"version" => LOGSTASH_VERSION,
|
||||
"hostname" => hostname,
|
||||
"pipeline" => pipeline
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def hostname
|
||||
`hostname`.strip
|
||||
end
|
||||
|
||||
|
||||
def pipeline
|
||||
{ "status" => "ready", "uptime" => 1 }
|
||||
end
|
||||
end
|
|
@ -22,9 +22,11 @@ module LogStash module Instrument
|
|||
SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
|
||||
SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds
|
||||
|
||||
attr_accessor :agent
|
||||
|
||||
def initialize
|
||||
@metric_store = MetricStore.new
|
||||
|
||||
@agent = nil
|
||||
start_periodic_snapshotting
|
||||
|
||||
@async_worker_pool
|
||||
|
|
|
@ -5,15 +5,11 @@ require 'monitoring'
|
|||
module LogStash module Instrument module PeriodicPoller
|
||||
class JVM < Base
|
||||
|
||||
attr_reader :heap_metrics, :non_heap_metrics, :pools_metrics
|
||||
attr_reader :metric
|
||||
|
||||
def initialize(metric, options = {})
|
||||
super(metric, options)
|
||||
jvm_metrics = metric.namespace(:jvm)
|
||||
memory_metrics = jvm_metrics.namespace(:memory)
|
||||
@heap_metrics = memory_metrics.namespace(:heap)
|
||||
@non_heap_metrics = memory_metrics.namespace(:non_heap)
|
||||
@pools_metrics = memory_metrics.namespace(:pools)
|
||||
@metric = metric
|
||||
end
|
||||
|
||||
def collect
|
||||
|
@ -30,23 +26,22 @@ module LogStash module Instrument module PeriodicPoller
|
|||
heap[:used_percent] = (heap[:used_in_bytes] / heap[:max_in_bytes].to_f)*100.0
|
||||
|
||||
heap.each_pair do |key, value|
|
||||
heap_metrics.gauge(key, value.to_i)
|
||||
metric.gauge([:jvm, :memory, :heap], key, value.to_i)
|
||||
end
|
||||
end
|
||||
|
||||
def collect_non_heap_metrics(data)
|
||||
non_heap = aggregate_information_for(data["non_heap"].values)
|
||||
non_heap.each_pair do |key, value|
|
||||
non_heap_metrics.gauge(key, value.to_i)
|
||||
metric.gauge([:jvm, :memory, :non_heap],key, value.to_i)
|
||||
end
|
||||
end
|
||||
|
||||
def collect_pools_metrics(data)
|
||||
metrics = build_pools_metrics(data)
|
||||
metrics.each_pair do |key, hash|
|
||||
metric = pools_metrics.namespace(key.to_sym)
|
||||
hash.each_pair do |p,v|
|
||||
metric.gauge(p, v)
|
||||
metric.gauge([:jvm, :memory, :pools, key.to_sym], p, v)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -170,7 +170,7 @@ class LogStash::Runner < Clamp::Command
|
|||
:auto_reload => @auto_reload,
|
||||
:collect_metric => true,
|
||||
:debug => debug?,
|
||||
:node_name => node_name)
|
||||
:node_name => node_name)
|
||||
|
||||
@agent.register_pipeline("main", @pipeline_settings.merge({
|
||||
:config_string => config_string,
|
||||
|
|
|
@ -5,80 +5,81 @@ require 'puma/binder'
|
|||
require 'puma/configuration'
|
||||
require 'puma/commonlogger'
|
||||
|
||||
module LogStash
|
||||
module LogStash
|
||||
class WebServer
|
||||
|
||||
extend Forwardable
|
||||
extend Forwardable
|
||||
|
||||
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events
|
||||
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events
|
||||
|
||||
def_delegator :@runner, :stats
|
||||
def_delegator :@runner, :stats
|
||||
|
||||
def initialize(logger, options={})
|
||||
@logger = logger
|
||||
@options = {}
|
||||
@cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru") })
|
||||
@status = nil
|
||||
def initialize(logger, options={})
|
||||
@logger = logger
|
||||
@options = {}
|
||||
@cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru") })
|
||||
@status = nil
|
||||
|
||||
parse_options
|
||||
parse_options
|
||||
|
||||
@runner = nil
|
||||
@events = ::Puma::Events.strings
|
||||
@binder = ::Puma::Binder.new(@events)
|
||||
@binder.import_from_env
|
||||
@runner = nil
|
||||
@events = ::Puma::Events.strings
|
||||
@binder = ::Puma::Binder.new(@events)
|
||||
@binder.import_from_env
|
||||
|
||||
set_environment
|
||||
end
|
||||
|
||||
def run
|
||||
log "=== puma start: #{Time.now} ==="
|
||||
|
||||
@runner = Puma::Single.new(self)
|
||||
@status = :run
|
||||
@runner.run
|
||||
stop(:graceful => true)
|
||||
end
|
||||
|
||||
def log(str)
|
||||
logger.debug(str) if logger.debug?
|
||||
end
|
||||
|
||||
def error(str)
|
||||
logger.error(str) if logger.error?
|
||||
end
|
||||
|
||||
# Empty method, this method is required because of the puma usage we make through
|
||||
# the Single interface, https://github.com/puma/puma/blob/master/lib/puma/single.rb#L82
|
||||
# for more details. This can always be implemented when we want to keep track of this
|
||||
# bit of data.
|
||||
def write_state; end
|
||||
|
||||
def stop(options={})
|
||||
graceful = options.fetch(:graceful, true)
|
||||
|
||||
if graceful
|
||||
@runner.stop_blocked
|
||||
else
|
||||
@runner.stop
|
||||
set_environment
|
||||
end
|
||||
@status = :stop
|
||||
log "=== puma shutdown: #{Time.now} ==="
|
||||
end
|
||||
|
||||
private
|
||||
def run
|
||||
log "=== puma start: #{Time.now} ==="
|
||||
|
||||
def env
|
||||
@options[:debug] ? "development" : "production"
|
||||
end
|
||||
@runner = Puma::Single.new(self)
|
||||
@status = :run
|
||||
@runner.run
|
||||
stop(:graceful => true)
|
||||
end
|
||||
|
||||
def set_environment
|
||||
@options[:environment] = env
|
||||
ENV['RACK_ENV'] = env
|
||||
end
|
||||
def log(str)
|
||||
logger.debug(str) if logger.debug?
|
||||
end
|
||||
|
||||
def parse_options
|
||||
@config = ::Puma::Configuration.new(cli_options)
|
||||
@config.load
|
||||
@options = @config.options
|
||||
def error(str)
|
||||
logger.error(str) if logger.error?
|
||||
end
|
||||
|
||||
# Empty method, this method is required because of the puma usage we make through
|
||||
# the Single interface, https://github.com/puma/puma/blob/master/lib/puma/single.rb#L82
|
||||
# for more details. This can always be implemented when we want to keep track of this
|
||||
# bit of data.
|
||||
def write_state; end
|
||||
|
||||
def stop(options={})
|
||||
graceful = options.fetch(:graceful, true)
|
||||
|
||||
if graceful
|
||||
@runner.stop_blocked
|
||||
else
|
||||
@runner.stop
|
||||
end
|
||||
@status = :stop
|
||||
log "=== puma shutdown: #{Time.now} ==="
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def env
|
||||
@options[:debug] ? "development" : "production"
|
||||
end
|
||||
|
||||
def set_environment
|
||||
@options[:environment] = env
|
||||
ENV['RACK_ENV'] = env
|
||||
end
|
||||
|
||||
def parse_options
|
||||
@config = ::Puma::Configuration.new(cli_options)
|
||||
@config.load
|
||||
@options = @config.options
|
||||
end
|
||||
end
|
||||
end; end
|
||||
end
|
||||
|
|
|
@ -69,6 +69,13 @@ en:
|
|||
data loss.
|
||||
forced_sigint: >-
|
||||
SIGINT received. Terminating immediately..
|
||||
web_api:
|
||||
hot_threads:
|
||||
title: |-
|
||||
::: {%{hostname}}
|
||||
Hot threads at %{time}, busiestThreads=%{top_count}:
|
||||
thread_title: |-
|
||||
%{percent_of_cpu_time} % of of cpu usage by %{thread_state} thread named '%{thread_name}'
|
||||
runner:
|
||||
short-help: |-
|
||||
usage:
|
||||
|
|
34
logstash-core/spec/api/lib/api/node_stats_spec.rb
Normal file
34
logstash-core/spec/api/lib/api/node_stats_spec.rb
Normal file
|
@ -0,0 +1,34 @@
|
|||
# encoding: utf-8
|
||||
require_relative "../../spec_helper"
|
||||
require "sinatra"
|
||||
require "app/modules/node_stats"
|
||||
|
||||
describe LogStash::Api::NodeStats do
|
||||
|
||||
include Rack::Test::Methods
|
||||
|
||||
def app()
|
||||
described_class
|
||||
end
|
||||
|
||||
let(:mem) do
|
||||
{ :heap_used_in_bytes => 10,
|
||||
:pools => { :used_in_bytes => 20 }}
|
||||
end
|
||||
|
||||
let(:events) do
|
||||
{ :in => 10, :out => 20 }
|
||||
end
|
||||
|
||||
it "respond to the events resource" do
|
||||
expect_any_instance_of(LogStash::Api::StatsEventsCommand).to receive(:run).and_return(events)
|
||||
get "/events"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
it "respond to the jvm resource" do
|
||||
expect_any_instance_of(LogStash::Api::JvmMemoryCommand).to receive(:run).and_return(mem)
|
||||
get "jvm"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
end
|
|
@ -12,22 +12,16 @@ describe LogStash::Api::Root do
|
|||
described_class
|
||||
end
|
||||
|
||||
let(:body) { LogStash::Json.load(last_response.body) }
|
||||
let(:agent) { double("agent") }
|
||||
|
||||
before(:each) do
|
||||
get "/"
|
||||
allow(agent).to receive(:node_name).and_return("foo")
|
||||
expect_any_instance_of(LogStash::Api::Service).to receive(:agent).and_return(agent)
|
||||
end
|
||||
|
||||
it "should respond to root resource" do
|
||||
get "/"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
it "contain a hostname" do
|
||||
expect(body).to include("hostname" => a_kind_of(String))
|
||||
end
|
||||
|
||||
it "contain a version number" do
|
||||
expect(body).to include("version" => a_kind_of(String) )
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# encoding: utf-8
|
||||
require_relative "../../spec_helper"
|
||||
require "sinatra"
|
||||
require "app/stats"
|
||||
require "app/modules/stats"
|
||||
|
||||
describe LogStash::Api::Stats do
|
||||
|
||||
|
@ -11,24 +11,20 @@ describe LogStash::Api::Stats do
|
|||
described_class
|
||||
end
|
||||
|
||||
it "respond to the events resource" do
|
||||
get "/events"
|
||||
let(:mem) do
|
||||
{ :heap_used_in_bytes => 10,
|
||||
:pools => { :used_in_bytes => 20 }}
|
||||
end
|
||||
|
||||
before(:each) do
|
||||
expect_any_instance_of(LogStash::Api::JvmMemoryCommand).to receive(:started_at).and_return(1234567890)
|
||||
expect_any_instance_of(LogStash::Api::JvmMemoryCommand).to receive(:uptime).and_return(10)
|
||||
expect_any_instance_of(LogStash::Api::JvmMemoryCommand).to receive(:run).and_return(mem)
|
||||
end
|
||||
|
||||
it "respond to the jvm resource" do
|
||||
get "/jvm"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
context "jvm" do
|
||||
let(:type) { "jvm" }
|
||||
|
||||
it "respond to the hot_threads resource" do
|
||||
get "#{type}/hot_threads"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
it "respond to the memory resource" do
|
||||
get "#{type}/memory"
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require_relative "../../spec_helper"
|
||||
require "app/stats/events_command"
|
||||
require 'ostruct'
|
||||
require "app/commands/stats/events_command"
|
||||
|
||||
describe LogStash::Api::StatsEventsCommand do
|
||||
|
||||
|
@ -10,18 +9,20 @@ describe LogStash::Api::StatsEventsCommand do
|
|||
subject { described_class.new(service) }
|
||||
|
||||
let(:stats) do
|
||||
{ :base => { :events_in => OpenStruct.new(:value => 100), :events_filtered => OpenStruct.new(:value => 200) } }
|
||||
{ "stats" => { "events" => { "in" => 100,
|
||||
"out" => 0,
|
||||
"filtered" => 200 }}}
|
||||
end
|
||||
|
||||
before(:each) do
|
||||
allow(service).to receive(:get).with(:events_stats).and_return(stats)
|
||||
allow(service).to receive(:get).with(:events_stats).and_return(LogStash::Json.dump(stats))
|
||||
end
|
||||
|
||||
context "#schema" do
|
||||
let(:report) { subject.run }
|
||||
|
||||
it "return events information" do
|
||||
expect(report).to include({:in => 100, :dropped => 200 })
|
||||
expect(report).to include({"in" => 100, "filtered" => 200 })
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,17 +1,25 @@
|
|||
# encoding: utf-8
|
||||
require_relative "../../spec_helper"
|
||||
require "app/stats/hotthreads_command"
|
||||
require "app/stats/memory_command"
|
||||
require "app/commands/stats/hotthreads_command"
|
||||
require "app/commands/stats/memory_command"
|
||||
|
||||
describe "JVM stats" do
|
||||
|
||||
let(:agent) { double("agent") }
|
||||
|
||||
describe LogStash::Api::HotThreadsCommand do
|
||||
|
||||
before(:each) do
|
||||
allow(agent).to receive(:node_name).and_return("foo")
|
||||
expect_any_instance_of(LogStash::Api::Service).to receive(:agent).and_return(agent)
|
||||
expect(subject).to receive(:uptime).and_return(10).at_least(:once)
|
||||
end
|
||||
|
||||
context "#schema" do
|
||||
let(:report) { subject.run }
|
||||
|
||||
it "return hot threads information" do
|
||||
expect(report).not_to be_empty
|
||||
expect(report.to_s).not_to be_empty
|
||||
end
|
||||
|
||||
end
|
||||
|
@ -19,7 +27,6 @@ describe "JVM stats" do
|
|||
|
||||
describe LogStash::Api::JvmMemoryCommand do
|
||||
|
||||
|
||||
context "#schema" do
|
||||
|
||||
let(:service) { double("snapshot-service") }
|
||||
|
@ -31,6 +38,7 @@ describe "JVM stats" do
|
|||
end
|
||||
|
||||
before(:each) do
|
||||
allow(service).to receive(:agent).and_return(agent)
|
||||
allow(service).to receive(:get).with(:jvm_memory_stats).and_return(stats)
|
||||
end
|
||||
|
||||
|
@ -44,15 +52,13 @@ describe "JVM stats" do
|
|||
end
|
||||
|
||||
it "return heap information" do
|
||||
expect(report.keys).to include(:heap)
|
||||
expect(report.keys).to include(:heap_used_in_bytes)
|
||||
end
|
||||
|
||||
it "return non heap information" do
|
||||
expect(report.keys).to include(:non_heap)
|
||||
expect(report.keys).to include(:non_heap_used_in_bytes)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
end
|
||||
|
|
|
@ -15,35 +15,5 @@ Rack::Builder.parse_file(File.join(ROOT, 'init.ru'))
|
|||
|
||||
def read_fixture(name)
|
||||
path = File.join(File.dirname(__FILE__), "fixtures", name)
|
||||
HashWithIndifferentAccess.new(JSON.parse(File.read(path)))
|
||||
end
|
||||
|
||||
class HashWithIndifferentAccess
|
||||
|
||||
extend Forwardable
|
||||
def_delegators :@hash, :inject, :keys
|
||||
|
||||
def initialize(hash)
|
||||
@hash = hash
|
||||
end
|
||||
|
||||
def [](key)
|
||||
v = @hash[key.to_s]
|
||||
if (v.is_a?(Hash))
|
||||
return HashWithIndifferentAccess.new(v)
|
||||
end
|
||||
return OpenStruct.new(:value => v)
|
||||
end
|
||||
|
||||
def marshal_dump
|
||||
h = {}
|
||||
@hash.each_pair do |k, v|
|
||||
if (!v.is_a?(Hash))
|
||||
h[k] = OpenStruct.new(:value => v)
|
||||
else
|
||||
h[k] = HashWithIndifferentAccess.new(v)
|
||||
end
|
||||
end
|
||||
HashWithIndifferentAccess.new(h)
|
||||
end
|
||||
File.read(path)
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue