Collecting Logstash metrics at runtime

This commit introduces the collection of the main metrics inside the
logstash pipeline and records them to the metric store. This code also
lays the foundation for defining an internal metric api.

Metrics collected from the pipeline and the system will be exposed by the
web api.
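
For example, once the agent starts the web server, the routes added in this
commit can be exercised roughly like this (host and port are illustrative;
this commit does not pin them down):

    GET /                           # basic info: version, hostname, pipeline
    GET /_stats                     # events and jvm memory stats
    GET /_stats/events              # in / out / dropped counters
    GET /_stats/jvm/hot_threads?threads=3
    GET /_stats/jvm/memory          # heap / non_heap / pools gauges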

This work was done in collaboration between @ph and @purbon; commits were
squashed to simplify the merging process.

Fixes #4653
Pier-Hugues Pellerin 2016-02-04 15:36:35 +01:00
parent fa5b154ef8
commit a04da0f76d
66 changed files with 2703 additions and 32 deletions

Gemfile
View file

@ -3,7 +3,6 @@
source "https://rubygems.org"
gem "logstash-core", "3.0.0.dev", :path => "./logstash-core"
# gem "logstash-core-event", "3.0.0.dev", :path => "./logstash-core-event"
gem "logstash-core-event-java", "3.0.0.dev", :path => "./logstash-core-event-java"
gem "file-dependencies", "0.1.6"
gem "ci_reporter_rspec", "1.0.0", :group => :development
@ -21,4 +20,16 @@ gem "stud", "~> 0.0.21", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.1.7", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "rack-test", :require => "rack/test", :group => :development
gem "flores", "~> 0.0.6", :group => :development
gem "logstash-output-elasticsearch"
gem "logstash-codec-plain", ">= 0"
gem "logstash-filter-clone"
gem "logstash-filter-mutate", ">= 0"
gem "logstash-filter-multiline"
gem "logstash-input-generator"
gem "logstash-input-stdin"
gem "logstash-input-tcp"
gem "logstash-output-stdout"
gem "ftw", "~> 0.0.42"
gem "longshoreman", ">= 0"

benchmark/collector.rb Normal file
View file

@ -0,0 +1,3 @@
# encoding: utf-8
require "benchmark/ips"
require "logstash/instrument/collector"

View file

@ -1,3 +1,4 @@
# encoding: utf-8
require "benchmark/ips"
require "lib/logstash/event"

View file

@ -2,7 +2,13 @@
require "logstash/environment"
require "logstash/errors"
require "logstash/config/cpu_core_strategy"
require "logstash/instrument/collector"
require "logstash/instrument/metric"
require "logstash/instrument/periodic_pollers"
require "logstash/instrument/collector"
require "logstash/instrument/metric"
require "logstash/pipeline"
require "logstash/webserver"
require "stud/trap"
require "logstash/config/loader"
require "uri"
@ -12,7 +18,7 @@ require "securerandom"
LogStash::Environment.load_locale!
class LogStash::Agent
attr_reader :logger, :pipelines
attr_reader :metric, :debug, :node_name, :started_at, :pipelines, :logger
# initialize method for LogStash::Agent
# @param params [Hash] potential parameters are:
@ -23,19 +29,26 @@ class LogStash::Agent
def initialize(params)
@logger = params[:logger]
@auto_reload = params[:auto_reload]
@pipelines = {}
@debug = params.fetch(:debug, false)
@pipelines = {}
@started_at = Time.now
@node_name = params[:node_name] || Socket.gethostname
@config_loader = LogStash::Config::Loader.new(@logger)
@reload_interval = params[:reload_interval] || 3 # seconds
@upgrade_mutex = Mutex.new
@collect_metric = params.fetch(:collect_metric, false)
configure_metric
end
def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
@logger.info("starting agent")
start_background_services
start_pipelines
start_webserver
return 1 if clean_state?
@ -76,15 +89,98 @@ class LogStash::Agent
end
end
# Calculate the Logstash uptime in milliseconds
#
# @return [Fixnum] Uptime in milliseconds
def uptime
((Time.now.to_f - started_at.to_f) * 1000.0).to_i
end
def shutdown
shutdown_pipelines
end
def node_uuid
@node_uuid ||= SecureRandom.uuid
end
private
def start_webserver
options = { :debug => debug }
@webserver = LogStash::WebServer.new(@logger, options)
Thread.new(@webserver) do |webserver|
webserver.run
end
end
def stop_webserver
@webserver.stop
end
def start_background_services
if collect_metric?
@logger.debug("Agent: Starting metric periodic pollers")
@periodic_pollers.start
end
end
def stop_background_services
if collect_metric?
@logger.debug("Agent: Stopping metric periodic pollers")
@periodic_pollers.stop
end
end
def shutdown
stop_background_services
stop_webserver
shutdown_pipelines
end
private
def node_uuid
@node_uuid ||= SecureRandom.uuid
end
def configure_metric
if collect_metric?
@logger.debug("Agent: Configuring metric collection")
@metric = LogStash::Instrument::Metric.create
add_metric_pipeline
else
@metric = LogStash::Instrument::NullMetric.new
end
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(metric)
end
def collect_metric?
@collect_metric
end
# Add a new pipeline sitting next to the main pipeline.
# This pipeline should only contain one input, the `metrics` input,
# and multiple shippers.
def add_metric_pipeline
@logger.debug("Agent: Adding metric pipeline")
metric_pipeline_config =<<-EOS
input {
metrics {}
}
output {
elasticsearch {
flush_size => 1
hosts => "127.0.0.1"
index => "metrics-%{+YYYY.MM.dd}"
}
}
EOS
@pipelines[:metric] = LogStash::Pipeline.new(metric_pipeline_config, { :pipeline_id => :metric })
end
def create_pipeline(settings)
begin
config = fetch_config(settings)
@ -174,5 +270,5 @@ class LogStash::Agent
def clean_state?
@pipelines.empty?
end
end # class LogStash::Agent

View file

@ -0,0 +1,22 @@
ROOT = File.expand_path(File.dirname(__FILE__))
$LOAD_PATH.unshift File.join(ROOT, 'lib')
Dir.glob('lib/**').each{ |d| $LOAD_PATH.unshift(File.join(ROOT, d)) }
require 'sinatra'
require 'app/root'
require 'app/stats'
env = ENV["RACK_ENV"].to_sym
set :environment, env
set :service, LogStash::Api::Service.instance
run LogStash::Api::Root
namespaces = { "/_stats" => LogStash::Api::Stats }
namespaces.each_pair do |namespace, app|
map(namespace) do
run app
end
end

View file

@ -0,0 +1,25 @@
# encoding: utf-8
require "cabin"
require "logstash/json"
require "app_helpers"
require "app/service"
require "app/command_factory"
module LogStash::Api
class BaseApp < ::Sinatra::Application
attr_reader :factory
if settings.environment != :production
set :show_exceptions, :after_handler
end
helpers AppHelpers
def initialize(app=nil)
super(app)
@factory = CommandFactory.new(settings.service)
end
end
end

View file

@ -0,0 +1,18 @@
# encoding: utf-8
require "app/service"
module LogStash::Api
class Command
attr_reader :service
def initialize(service = LogStash::Api::Service.instance)
@service = service
end
def run
raise "Not implemented"
end
end
end

View file

@ -0,0 +1,27 @@
# encoding: utf-8
require "app/service"
require "app/system/basicinfo_command"
require "app/stats/events_command"
require "app/stats/hotthreads_command"
require "app/stats/memory_command"
module LogStash::Api
class CommandFactory
attr_reader :factory, :service
def initialize(service)
@service = service
@factory = {}.merge(
:system_basic_info => SystemBasicInfoCommand,
:events_command => StatsEventsCommand,
:hot_threads_command => HotThreadsCommand,
:memory_command => JvmMemoryCommand
)
end
def build(klass)
factory[klass].new(service)
end
end
end

View file

@ -0,0 +1,14 @@
# encoding: utf-8
require "app"
require "app/system/basicinfo_command"
module LogStash::Api
class Root < BaseApp
get "/" do
command = factory.build(:system_basic_info)
respond_with command.run
end
end
end

View file

@ -0,0 +1,40 @@
# encoding: utf-8
require "logstash/instrument/collector"
require "logstash/util/loggable"
class LogStash::Api::Service
include Singleton
include LogStash::Util::Loggable
def initialize
@snapshot_rotation_mutex = Mutex.new
@snapshot = nil
logger.debug("[api-service] start") if logger.debug?
LogStash::Instrument::Collector.instance.add_observer(self)
end
def stop
logger.debug("[api-service] stop") if logger.debug?
LogStash::Instrument::Collector.instance.delete_observer(self)
end
def update(snapshot)
logger.debug("[api-service] snapshot received", :snapshot => snapshot) if logger.debug?
if @snapshot_rotation_mutex.try_lock
@snapshot = snapshot
@snapshot_rotation_mutex.unlock
end
end
def get(key)
metric_store = @snapshot.metric_store
if key == :jvm_memory_stats
metric_store.get(:root, :jvm, :memory)
else
{ :base => metric_store.get(:root, :base) }
end
rescue
{}
end
end

View file

@ -0,0 +1,56 @@
# encoding: utf-8
require "app"
require "app/stats/events_command"
require "app/stats/hotthreads_command"
module LogStash::Api
class Stats < BaseApp
helpers AppHelpers
# Global _stats resource where all information is
# retrieved and shown
get "/" do
events_command = factory.build(:events_command)
memory_command = factory.build(:memory_command)
payload = {
:events => events_command.run,
:jvm => { :memory => memory_command.run }
}
respond_with payload
end
# Show all events stats information
# (for ingested, emitted, dropped)
# - #events since startup
# - #data (bytes) since startup
# - events/s
# - bytes/s
# - dropped events/s
# - events in the pipeline
get "/events" do
command = factory.build(:events_command)
respond_with({ :events => command.run })
end
# return hot threads information
get "/jvm/hot_threads" do
top_threads_count = params["threads"] || 3
ignore_idle_threads = params["ignore_idle_threads"] || true
options = {
:threads => top_threads_count.to_i,
:ignore_idle_threads => as_boolean(ignore_idle_threads)
}
command = factory.build(:hot_threads_command)
respond_with(command.run(options), :string)
end
# return hot threads information
get "/jvm/memory" do
command = factory.build(:memory_command)
respond_with({ :memory => command.run })
end
end
end

View file

@ -0,0 +1,19 @@
# encoding: utf-8
require "app/command"
class LogStash::Api::StatsEventsCommand < LogStash::Api::Command
def run
# Return whatever is coming out of the snapshot event; this obviously
# needs to be tailored to the right metrics for this command.
stats = service.get(:events_stats)
{
:in => stats[:base][:events_in].value,
:out => 0,
:dropped => stats[:base][:events_filtered].value
}
rescue
{}
end
end

View file

@ -0,0 +1,48 @@
# encoding: utf-8
require "app/command"
require 'monitoring'
require "socket"
class LogStash::Api::HotThreadsCommand < LogStash::Api::Command
SKIPPED_THREADS = [ "Finalizer", "Reference Handler", "Signal Dispatcher" ].freeze
def run(options={})
top_count = options.fetch(:threads, 3)
ignore = options.fetch(:ignore_idle_threads, true)
hash = JRMonitor.threads.generate
report = "::: {#{hostname}} \n Hot threads at #{Time.now}, busiestThreads=#{top_count}:\n"
i = 0
hash.each_pair do |thread_name, container|
break if i >= top_count
if ignore
next if SKIPPED_THREADS.include?(thread_name)
next if thread_name.match(/Ruby-\d+-JIT-\d+/)
end
report << "#{build_report(container)} \n"
i += 1
end
report
end
private
def build_report(hash)
thread_name, thread_path = hash["thread.name"].split(": ")
report = "\t #{cpu_time(hash)} micros of cpu usage by #{hash["thread.state"]} thread named '#{thread_name}'\n"
report << "\t\t #{thread_path}\n" if thread_path
hash["thread.stacktrace"].each do |trace|
report << "\t\t#{trace}\n"
end
report
end
def hostname
Socket.gethostname
end
def cpu_time(hash)
hash["cpu.time"] / 1000
end
end

View file

@ -0,0 +1,30 @@
# encoding: utf-8
require "app/command"
require 'monitoring'
class LogStash::Api::JvmMemoryCommand < LogStash::Api::Command
def run
memory = service.get(:jvm_memory_stats)
{
:heap => dump(memory[:heap].marshal_dump),
:non_heap => dump(memory[:non_heap].marshal_dump),
:pools => memory[:pools].marshal_dump.inject({}) do |acc, (type, hash)|
acc[type] = dump(hash.marshal_dump)
acc
end
}
rescue
{} # Something happened, so we just return an empty hash.
end
private
def dump(hash)
hash.inject({}) do |h, (k,v)|
h[k] = v.value
h
end
end
end

View file

@ -0,0 +1,24 @@
# encoding: utf-8
require "app/command"
class LogStash::Api::SystemBasicInfoCommand < LogStash::Api::Command
def run
{
"version" => LOGSTASH_VERSION,
"hostname" => hostname,
"pipeline" => pipeline
}
end
private
def hostname
`hostname`.strip
end
def pipeline
{ "status" => "ready", "uptime" => 1 }
end
end

View file

@ -0,0 +1,21 @@
# encoding: utf-8
require "logstash/json"
module LogStash::Api::AppHelpers
def respond_with(data, as=:json)
if as == :json
content_type "application/json"
LogStash::Json.dump(data)
else
content_type "text/plain"
data.to_s
end
end
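# Coerce the query-string flags used by the stats endpoints (such as
# `ignore_idle_threads`) into booleans. Hypothetical examples:
#   as_boolean("true") #=> true
#   as_boolean("0")    #=> false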
def as_boolean(string)
return true if string == true || string =~ (/(true|t|yes|y|1)$/i)
return false if string == false || string.blank? || string =~ (/(false|f|no|n|0)$/i)
raise ArgumentError.new("invalid value for Boolean: \"#{string}\"")
end
end

View file

@ -0,0 +1,63 @@
# encoding: utf-8
#
module LogStash
class FilterDelegator
extend Forwardable
def_delegators :@filter,
:register,
:close,
:threadsafe?,
:do_close,
:do_stop,
:periodic_flush
def initialize(logger, klass, metric, *args)
options = args.reduce({}, :merge)
@logger = logger
@klass = klass
@filter = klass.new(options)
# Scope the metrics to the plugin
namespaced_metric = metric.namespace(@filter.id.to_sym)
@filter.metric = metric
@metric_events = namespaced_metric.namespace(:events)
# Not all filters do buffering
define_flush_method if @filter.respond_to?(:flush)
end
def config_name
@klass.config_name
end
def multi_filter(events)
@metric_events.increment(:in, events.size)
new_events = @filter.multi_filter(events)
# There is no guarantee in the context of a filter
# that EVENTS_IN == EVENTS_OUT; see the aggregate and
# the split filters
@metric_events.increment(:out, new_events.size) unless new_events.nil?
return new_events
end
private
def define_flush_method
define_singleton_method(:flush) do |options = {}|
# we also need to trace the number of events
# coming from a specific filter.
new_events = @filter.flush(options)
# Filter plugins that do buffering or spooling of events, like the
# `logstash-filter-aggregate`, can return `nil` and will flush on the next flush tick.
@metric_events.increment(:out, new_events.size) unless new_events.nil?
new_events
end
end
end
end

View file

@ -0,0 +1,47 @@
# encoding: utf-8
require "logstash/event"
require "logstash/inputs/base"
require "logstash/instrument/collector"
module LogStash module Inputs
# The Metrics input is responsible for registering itself to the collector.
# The collector class will periodically emit new snapshots of the system;
# the metrics input needs to take that information and transform it into
# a `LogStash::Event`, which can be consumed by the shipper and sent to
# Elasticsearch.
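#
# A minimal sketch of the pipeline the agent generates around this input
# (see `Agent#add_metric_pipeline` in this commit; the elasticsearch output
# settings are the agent's defaults, not requirements of this plugin):
#
#   input { metrics {} }
#   output { elasticsearch { hosts => "127.0.0.1" index => "metrics-%{+YYYY.MM.dd}" } }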
class Metrics < LogStash::Inputs::Base
config_name "metrics"
milestone 3
def register
end
def run(queue)
@logger.debug("Metric: input started")
@queue = queue
# we register to the collector after receiving the pipeline queue
LogStash::Instrument::Collector.instance.add_observer(self)
# Keep this plugin thread alive,
# until we shutdown the metric pipeline
sleep(1) while !stop?
end
def stop
@logger.debug("Metrics input: stopped")
LogStash::Instrument::Collector.instance.delete_observer(self)
end
def update(snapshot)
@logger.debug("Metrics input: received a new snapshot", :created_at => snapshot.created_at, :snapshot => snapshot, :event => snapshot.metric_store.to_event) if @logger.debug?
# The back pressure is handled in the collector's
# scheduled task (running in its own thread); if something happens to one of the listeners it
# will time out. In a sane pipeline, with a low traffic of events, this shouldn't be a problem.
snapshot.metric_store.each do |metric|
@queue << LogStash::Event.new({ "@timestamp" => snapshot.created_at }.merge(metric.to_hash))
end
end
end
end;end

View file

@ -0,0 +1,106 @@
# encoding: utf-8
require "logstash/instrument/snapshot"
require "logstash/instrument/metric_store"
require "logstash/util/loggable"
require "concurrent/timer_task"
require "observer"
require "singleton"
require "thread"
module LogStash module Instrument
# The Collector singleton is the single point of reference for all
# the metric collection inside Logstash; the metrics library makes
# direct calls to this class.
#
# This class is an observable responsible for periodically emitting views of the system
# to other components like the internal metrics pipelines.
class Collector
include LogStash::Util::Loggable
include Observable
include Singleton
SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds
def initialize
@metric_store = MetricStore.new
start_periodic_snapshotting
@async_worker_pool
end
# The metric library will call this unique interface;
# it's the job of the collector to update the store with a new metric
# or update an existing metric.
#
# If there is a problem with the key or the type of metric we will record an error
# but we won't stop processing events; these errors are not considered fatal.
#
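# Example of the call the metric library makes (hypothetical namespace and key):
#
#   Collector.instance.push([:stats, :events], :in, :counter, :increment, 1)
#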
def push(namespaces_path, key, type, *metric_type_params)
begin
metric = @metric_store.fetch_or_store(namespaces_path, key) do
LogStash::Instrument::MetricType.create(type, namespaces_path, key)
end
metric.execute(*metric_type_params)
changed # we had changes coming in so we can notify the observers
rescue MetricStore::NamespacesExpectedError => e
logger.error("Collector: Cannot record metric", :exception => e)
rescue NameError => e
logger.error("Collector: Cannot create concrete class for this metric type",
:type => type,
:namespaces_path => namespaces_path,
:key => key,
:metrics_params => metric_type_params,
:exception => e,
:stacktrace => e.backtrace)
end
end
# Monitor the `Concurrent::TimerTask`; this update is triggered on every run
# of the task, successful or not. TimerTask implements Observable and the collector acts as
# the observer and will keep track if something went wrong in the execution.
#
# @param [Time] Time of execution
# @param [Object] Result of the execution
# @param [Exception] Exception
def update(time_of_execution, result, exception)
return true if exception.nil?
logger.error("Collector: Something went wrong went sending data to the observers",
:execution_time => time_of_execution,
:result => result,
:exception => exception)
end
# Snapshot the current Metric Store and return it immediately,
# This is useful if you want to get access to the current metric store without
# waiting for a periodic call.
#
# @return [LogStash::Instrument::MetricStore]
def snapshot_metric
Snapshot.new(@metric_store)
end
# Configure and start the periodic task for snapshotting the `MetricStore`
def start_periodic_snapshotting
@snapshot_task = Concurrent::TimerTask.new { publish_snapshot }
@snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS
@snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS
@snapshot_task.add_observer(self)
@snapshot_task.execute
end
# Create a snapshot of the MetricStore and send it to the registered observers.
# The observers will receive the following signature in the update method
# (see the `Service` and `Metrics` input observers in this commit):
#
# `#update(snapshot)`
def publish_snapshot
created_at = Time.now
logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug?
notify_observers(snapshot_metric)
end
end
end; end

View file

@ -0,0 +1,109 @@
# encoding: utf-8
require "logstash/instrument/collector"
require "concurrent"
module LogStash module Instrument
class MetricException < Exception; end
class MetricNoKeyProvided < MetricException; end
class MetricNoBlockProvided < MetricException; end
class MetricNoNamespaceProvided < MetricException; end
# This class provides the interface between the code, the collector and the format
# of the recorded metric.
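#
# Example (hypothetical usage, mirroring the public methods below):
#
#   metric = LogStash::Instrument::Metric.create # default Collector singleton
#   metric.increment([:stats, :events], :in)            # counter, +1
#   metric.gauge([:stats, :jvm], :uptime_in_millis, 42) # gauge, set value
#   metric.time([:stats, :pipelines], :duration) { expensive_work }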
class Metric
attr_reader :collector
def initialize(collector)
@collector = collector
end
def increment(namespace, key, value = 1)
validate_key!(key)
collector.push(namespace, key, :counter, :increment, value)
end
def decrement(namespace, key, value = 1)
validate_key!(key)
collector.push(namespace, key, :counter, :decrement, value)
end
def gauge(namespace, key, value)
validate_key!(key)
collector.push(namespace, key, :gauge, :set, value)
end
def time(namespace, key)
validate_key!(key)
if block_given?
timer = TimedExecution.new(self, namespace, key)
content = yield
timer.stop
return content
else
TimedExecution.new(self, namespace, key)
end
end
def report_time(namespace, key, duration)
collector.push(namespace, key, :mean, :increment, duration)
end
# This method returns a metric instance tied to a specific namespace,
# so you don't have to specify the namespace on every call.
#
# Example:
# metric.increment(:namespace, :mykey, 200)
# metric.increment(:namespace, :mykey_2, 200)
#
# namespaced_metric = metric.namespace(:namespace)
# namespaced_metric.increment(:mykey, 200)
# namespaced_metric.increment(:mykey_2, 200)
#
# @param name [Array<String>] Name of the namespace
# @param name [String] Name of the namespace
def namespace(name)
raise MetricNoNamespaceProvided if name.nil? || name.empty?
NamespacedMetric.new(self, name)
end
# Create a Metric instance using the default Collector singleton reference
#
#
def self.create(collector = LogStash::Instrument::Collector.instance)
Metric.new(collector)
end
private
def validate_key!(key)
raise MetricNoKeyProvided if key.nil? || key.empty?
end
# Allows calculating the execution time of a block of code.
# This class supports 2 different syntaxes: a block, or the returned
# object itself; in the latter case the metric won't be recorded
# until we call `#stop`.
#
# @see LogStash::Instrument::Metric#time
class TimedExecution
MILLISECONDS = 1_000_000.0.freeze
def initialize(metric, namespace, key)
@metric = metric
@namespace = namespace
@key = key
start
end
def start
@start_time = Time.now
end
def stop
@metric.report_time(@namespace, @key, (MILLISECONDS * (Time.now - @start_time)).to_i)
end
end
end
end; end

View file

@ -0,0 +1,183 @@
# encoding: utf-8
require "concurrent"
require "logstash/event"
require "logstash/instrument/metric_type"
module LogStash module Instrument
# The MetricStore is the data structure that makes sure the data is
# saved in a retrievable way; this is a wrapper around multiple ConcurrentHashMaps
# acting as a tree-like structure.
class MetricStore
class NamespacesExpectedError < StandardError; end
class MetricNotFound < StandardError; end
KEY_PATH_SEPARATOR = "/".freeze
# Lets us be a bit flexible on the comma usage in the path
# definition
FILTER_KEYS_SEPARATOR = /\s*,\s*/.freeze
def initialize
# We keep the structured cache to allow
# the api to search the content of the different nodes
@store = Concurrent::Map.new
end
# This method uses the namespace and key to search for the corresponding value in
# the hash; if it doesn't exist it will create the appropriate namespaces
# path in the hash and return `new_value`
#
# @param [Array] The path where the values should be located
# @param [Object] The default object if the value is not found in the path
# @return [Object] The `new_value`, or the retrieved object in the tree
def fetch_or_store(namespaces, key, default_value = nil)
fetch_or_store_namespaces(namespaces).fetch_or_store(key, block_given? ? yield(key) : default_value)
end
# This method allows retrieving values for a specific path
# and supports the following queries:
#
# stats/pipelines/pipeline_X
# stats/pipelines/pipeline_X,pipeline_2
# stats/os,jvm
#
# If you use the `,` on a key the metric store will return both values at that level
#
# The returned hash will keep the same structure as it had in the `Concurrent::Map`
# but will be a normal ruby hash. This will allow the api to easily serialize the content
# of the map
#
# @param [Array] The path where values should be located
# @return nil if the values are not found
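#
# Example (hypothetical, using the first query above):
#
#   store.get_with_path("stats/pipelines/pipeline_X")
#   #=> { :stats => { :pipelines => { :pipeline_X => { ... } } } }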
def get_with_path(path)
key_paths = path.gsub(/^#{KEY_PATH_SEPARATOR}+/, "").split(KEY_PATH_SEPARATOR)
get(*key_paths)
end
# Use an array of symbols instead of a path
def get(*key_paths)
# Normalize the symbols access
key_paths.map(&:to_sym)
new_hash = Hash.new
get_recursively(key_paths, @store, new_hash)
new_hash
end
# Return all the individual Metrics.
# This call mimics an Enum's each if a block is provided
def each(path = nil, &block)
metrics = if path.nil?
get_all
else
transform_to_array(get_with_path(path))
end
block_given? ? metrics.each(&block) : metrics
end
alias_method :all, :each
private
def get_all
each_recursively(@store).flatten
end
def get_recursively(key_paths, map, new_hash)
key_candidates = extract_filter_keys(key_paths.shift)
key_candidates.each do |key_candidate|
raise MetricNotFound, "For path: #{key_candidate}" if map[key_candidate].nil?
if key_paths.empty? # End of the user requested path
if map[key_candidate].is_a?(Concurrent::Map)
new_hash[key_candidate] = transform_to_hash(map[key_candidate])
else
new_hash[key_candidate] = map[key_candidate]
end
else
if map[key_candidate].is_a?(Concurrent::Map)
new_hash[key_candidate] = get_recursively(key_paths, map[key_candidate], {})
else
new_hash[key_candidate] = map[key_candidate]
end
end
end
return new_hash
end
def extract_filter_keys(key)
key.to_s.strip.split(FILTER_KEYS_SEPARATOR).map(&:to_sym)
end
def transform_to_array(map)
map.values.collect do |value|
value.is_a?(Hash) ? transform_to_array(value) : value
end.flatten
end
def transform_to_hash(map, new_hash = Hash.new)
map.each_pair do |key, value|
if value.is_a?(Concurrent::Map)
new_hash[key] = {}
transform_to_hash(value, new_hash[key])
else
new_hash[key] = value
end
end
return new_hash
end
# Recursively fetch only the leaf nodes, which should be instances
# of the `MetricType`
def each_recursively(values)
events = []
values.each_value do |value|
if value.is_a?(Concurrent::Map)
events << each_recursively(value)
else
events << value
end
end
return events
end
# This method iterates through the namespace path and tries to find the corresponding
# value for the path; if any part of the path is not found it will
# create it.
#
# @param [Array] The path where values should be located
# @raise [ConcurrentMapExpected] Raise if the retrieved object isn't a `Concurrent::Map`
# @return [Concurrent::Map] Map where the metrics should be saved
def fetch_or_store_namespaces(namespaces_path)
path_map = fetch_or_store_namespace_recursively(@store, namespaces_path)
# This means a namespace and a key are colliding
# and we have to deal with it upstream.
unless path_map.is_a?(Concurrent::Map)
raise NamespacesExpectedError, "Expecting a `Namespaces` but found class: #{path_map.class.name} for namespaces_path: #{namespaces_path}"
end
return path_map
end
# Recursively fetch or create the namespace paths through the `MetricStore`.
# This algorithm uses an index to know which keys to search in the map.
# This avoids cloning the array, so we can give better feedback to the user.
#
# @param [Concurrent::Map] Map to search for the key
# @param [Array] List of path to create
# @param [Fixnum] Which part from the list to create
#
def fetch_or_store_namespace_recursively(map, namespaces_path, idx = 0)
current = namespaces_path[idx]
# we are at the end of the namespace path, break out of the recursion
return map if current.nil?
new_map = map.fetch_or_store(current) { Concurrent::Map.new }
return fetch_or_store_namespace_recursively(new_map, namespaces_path, idx + 1)
end
end
end; end

View file

@ -0,0 +1,24 @@
# encoding: utf-8
require "logstash/instrument/metric_type/counter"
require "logstash/instrument/metric_type/mean"
require "logstash/instrument/metric_type/gauge"
module LogStash module Instrument
module MetricType
METRIC_TYPE_LIST = {
:counter => LogStash::Instrument::MetricType::Counter,
:mean => LogStash::Instrument::MetricType::Mean,
:gauge => LogStash::Instrument::MetricType::Gauge
}.freeze
# Use the type to fetch the concrete class for this metric
#
# @param [Symbol] The type of the metric
# @param [Array] Namespaces list
# @param [String] The metric key
# @raise [NameError] If the class is not found
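#
# Example (hypothetical):
#
#   MetricType.create(:counter, [:stats, :events], :in) #=> a Counter instance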
def self.create(type, namespaces, key)
METRIC_TYPE_LIST[type].new(namespaces, key)
end
end
end; end

View file

@ -0,0 +1,35 @@
# encoding: utf-8
require "logstash/event"
require "logstash/util"
module LogStash module Instrument module MetricType
class Base
attr_reader :namespaces, :key
def initialize(namespaces, key)
@namespaces = namespaces
@key = key
end
def inspect
"#{self.class.name} - namespaces: #{namespaces} key: #{key} value: #{value}"
end
def to_hash
{
"namespaces" => namespaces,
"key" => key,
"type" => type,
"value" => value
}
end
def to_json_data
value
end
def type
@type ||= LogStash::Util.class_name(self).downcase
end
end
end; end; end

View file

@ -0,0 +1,29 @@
# encoding: utf-8
require "logstash/instrument/metric_type/base"
require "concurrent"
module LogStash module Instrument module MetricType
class Counter < Base
def initialize(namespaces, key, value = 0)
super(namespaces, key)
@counter = Concurrent::AtomicFixnum.new(value)
end
def increment(value = 1)
@counter.increment(value)
end
def decrement(value = 1)
@counter.decrement(value)
end
def execute(action, value = 1)
@counter.send(action, value)
end
def value
@counter.value
end
end
end; end; end

View file

@ -0,0 +1,22 @@
# encoding: utf-8
require "logstash/instrument/metric_type/base"
require "concurrent/atomic_reference/mutex_atomic"
require "logstash/json"
module LogStash module Instrument module MetricType
class Gauge < Base
def initialize(namespaces, key)
super(namespaces, key)
@gauge = Concurrent::MutexAtomicReference.new()
end
def execute(action, value = nil)
@gauge.set(value)
end
def value
@gauge.get
end
end
end; end; end

View file

@ -0,0 +1,33 @@
# encoding: utf-8
require "logstash/instrument/metric_type/base"
require "concurrent"
module LogStash module Instrument module MetricType
class Mean < Base
def initialize(namespaces, key)
super(namespaces, key)
@counter = Concurrent::AtomicFixnum.new
@sum = Concurrent::AtomicFixnum.new
end
def increment(value = 1)
@counter.increment
@sum.increment(value)
end
def decrement(value = 1)
@counter.decrement
@sum.decrement(value)
end
def mean
if @counter.value > 0
@sum.value / @counter.value
else
0
end
end
alias_method :value, :mean
end
end; end; end

View file

@ -0,0 +1,54 @@
# encoding: utf-8
require "logstash/instrument/metric"
module LogStash module Instrument
# This class acts as a proxy between the metric library and the user calls.
#
# This is the class that plugin authors will use to interact with the `MetricStore`.
# It has the same public interface as the `Metric` class but doesn't require sending
# the namespace on every call.
#
# @see Logstash::Instrument::Metric
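#
# Example (hypothetical usage):
#
#   metric = LogStash::Instrument::Metric.create
#   pipeline_metric = metric.namespace([:stats, :pipelines, :main])
#   pipeline_metric.increment(:in) # recorded under stats/pipelines/main, key :in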
class NamespacedMetric
attr_reader :namespace_name
# Create metric with a specific namespace
#
# @param metric [LogStash::Instrument::Metric] The metric instance to proxy
# @param namespace [Array] The namespace to use
def initialize(metric, namespace_name)
@metric = metric
@namespace_name = Array(namespace_name)
end
def increment(key, value = 1)
@metric.increment(namespace_name, key, value)
end
def decrement(key, value = 1)
@metric.decrement(namespace_name, key, value)
end
def gauge(key, value)
@metric.gauge(namespace_name, key, value)
end
def report_time(key, duration)
@metric.report_time(namespace_name, key, duration)
end
def time(key, &block)
@metric.time(namespace_name, key, &block)
end
def collector
@metric.collector
end
def namespace(name)
NamespacedMetric.new(metric, namespace_name + Array(name)) # `+` so the parent's namespace is not mutated
end
private
attr_reader :metric
end
end; end

View file

@ -0,0 +1,46 @@
# encoding: utf-8
require "logstash/instrument/metric"
module LogStash module Instrument
# This class is used in the context where we disable the metric collection
# for a specific plugin, to replace the `NamespacedMetric` class with this one,
# which doesn't produce any metric for the collector.
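#
# A plugin configured with `enable_metric => false` receives this object
# (see `LogStash::Plugin#metric` in this commit), so calls like
# `metric.increment(:in)` become harmless no-ops.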
class NullMetric
attr_reader :namespace_name, :collector
def increment(key, value = 1)
end
def decrement(key, value = 1)
end
def gauge(key, value)
end
def report_time(key, duration)
end
# We have to manually redefine this method since it can return an
# object; this object also has to be implemented as a NullObject
def time(key)
if block_given?
yield
else
NullTimedExecution
end
end
def namespace(key)
self.class.new
end
private
# Null implementation of the internal timer class
#
# @see LogStash::Instrument::TimedExecution`
class NullTimedExecution
def self.stop
end
end
end
end; end

View file

@ -0,0 +1,57 @@
# encoding: utf-8
require "logstash/util/loggable"
require "logstash/util"
require "concurrent"
module LogStash module Instrument module PeriodicPoller
class Base
include LogStash::Util::Loggable
DEFAULT_OPTIONS = {
:polling_interval => 1,
:polling_timeout => 60
}
public
def initialize(metric, options = {})
@metric = metric
@options = DEFAULT_OPTIONS.merge(options)
configure_task
end
def update(time, result, exception)
return unless exception
logger.error("PeriodicPoller: exception",
:poller => self,
:result => result,
:exception => exception,
:executed_at => time)
end
def collect
raise NotImplementedError, "#{self.class.name} needs to implement `#collect`"
end
def start
logger.debug("PeriodicPoller: Starting", :poller => self,
:polling_interval => @options[:polling_interval],
:polling_timeout => @options[:polling_timeout]) if logger.debug?
@task.execute
end
def stop
logger.debug("PeriodicPoller: Stopping", :poller => self)
@task.shutdown
end
protected
def configure_task
@task = Concurrent::TimerTask.new { collect }
@task.execution_interval = @options[:polling_interval]
@task.timeout_interval = @options[:polling_timeout]
@task.add_observer(self)
end
end
end
end; end

View file

@ -0,0 +1,87 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/base"
require 'monitoring'
module LogStash module Instrument module PeriodicPoller
class JVM < Base
attr_reader :heap_metrics, :non_heap_metrics, :pools_metrics
def initialize(metric, options = {})
super(metric, options)
jvm_metrics = metric.namespace(:jvm)
memory_metrics = jvm_metrics.namespace(:memory)
@heap_metrics = memory_metrics.namespace(:heap)
@non_heap_metrics = memory_metrics.namespace(:non_heap)
@pools_metrics = memory_metrics.namespace(:pools)
end
def collect
raw = JRMonitor.memory.generate
collect_heap_metrics(raw)
collect_non_heap_metrics(raw)
collect_pools_metrics(raw)
end
private
def collect_heap_metrics(data)
heap = aggregate_information_for(data["heap"].values)
heap[:used_percent] = (heap[:used_in_bytes] / heap[:max_in_bytes].to_f)*100.0
heap.each_pair do |key, value|
heap_metrics.gauge(key, value.to_i)
end
end
def collect_non_heap_metrics(data)
non_heap = aggregate_information_for(data["non_heap"].values)
non_heap.each_pair do |key, value|
non_heap_metrics.gauge(key, value.to_i)
end
end
def collect_pools_metrics(data)
metrics = build_pools_metrics(data)
metrics.each_pair do |key, hash|
metric = pools_metrics.namespace(key.to_sym)
hash.each_pair do |p,v|
metric.gauge(p, v)
end
end
end
def build_pools_metrics(data)
{
"young" => aggregate_information_for(data["heap"]["Par Eden Space"]),
"old" => aggregate_information_for(data["heap"]["CMS Old Gen"]),
"survivor" => aggregate_information_for(data["heap"]["Par Survivor Space"]),
}
end
def aggregate_information_for(collection)
collection.reduce(default_information_accumulator) do |m,e|
e = { e[0] => e[1] } if e.is_a?(Array)
e.each_pair do |k,v|
m[:used_in_bytes] += v if k.include?("used")
m[:committed_in_bytes] += v if k.include?("committed")
m[:max_in_bytes] += v if k.include?("max")
m[:peak_max_in_bytes] += v if k.include?("peak.max")
m[:peak_used_in_bytes] += v if k.include?("peak.used")
end
m
end
end
def default_information_accumulator
{
:used_in_bytes => 0,
:committed_in_bytes => 0,
:max_in_bytes => 0,
:peak_used_in_bytes => 0,
:peak_max_in_bytes => 0
}
end
end
end; end; end

View file

@ -0,0 +1,13 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/base"
module LogStash module Instrument module PeriodicPoller
class Os < Base
def initialize(metric, options = {})
super(metric, options)
end
def collect
end
end
end; end; end

View file

@ -0,0 +1,19 @@
# encoding: utf-8
module LogStash module Instrument module PeriodicPoller
class PeriodicPollerObserver
include LogStash::Util::Loggable
def initialize(poller)
@poller = poller
end
def update(time, result, exception)
if exception
logger.error("PeriodicPoller exception", :poller => @poller,
:result => result,
:exception => exception,
:executed_at => time)
end
end
end
end; end; end

View file

@ -0,0 +1,26 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/os"
require "logstash/instrument/periodic_poller/jvm"
module LogStash module Instrument
# Each PeriodicPoller manages its own thread to do the polling
# of the stats; this class encapsulates the starting and stopping of the pollers.
# If the unique timers use too much resource we can refactor this behavior here.
class PeriodicPollers
attr_reader :metric
def initialize(metric)
@metric = metric
@periodic_pollers = [PeriodicPoller::Os.new(metric),
PeriodicPoller::JVM.new(metric)]
end
def start
@periodic_pollers.map(&:start)
end
def stop
@periodic_pollers.map(&:stop)
end
end
end; end

View file

@ -0,0 +1,16 @@
# encoding: utf-8
require "logstash/util/loggable"
require "logstash/event"
module LogStash module Instrument
class Snapshot
include LogStash::Util::Loggable
attr_reader :metric_store, :created_at
def initialize(metric_store, created_at = Time.now)
@metric_store = metric_store
@created_at = created_at
end
end
end; end

View file

@ -10,4 +10,5 @@ module LogStash
module Util; end
module PluginMixins; end
module PluginManager; end
module Api; end
end # module LogStash

View file

@ -12,12 +12,18 @@ module LogStash class OutputDelegator
# The *args this takes are the same format that an Outputs::Base takes. A list of hashes with parameters in them
# Internally these just get merged together into a single hash
def initialize(logger, klass, default_worker_count, *args)
def initialize(logger, klass, default_worker_count, metric, *args)
@logger = logger
@threadsafe = klass.threadsafe?
@config = args.reduce({}, :merge)
@klass = klass
# Create an instance of the output so we can fetch the identifier
output = @klass.new(*args)
# Scope the metrics to the plugin
@metric = metric.namespace(output.id.to_sym)
# We define this as an array regardless of threadsafety
# to make reporting simpler, even though a threadsafe plugin will just have
# a single instance
@ -39,6 +45,7 @@ module LogStash class OutputDelegator
@workers += (@worker_count - 1).times.map do
inst = @klass.new(*args)
inst.metric = @metric
inst.register
inst
end
@ -107,6 +114,7 @@ module LogStash class OutputDelegator
def threadsafe_multi_receive(events)
@events_received.increment(events.length)
@metric.increment(:events_in, events.length)
@threadsafe_worker.multi_receive(events)
end
@ -147,4 +155,4 @@ module LogStash class OutputDelegator
private
# Needed for testing, so private
attr_reader :threadsafe_worker, :worker_queue
end end
end end

View file

@ -14,10 +14,28 @@ require "logstash/util/defaults_printer"
require "logstash/shutdown_watcher"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/pipeline_reporter"
require "logstash/instrument/metric"
require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/collector"
require "logstash/output_delegator"
require "logstash/filter_delegator"
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger, :thread, :config_str, :original_settings
attr_reader :inputs,
:filters,
:outputs,
:worker_threads,
:events_consumed,
:events_filtered,
:reporter,
:pipeline_id,
:metric,
:logger,
:started_at,
:thread,
:config_str,
:original_settings
DEFAULT_SETTINGS = {
:default_pipeline_workers => LogStash::Config::CpuCoreStrategy.maximum,
@ -52,6 +70,16 @@ module LogStash; class Pipeline
@worker_threads = []
# Metric object should be passed upstream; multiple pipelines share the same metric
# and collector, only the namespace will change.
# If no metric is given, we use a `NullMetric` for all internal calls.
# We also do this to make the changes backward compatible with previous testing of the
# pipeline.
#
# This needs to be configured before we evaluate the code to make
# sure the metric instance is correctly sent to the plugins.
@metric = settings.fetch(:metric, Instrument::NullMetric.new)
grammar = LogStashConfigParser.new
@config = grammar.parse(config_str)
if @config.nil?
@ -61,9 +89,12 @@ module LogStash; class Pipeline
# The code will initialize all the plugins and define the
# filter and output methods.
code = @config.compile
@code = code
# The config code is hard to represent as a log message...
# So just print it.
@logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
begin
eval(code)
rescue => e
@ -99,7 +130,7 @@ module LogStash; class Pipeline
safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)
if unsafe_filters.any?
plugins = unsafe_filters.collect { |f| f.class.config_name }
plugins = unsafe_filters.collect { |f| f.config_name }
case thread_count
when nil
# user did not specify a worker thread count
@ -128,6 +159,9 @@ module LogStash; class Pipeline
end
def run
@started_at = Time.now
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
@logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))
@thread = Thread.current
@ -179,7 +213,7 @@ module LogStash; class Pipeline
begin
start_inputs
@outputs.each {|o| o.register }
@filters.each {|f| f.register}
@filters.each {|f| f.register }
pipeline_workers = safe_pipeline_worker_count
batch_size = @settings[:pipeline_batch_size]
@ -209,16 +243,21 @@ module LogStash; class Pipeline
end
# Main body of what a worker thread does
# Repeatedly takes batches off the queu, filters, then outputs them
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batch_size, batch_delay)
running = true
namespace_events = metric.namespace([:stats, :events])
namespace_pipeline = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
while running
# To understand the purpose behind this synchronize please read the body of take_batch
input_batch, signal = @input_queue_pop_mutex.synchronize { take_batch(batch_size, batch_delay) }
running = false if signal == LogStash::SHUTDOWN
@events_consumed.increment(input_batch.size)
namespace_events.increment(:in, input_batch.size)
namespace_pipeline.increment(:in, input_batch.size)
filtered_batch = filter_batch(input_batch)
@ -229,8 +268,14 @@ module LogStash; class Pipeline
@events_filtered.increment(filtered_batch.size)
namespace_events.increment(:filtered, filtered_batch.size)
namespace_pipeline.increment(:filtered, filtered_batch.size)
output_batch(filtered_batch)
namespace_events.increment(:out, filtered_batch.size)
namespace_pipeline.increment(:out, filtered_batch.size)
inflight_batches_synchronize { set_current_thread_inflight_batch(nil) }
end
end
@ -408,10 +453,14 @@ module LogStash; class Pipeline
def plugin(plugin_type, name, *args)
args << {} if args.empty?
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
klass = LogStash::Plugin.lookup(plugin_type, name)
if plugin_type == "output"
LogStash::OutputDelegator.new(@logger, klass, default_output_workers, *args)
LogStash::OutputDelegator.new(@logger, klass, default_output_workers, pipeline_scoped_metric.namespace(:outputs), *args)
elsif plugin_type == "filter"
LogStash::FilterDelegator.new(@logger, klass, pipeline_scoped_metric.namespace(:filters), *args)
else
klass.new(*args)
end
@ -463,6 +512,15 @@ module LogStash; class Pipeline
end
end
# Calculate the uptime in milliseconds
#
# @return [Fixnum] Uptime in milliseconds, 0 if the pipeline is not started
def uptime
return 0 if started_at.nil?
((Time.now.to_f - started_at.to_f) * 1000.0).to_i
end
# perform filters flush into the output queue
# @param options [Hash]
# @option options [Boolean] :final => true to signal a final shutdown flush

View file

@ -2,8 +2,10 @@
require "logstash/namespace"
require "logstash/logging"
require "logstash/config/mixin"
require "logstash/instrument/null_metric"
require "cabin"
require "concurrent"
require "securerandom"
class LogStash::Plugin
attr_accessor :params
@ -11,26 +13,54 @@ class LogStash::Plugin
NL = "\n"
public
include LogStash::Config::Mixin
# Disable or enable metric logging for this specific plugin instance.
# By default we record all the metrics we can, but you can disable metric collection
# for a specific plugin.
config :enable_metric, :validate => :boolean, :default => true
# Add a unique `ID` to the plugin instance; this `ID` is used for tracking
# information for a specific configuration of the plugin.
#
# ```
# output {
# stdout {
# id => "ABC"
# }
# }
# ```
#
# If you don't explicitly set this variable Logstash will generate a unique name.
config :id, :validate => :string, :default => ""
def hash
params.hash ^
self.class.name.hash
end
public
def eql?(other)
self.class.name == other.class.name && @params == other.params
end
public
def initialize(params=nil)
@params = LogStash::Util.deep_clone(params)
@logger = Cabin::Channel.get(LogStash)
end
# Return a unique ID for this plugin configuration; by default
# we will generate a UUID.
#
# If the user defines an `id => 'ABC'` in the configuration we will return it.
#
# @return [String] A plugin ID
def id
(@params["id"].nil? || @params["id"].empty?) ? SecureRandom.uuid : @params["id"]
end
# close is called during shutdown, after the plugin worker
# main task terminates
public
def do_close
@logger.debug("closing", :plugin => self)
close
@ -38,7 +68,6 @@ class LogStash::Plugin
# Subclasses should implement this close method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
public
def close
# ..
end
@ -47,7 +76,6 @@ class LogStash::Plugin
return "#{self.class.name}: #{@params}"
end
public
def inspect
if !@params.nil?
description = @params
@ -59,13 +87,19 @@ class LogStash::Plugin
end
end
public
def debug_info
[self.class.to_s, original_params]
end
def metric=(new_metric)
@metric = new_metric
end
def metric
@metric_plugin ||= enable_metric ? @metric : LogStash::Instrument::NullMetric.new
end
# Look up a plugin by type and name.
public
def self.lookup(type, name)
path = "logstash/#{type}s/#{name}"
@ -86,7 +120,6 @@ class LogStash::Plugin
end
private
# lookup a plugin by type and name in the existing LogStash module namespace
# ex.: namespace_lookup("filter", "grok") looks for LogStash::Filters::Grok
# @param type [String] plugin type, "input", "ouput", "filter"

View file

@ -117,6 +117,11 @@ class LogStash::Runner < Clamp::Command
require "stud/task"
require "cabin" # gem 'cabin'
# Configure the Logstash logging facility; this needs to be done before everything else to
# make sure the logger has the correct settings and the log level is correctly defined.
configure_logging(log_file)
LogStash::Util::set_thread_name(self.class.name)
if RUBY_VERSION < "1.9.2"
@ -162,7 +167,10 @@ class LogStash::Runner < Clamp::Command
end
@agent = create_agent(:logger => @logger,
:auto_reload => @auto_reload)
:auto_reload => @auto_reload,
:collect_metric => true,
:debug => debug?,
:node_name => node_name)
@agent.register_pipeline("main", @pipeline_settings.merge({
:config_string => config_string,
@ -235,7 +243,6 @@ class LogStash::Runner < Clamp::Command
#
# Log file stuff, plugin path checking, etc.
def configure
configure_logging(log_file)
configure_plugin_paths(plugin_paths)
end # def configure
@ -254,6 +261,7 @@ class LogStash::Runner < Clamp::Command
# Point logging at a specific path.
def configure_logging(path)
@logger = Cabin::Channel.get(LogStash)
# Set with the -v (or -vv...) flag
if quiet?
@logger.level = :error

View file

@ -184,6 +184,15 @@ module LogStash::Util
end
end
# Take an instance reference and return the name of the class,
# stripping all the modules.
#
# @param [Object] The instance whose class name to return
# @return [String] The name of the class
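#
# Example (hypothetical instance):
#
#   LogStash::Util.class_name(Counter.new(namespaces, key)) #=> "Counter"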
def self.class_name(instance)
instance.class.name.split("::").last
end
def self.deep_clone(o)
case o
when Hash

View file

@ -0,0 +1,15 @@
# encoding: utf-8
require "chronic_duration"
module LogStash::Util::DurationFormatter
CHRONIC_OPTIONS = { :format => :short }
# Take a duration in milliseconds and transform it into
# a format that a human can understand. This is currently used by
# the API.
#
# @param [Fixnum] Duration in milliseconds
# @return [String] Duration in human format
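#
# Example (hypothetical; the exact string depends on chronic_duration):
#
#   LogStash::Util::DurationFormatter.human_format(3_600_000) #=> "1 hr"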
def self.human_format(duration)
ChronicDuration.output(duration / 1000, CHRONIC_OPTIONS)
end
end

View file

@ -0,0 +1,29 @@
# encoding: utf-8
require "logstash/namespace"
require "cabin"
module LogStash module Util
module Loggable
class << self
def logger=(new_logger)
@logger = new_logger
end
def logger
@logger ||= Cabin::Channel.get(LogStash)
end
end
def self.included(base)
class << base
def logger
Loggable.logger
end
end
end
def logger
Loggable.logger
end
end
end; end

View file

@ -0,0 +1,84 @@
# encoding: utf-8
require "puma"
require 'puma/single'
require 'puma/binder'
require 'puma/configuration'
require 'puma/commonlogger'
module LogStash
class WebServer
extend Forwardable
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events
def_delegator :@runner, :stats
def initialize(logger, options={})
@logger = logger
@options = {}
@cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru") })
@status = nil
parse_options
@runner = nil
@events = ::Puma::Events.strings
@binder = ::Puma::Binder.new(@events)
@binder.import_from_env
set_environment
end
def run
log "=== puma start: #{Time.now} ==="
@runner = Puma::Single.new(self)
@status = :run
@runner.run
stop(:graceful => true)
end
def log(str)
logger.debug(str) if logger.debug?
end
def error(str)
logger.error(str) if logger.error?
end
# Empty method; it is required because of the Puma usage we make through
# the Single interface, see https://github.com/puma/puma/blob/master/lib/puma/single.rb#L82
# for more details. This can always be implemented when we want to keep track of this
# bit of data.
def write_state; end
def stop(options={})
graceful = options.fetch(:graceful, true)
if graceful
@runner.stop_blocked
else
@runner.stop
end
@status = :stop
log "=== puma shutdown: #{Time.now} ==="
end
private
def env
@options[:debug] ? "development" : "production"
end
def set_environment
@options[:environment] = env
ENV['RACK_ENV'] = env
end
def parse_options
@config = ::Puma::Configuration.new(cli_options)
@config.load
@options = @config.options
end
end; end

View file

@ -25,8 +25,12 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "clamp", "~> 0.6.5" #(MIT license) for command line args/flags
gem.add_runtime_dependency "filesize", "0.0.4" #(MIT license) for :bytes config validator
gem.add_runtime_dependency "gems", "~> 0.8.3" #(MIT license)
gem.add_runtime_dependency "concurrent-ruby", "0.9.2"
gem.add_runtime_dependency "concurrent-ruby", "1.0.0"
gem.add_runtime_dependency "sinatra", '~> 1.4', '>= 1.4.6'
gem.add_runtime_dependency "puma", '~> 2.15', '>= 2.15.3'
gem.add_runtime_dependency "jruby-openssl", "0.9.13" # Required to support TLSv1.2
gem.add_runtime_dependency "chronic_duration", "0.10.6"
gem.add_runtime_dependency "jruby-monitoring", '~> 0.1'
# TODO(sissel): Treetop 1.5.x doesn't seem to work well, but I haven't
# investigated what the cause might be. -Jordan

View file

@ -0,0 +1,42 @@
{
"heap": {
"used_in_bytes": 1,
"committed_in_bytes": 2,
"max_in_bytes": 3,
"peak_used_in_bytes": 4,
"peak_max_in_bytes": 5,
"used_percent": 7
},
"non_heap": {
"used_in_bytes": 1,
"committed_in_bytes": 2,
"max_in_bytes": 3,
"peak_used_in_bytes": 4,
"peak_max_in_bytes": 5
},
"pools": {
"young": {
"used_in_bytes": 1,
"committed_in_bytes": 2,
"max_in_bytes": 3,
"peak_used_in_bytes": 4,
"peak_max_in_bytes": 5
},
"old": {
"used_in_bytes": 1,
"committed_in_bytes": 2,
"max_in_bytes": 3,
"peak_used_in_bytes": 4,
"peak_max_in_bytes": 5
},
"survivor": {
"used_in_bytes": 1,
"committed_in_bytes": 2,
"max_in_bytes": 3,
"peak_used_in_bytes": 4,
"peak_max_in_bytes": 5
}
}
}

View file

@ -0,0 +1,33 @@
# encoding: utf-8
require_relative "../../spec_helper"
require "sinatra"
require "app/root"
require "logstash/json"
describe LogStash::Api::Root do
include Rack::Test::Methods
def app()
described_class
end
let(:body) { LogStash::Json.load(last_response.body) }
before(:each) do
get "/"
end
it "should respond to root resource" do
expect(last_response).to be_ok
end
it "contain a hostname" do
expect(body).to include("hostname" => a_kind_of(String))
end
it "contain a version number" do
expect(body).to include("version" => a_kind_of(String) )
end
end

View file

@ -0,0 +1,34 @@
# encoding: utf-8
require_relative "../../spec_helper"
require "sinatra"
require "app/stats"
describe LogStash::Api::Stats do
include Rack::Test::Methods
def app()
described_class
end
it "respond to the events resource" do
get "/events"
expect(last_response).to be_ok
end
context "jvm" do
let(:type) { "jvm" }
it "respond to the hot_threads resource" do
get "#{type}/hot_threads"
expect(last_response).to be_ok
end
it "respond to the memory resource" do
get "#{type}/memory"
expect(last_response).to be_ok
end
end
end

View file

@ -0,0 +1,28 @@
# encoding: utf-8
require_relative "../../spec_helper"
require "app/stats/events_command"
require 'ostruct'
describe LogStash::Api::StatsEventsCommand do
let(:service) { double("snapshot-service") }
subject { described_class.new(service) }
let(:stats) do
{ :base => { :events_in => OpenStruct.new(:value => 100), :events_filtered => OpenStruct.new(:value => 200) } }
end
before(:each) do
allow(service).to receive(:get).with(:events_stats).and_return(stats)
end
context "#schema" do
let(:report) { subject.run }
it "return events information" do
expect(report).to include({:in => 100, :dropped => 200 })
end
end
end

View file

@ -0,0 +1,58 @@
# encoding: utf-8
require_relative "../../spec_helper"
require "app/stats/hotthreads_command"
require "app/stats/memory_command"
describe "JVM stats" do
describe LogStash::Api::HotThreadsCommand do
context "#schema" do
let(:report) { subject.run }
it "return hot threads information" do
expect(report).not_to be_empty
end
end
end
describe LogStash::Api::JvmMemoryCommand do
context "#schema" do
let(:service) { double("snapshot-service") }
subject { described_class.new(service) }
let(:stats) do
read_fixture("memory.json")
end
before(:each) do
allow(service).to receive(:get).with(:jvm_memory_stats).and_return(stats)
end
let(:report) do
subject.run
end
it "return hot threads information" do
expect(report).not_to be_empty
end
it "return heap information" do
expect(report.keys).to include(:heap)
end
it "return non heap information" do
expect(report.keys).to include(:non_heap)
end
end
end
end

View file

@ -0,0 +1,49 @@
# encoding: utf-8
ROOT = File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "lib", "logstash", "api"))
$LOAD_PATH.unshift File.join(ROOT, 'lib')
Dir.glob(File.join(ROOT, "lib", "**")).each{ |d| $LOAD_PATH.unshift(d) }
require "logstash/devutils/rspec/spec_helper"
require 'rack/test'
require 'rspec'
require "json"
ENV['RACK_ENV'] = 'test'
Rack::Builder.parse_file(File.join(ROOT, 'init.ru'))
def read_fixture(name)
path = File.join(File.dirname(__FILE__), "fixtures", name)
HashWithIndifferentAccess.new(JSON.parse(File.read(path)))
end
class HashWithIndifferentAccess
extend Forwardable
def_delegators :@hash, :inject, :keys
def initialize(hash)
@hash = hash
end
def [](key)
v = @hash[key.to_s]
if (v.is_a?(Hash))
return HashWithIndifferentAccess.new(v)
end
return OpenStruct.new(:value => v)
end
def marshal_dump
h = {}
@hash.each_pair do |k, v|
if (!v.is_a?(Hash))
h[k] = OpenStruct.new(:value => v)
else
h[k] = HashWithIndifferentAccess.new(v)
end
end
HashWithIndifferentAccess.new(h)
end
end

View file

@ -182,6 +182,17 @@ describe LogStash::Agent do
fetched_config = subject.send(:fetch_config, settings)
expect(fetched_config.strip).to eq(cli_config + IO.read(tmp_config_path))
end
end
context "#started_at" do
it "return the start time when the agent is started" do
expect(subject.started_at).to be_kind_of(Time)
end
end
context "#uptime" do
it "return the number of milliseconds since start time" do
expect(subject.uptime).to be >= 0
end
end
end

View file

@ -0,0 +1,49 @@
# encoding: utf-8
require "logstash/inputs/metrics"
require "spec_helper"
describe LogStash::Inputs::Metrics do
let(:queue) { [] }
describe "#run" do
it "should register itself to the collector observer" do
expect(LogStash::Instrument::Collector.instance).to receive(:add_observer).with(subject)
t = Thread.new { subject.run(queue) }
sleep(0.1) # give a bit of time to the thread to start
subject.stop
end
end
describe "#update" do
let(:namespaces) { [:root, :base] }
let(:key) { :foo }
let(:metric_store) { LogStash::Instrument::MetricStore.new }
it "should fill up the queue with received events" do
Thread.new { subject.run(queue) }
sleep(0.1)
subject.stop
metric_store.fetch_or_store(namespaces, key, LogStash::Instrument::MetricType::Counter.new(namespaces, key))
subject.update(LogStash::Instrument::Snapshot.new(metric_store))
expect(queue.count).to eq(1)
end
end
describe "#stop" do
it "should remove itself from the the collector observer" do
expect(LogStash::Instrument::Collector.instance).to receive(:delete_observer).with(subject)
t = Thread.new { subject.run(queue) }
sleep(0.1) # give a bit of time to the thread to start
subject.stop
end
it "should unblock the input" do
t = Thread.new { subject.run(queue) }
sleep(0.1) # give a bit of time to the thread to start
subject.do_stop
wait_for { t.status }.to be_falsey
end
end
end

View file

@ -0,0 +1,49 @@
# encoding: utf-8
require "logstash/instrument/collector"
require "spec_helper"
describe LogStash::Instrument::Collector do
subject { LogStash::Instrument::Collector.instance }
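# The collector is a Singleton, so its underlying metric store is shared across examples.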
describe "#push" do
let(:namespaces_path) { [:root, :pipelines, :pipelines01] }
let(:key) { :my_key }
context "when the `MetricType` exist" do
it "store the metric of type `counter`" do
subject.push(namespaces_path, key, :counter, :increment)
end
end
context "when the `MetricType` doesn't exist" do
let(:wrong_type) { :donotexist }
it "logs an error but dont crash" do
expect(subject.logger).to receive(:error)
.with("Collector: Cannot create concrete class for this metric type",
hash_including({ :type => wrong_type, :namespaces_path => namespaces_path }))
subject.push(namespaces_path, key, wrong_type, :increment)
end
end
context "when there is a conflict with the metric key" do
let(:conflicting_namespaces) { [namespaces_path, key].flatten }
it "logs an error but dont crash" do
subject.push(namespaces_path, key, :counter, :increment)
expect(subject.logger).to receive(:error)
.with("Collector: Cannot record metric",
hash_including({ :exception => instance_of(LogStash::Instrument::MetricStore::NamespacesExpectedError) }))
subject.push(conflicting_namespaces, :random_key, :counter, :increment)
end
end
end
describe "#snapshot_metric" do
it "return a `LogStash::Instrument::MetricStore`" do
expect(subject.snapshot_metric).to be_kind_of(LogStash::Instrument::Snapshot)
end
end
end

View file

@ -0,0 +1,110 @@
# encoding: utf-8
require "logstash/instrument/metric"
require "logstash/instrument/collector"
require_relative "../../support/matchers"
require "spec_helper"
describe LogStash::Instrument::Metric do
let(:collector) { [] }
let(:namespace) { :root }
subject { LogStash::Instrument::Metric.new(collector) }
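# A plain Array doubles as the collector: the specs expect each push to append
# the namespace, key, type and remaining arguments to it, which the
# be_a_metric_event matcher then inspects.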
context "#increment" do
it "a counter by 1" do
metric = subject.increment(:root, :error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 1)
end
it "a counter by a provided value" do
metric = subject.increment(:root, :error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :increment, 20)
end
it "raises an exception if the key is an empty string" do
expect { subject.increment(:root, "", 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
it "raise an exception if the key is nil" do
expect { subject.increment(:root, nil, 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
end
context "#decrement" do
it "a counter by 1" do
metric = subject.decrement(:root, :error_rate)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 1)
end
it "a counter by a provided value" do
metric = subject.decrement(:root, :error_rate, 20)
expect(collector).to be_a_metric_event([:root, :error_rate], :counter, :decrement, 20)
end
it "raises an exception if the key is an empty string" do
expect { subject.decrement(:root, "", 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
it "raise an exception if the key is nil" do
expect { subject.decrement(:root, nil, 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
end
context "#gauge" do
it "set the value of a key" do
metric = subject.gauge(:root, :size_queue, 20)
expect(collector).to be_a_metric_event([:root, :size_queue], :gauge, :set, 20)
end
it "raises an exception if the key is an empty string" do
expect { subject.gauge(:root, "", 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
it "raise an exception if the key is nil" do
expect { subject.gauge(:root, nil, 20) }.to raise_error(LogStash::Instrument::MetricNoKeyProvided)
end
end
context "#time" do
let(:sleep_time) { 2 }
let(:sleep_time_ms) { sleep_time * 1_000_000 }
it "records the duration" do
subject.time(:root, :duration_ms) { sleep(sleep_time) }
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 5000)
expect(collector[0]).to match(:root)
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:mean)
end
it "returns the value of the executed block" do
expect(subject.time(:root, :testing) { "hello" }).to eq("hello")
end
it "return a TimedExecution" do
execution = subject.time(:root, :duration_ms)
sleep(sleep_time)
execution.stop
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 0.1)
expect(collector[0]).to match(:root)
expect(collector[1]).to be(:duration_ms)
expect(collector[2]).to be(:mean)
end
end
context "#namespace" do
let(:sub_key) { :my_sub_key }
it "creates a new metric object and append the `sub_key` to the `base_key`" do
expect(subject.namespace(sub_key).namespace_name).to eq([sub_key])
end
it "uses the same collector as the creator class" do
child = subject.namespace(sub_key)
metric = child.increment(:error_rate)
expect(collector).to be_a_metric_event([sub_key, :error_rate], :counter, :increment, 1)
end
end
end

View file

@ -0,0 +1,163 @@
# encoding: utf-8
require "logstash/instrument/metric_store"
require "logstash/instrument/metric_type/base"
describe LogStash::Instrument::MetricStore do
let(:namespaces) { [ :root, :pipelines, :pipeline_01 ] }
let(:key) { :events_in }
let(:counter) { LogStash::Instrument::MetricType::Counter.new(namespaces, key) }
context "when the metric object doesn't exist" do
it "store the object" do
expect(subject.fetch_or_store(namespaces, key, counter)).to eq(counter)
end
it "support a block as argument" do
expect(subject.fetch_or_store(namespaces, key) { counter }).to eq(counter)
end
end
context "when the metric object exist in the namespace" do
let(:new_counter) { LogStash::Instrument::MetricType::Counter.new(namespaces, key) }
it "return the object" do
subject.fetch_or_store(namespaces, key, counter)
expect(subject.fetch_or_store(namespaces, key, new_counter)).to eq(counter)
end
end
context "when the namespace end node isn't a map" do
let(:conflicting_namespaces) { [:root, :pipelines, :pipeline_01, :events_in] }
it "raise an exception" do
subject.fetch_or_store(namespaces, key, counter)
expect { subject.fetch_or_store(conflicting_namespaces, :new_key, counter) }.to raise_error(LogStash::Instrument::MetricStore::NamespacesExpectedError)
end
end
context "retrieving events" do
let(:metric_events) {
[
[[:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch"], :event_in, :increment],
[[:node, :sashimi, :pipelines, :pipeline01], :processed_events_in, :increment],
[[:node, :sashimi, :pipelines, :pipeline01], :processed_events_out, :increment],
[[:node, :sashimi, :pipelines, :pipeline02], :processed_events_out, :increment],
]
}
before :each do
# Let's add a few metrics to the store before trying to find them
metric_events.each do |namespaces, metric_key, action|
metric = subject.fetch_or_store(namespaces, metric_key, LogStash::Instrument::MetricType::Counter.new(namespaces, metric_key))
metric.execute(action)
end
end
describe "#get" do
context "when the path exist" do
it "retrieves end of of a branch" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => anything)))))))
end
it "retrieves branch" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01)
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything)))))
end
it "allow to retrieve a specific metrics" do
metrics = subject.get(:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch", :event_in)
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => a_hash_including(:event_in => be_kind_of(LogStash::Instrument::MetricType::Base)))))))))
end
context "with filtered keys" do
it "allows to retrieve multiple keys on the same level" do
metrics = subject.get(:node, :sashimi, :pipelines, :"pipeline01,pipeline02")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything, :pipeline02 => anything)))))
end
it "supports space in the keys" do
metrics = subject.get(:node, :sashimi, :pipelines, :"pipeline01, pipeline02 ")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything, :pipeline02 => anything)))))
end
it "retrieves only the requested keys" do
metrics = subject.get(:node, :sashimi, :pipelines, :"pipeline01,pipeline02", :processed_events_in)
expect(metrics[:node][:sashimi][:pipelines].keys).to include(:pipeline01, :pipeline02)
end
end
context "when the path doesnt exist" do
it "raise an exception" do
expect { subject.get(:node, :sashimi, :dontexist) }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/)
end
end
end
describe "#get_with_path" do
context "when the path exist" do
it "removes the first `/`" do
metrics = subject.get_with_path("/node/sashimi/")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => anything)))
end
it "retrieves end of of a branch" do
metrics = subject.get_with_path("node/sashimi/pipelines/pipeline01/plugins/logstash-output-elasticsearch")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => anything)))))))
end
it "retrieves branch" do
metrics = subject.get_with_path("node/sashimi/pipelines/pipeline01")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything)))))
end
it "allow to retrieve a specific metrics" do
metrics = subject.get_with_path("node/sashimi/pipelines/pipeline01/plugins/logstash-output-elasticsearch/event_in")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => a_hash_including(:plugins => a_hash_including(:"logstash-output-elasticsearch" => a_hash_including(:event_in => be_kind_of(LogStash::Instrument::MetricType::Base)))))))))
end
context "with filtered keys" do
it "allows to retrieve multiple keys on the same level" do
metrics = subject.get_with_path("node/sashimi/pipelines/pipeline01,pipeline02/plugins/logstash-output-elasticsearch/event_in")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything, :pipeline02 => anything)))))
end
it "supports space in the keys" do
metrics = subject.get_with_path("node/sashimi/pipelines/pipeline01, pipeline02 /plugins/logstash-output-elasticsearch/event_in")
expect(metrics).to match(a_hash_including(:node => a_hash_including(:sashimi => a_hash_including(:pipelines => a_hash_including(:pipeline01 => anything, :pipeline02 => anything)))))
end
it "retrieves only the requested keys" do
metrics = subject.get(:node, :sashimi, :pipelines, :"pipeline01,pipeline02", :processed_events_in)
expect(metrics[:node][:sashimi][:pipelines].keys).to include(:pipeline01, :pipeline02)
end
end
end
end
context "when the path doesnt exist" do
it "raise an exception" do
expect { subject.get_with_path("node/sashimi/dontexist, pipeline02 /plugins/logstash-output-elasticsearch/event_in") }.to raise_error(LogStash::Instrument::MetricStore::MetricNotFound, /dontexist/)
end
end
end
describe "#each" do
it "retrieves all the metric" do
expect(subject.each.size).to eq(metric_events.size)
end
it "returns metric types" do
metrics = []
subject.each { |i| metrics << i }
expect(metrics.size).to eq(metric_events.size)
end
it "retrieves all the metrics from a specific branch" do
metrics = []
subject.each("node/sashimi/pipelines/pipeline01") { |i| metrics << i }
expect(metrics.size).to eq(3)
end
end
end
end

View file

@ -0,0 +1,40 @@
# encoding: utf-8
require "logstash/instrument/metric_type/counter"
require "spec_helper"
describe LogStash::Instrument::MetricType::Counter do
let(:namespaces) { [:root, :pipelines, :pipeline_01] }
let(:key) { :mykey }
subject { LogStash::Instrument::MetricType::Counter.new(namespaces, key) }
describe "#increment" do
it "increment the counter" do
expect{ subject.increment }.to change { subject.value }.by(1)
end
end
describe "#decrement" do
it "decrement the counter" do
expect{ subject.decrement }.to change { subject.value }.by(-1)
end
end
context "When serializing to JSON" do
it "serializes the value" do
expect(LogStash::Json.dump(subject)).to eq("0")
end
end
context "When creating a hash " do
it "creates the hash from all the values" do
metric_hash = {
"key" => key,
"namespaces" => namespaces,
"value" => 0,
"type" => "counter"
}
expect(subject.to_hash).to match(metric_hash)
end
end
end

View file

@ -0,0 +1,40 @@
# encoding: utf-8
require "logstash/instrument/metric_type/gauge"
require "logstash/json"
require "spec_helper"
describe LogStash::Instrument::MetricType::Gauge do
let(:namespaces) { [:root, :pipelines, :pipeline_01] }
let(:key) { :mykey }
let(:value) { "hello" }
subject { described_class.new(namespaces, key) }
before :each do
subject.execute(:set, value)
end
describe "#execute" do
it "set the value of the gauge" do
expect(subject.value).to eq(value)
end
end
context "When serializing to JSON" do
it "serializes the value" do
expect(LogStash::Json.dump(subject)).to eq("\"#{value}\"")
end
end
context "When creating a hash " do
it "creates the hash from all the values" do
metric_hash = {
"key" => key,
"namespaces" => namespaces,
"value" => value,
"type" => "gauge"
}
expect(subject.to_hash).to match(metric_hash)
end
end
end

View file

@ -0,0 +1,25 @@
# encoding: utf-8
require "logstash/instrument/namespaced_metric"
require "logstash/instrument/metric"
require_relative "../../support/matchers"
require "spec_helper"
describe LogStash::Instrument::NamespacedMetric do
let(:namespace) { :stats }
let(:collector) { [] }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
subject { described_class.new(metric, namespace) }
it "defines the same interface as `Metric`" do
expect(described_class).to implement_interface_of(LogStash::Instrument::Metric)
end
it "returns a TimedException when we call without a block" do
expect(subject.time(:duration_ms)).to be_kind_of(LogStash::Instrument::Metric::TimedExecution)
end
it "returns the value of the block" do
expect(subject.time(:duration_ms) { "hello" }).to eq("hello")
end
end

View file

@ -0,0 +1,21 @@
# encoding: utf-8
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_metric"
require_relative "../../support/matchers"
describe LogStash::Instrument::NullMetric do
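# NullMetric is the no-op implementation handed to pipelines when no metric is
# given, so it must expose the same interface as a real metric.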
it "defines the same interface as `Metric`" do
expect(described_class).to implement_interface_of(LogStash::Instrument::NamespacedMetric)
end
describe "#time" do
it "returns the value of the block without recording any metrics" do
expect(subject.time(:execution_time) { "hello" }).to eq("hello")
end
it "return a TimedExecution" do
execution = subject.time(:do_something)
expect { execution.stop }.not_to raise_error
end
end
end

View file

@ -1,4 +1,5 @@
# encoding: utf-8
require "logstash/output_delegator"
require 'spec_helper'
describe LogStash::OutputDelegator do
@ -6,18 +7,20 @@ describe LogStash::OutputDelegator do
let(:events) { 7.times.map { LogStash::Event.new }}
let(:default_worker_count) { 1 }
subject { described_class.new(logger, out_klass, default_worker_count) }
subject { described_class.new(logger, out_klass, default_worker_count, LogStash::Instrument::NullMetric.new) }
context "with a plain output plugin" do
let(:out_klass) { double("output klass") }
let(:out_inst) { double("output instance") }
before do
before(:each) do
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_inst).to receive(:register)
allow(out_inst).to receive(:multi_receive)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
allow(logger).to receive(:debug).with(any_args)
end
@ -56,6 +59,8 @@ describe LogStash::OutputDelegator do
before do
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
end
it "should instantiate multiple workers" do
@ -71,6 +76,8 @@ describe LogStash::OutputDelegator do
describe "threadsafe outputs" do
before do
allow(out_klass).to receive(:threadsafe?).and_return(true)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
end

View file

@ -17,6 +17,21 @@ class DummyInput < LogStash::Inputs::Base
end
end
class DummyInputGenerator < LogStash::Inputs::Base
config_name "dummyinputgenerator"
milestone 2
def register
end
def run(queue)
queue << LogStash::Event.new while !stop?
end
def close
end
end
class DummyCodec < LogStash::Codecs::Base
config_name "dummycodec"
milestone 2
@ -53,10 +68,14 @@ class DummyOutput < LogStash::Outputs::Base
end
def close
@num_closes += 1
@num_closes = 1
end
end
class DummyOutputMore < DummyOutput
config_name "dummyoutputmore"
end
class DummyFilter < LogStash::Filters::Base
config_name "dummyfilter"
milestone 2
@ -120,8 +139,7 @@ describe LogStash::Pipeline do
context "when there is no command line -w N set" do
it "starts one filter thread" do
msg = "Defaulting pipeline worker threads to 1 because there are some" +
" filters that might not work with multiple worker threads"
msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads"
pipeline = TestPipeline.new(test_config_with_filters)
expect(pipeline.logger).to receive(:warn).with(msg,
{:count_was=>worker_thread_count, :filters=>["dummyfilter"]})
@ -132,8 +150,7 @@ describe LogStash::Pipeline do
context "when there is command line -w N set" do
it "starts multiple filter thread" do
msg = "Warning: Manual override - there are filters that might" +
" not work with multiple worker threads"
msg = "Warning: Manual override - there are filters that might not work with multiple worker threads"
pipeline = TestPipeline.new(test_config_with_filters)
expect(pipeline.logger).to receive(:warn).with(msg,
{:worker_threads=> override_thread_count, :filters=>["dummyfilter"]})
@ -356,6 +373,39 @@ describe LogStash::Pipeline do
end
end
context "metrics" do
config <<-CONFIG
input { }
filter { }
output { }
CONFIG
it "uses a `NullMetric` object if no metric is given" do
pipeline = LogStash::Pipeline.new(config)
expect(pipeline.metric).to be_kind_of(LogStash::Instrument::NullMetric)
end
end
context "Multiples pipelines" do
before do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinputgenerator").and_return(DummyInputGenerator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore)
end
let(:pipeline1) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
let(:pipeline2) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") }
it "should handle evaluating different config" do
expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline2.output_func(LogStash::Event.new)).not_to include(nil)
expect(pipeline2.filter_func(LogStash::Event.new)).not_to include(nil)
end
end
context "Periodic Flush" do
let(:number_of_events) { 100 }
let(:config) do
@ -414,8 +464,8 @@ describe LogStash::Pipeline do
it "should handle evaluating different config" do
# When the functions are compiled from the AST it will generate instance
# variables that are unique to the actual config, the intance are pointing
# to conditionals/plugins.
# variables that are unique to the actual config, the instances are pointing
# to conditionals and/or plugins.
#
# Before the `defined_singleton_method`, the definition of the method was
# not unique per class, but the `instance variables` were unique per class.
@ -429,4 +479,53 @@ describe LogStash::Pipeline do
expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil)
end
end
context "#started_at" do
let(:config) do
<<-EOS
input {
generator {}
}
EOS
end
subject { described_class.new(config) }
it "returns nil when the pipeline isnt started" do
expect(subject.started_at).to be_nil
end
it "return when the pipeline started working" do
t = Thread.new { subject.run }
sleep(0.1)
expect(subject.started_at).to be < Time.now
t.kill rescue nil
end
end
context "#uptime" do
let(:config) do
<<-EOS
input {
generator {}
}
EOS
end
subject { described_class.new(config) }
context "when the pipeline is not started" do
it "returns 0" do
expect(subject.uptime).to eq(0)
end
end
context "when the pipeline is started" do
it "return the duration in milliseconds" do
t = Thread.new { subject.run }
sleep(0.1)
expect(subject.uptime).to be > 0
t.kill rescue nil
end
end
end
end

View file

@ -1,6 +1,10 @@
# encoding: utf-8
require "spec_helper"
require "logstash/plugin"
require "logstash/outputs/base"
require "logstash/codecs/base"
require "logstash/inputs/base"
require "logstash/filters/base"
describe LogStash::Plugin do
it "should fail lookup on inexisting type" do
@ -166,4 +170,56 @@ describe LogStash::Plugin do
end
end
describe "#id" do
plugin_types = [
LogStash::Filters::Base,
LogStash::Codecs::Base,
LogStash::Outputs::Base,
LogStash::Inputs::Base
]
plugin_types.each do |plugin_type|
context "plugin type is #{plugin_type}" do
let(:plugin) do
Class.new(plugin_type) do
config_name "simple_plugin"
config :host, :validate => :string
config :export, :validate => :boolean
def register; end
end
end
let(:config) do
{
"host" => "127.0.0.1",
"export" => true
}
end
subject { plugin.new(config) }
context "when there is no ID configured for the plugin" do
it "uses a UUID to identify the plugin" do
expect(subject.id).not_to eq(nil)
end
it "will be different between instance of plugins" do
expect(subject.id).not_to eq(plugin.new(config).id)
end
end
context "When a user provide an ID for the plugin" do
let(:id) { "ABC" }
let(:config) { super.merge("id" => id) }
it "uses the user provided ID" do
expect(subject.id).to eq(id)
end
end
end
end
end
end

View file

@ -3,6 +3,7 @@ require "spec_helper"
require "logstash/runner"
require "stud/task"
require "stud/trap"
require "logstash/util/java_version"
class NullRunner
def run(args); end
@ -40,6 +41,13 @@ describe LogStash::Runner do
context "with no arguments" do
let(:args) { [] }
let(:agent) { double("agent") }
before(:each) do
allow(LogStash::Agent).to receive(:new).and_return(agent)
allow(LogStash::Util::JavaVersion).to receive(:warn_on_bad_java_version)
end
it "should show help" do
expect($stderr).to receive(:puts).once
expect(subject).to receive(:signal_usage_error).once.and_call_original
@ -93,6 +101,7 @@ describe LogStash::Runner do
let(:pipeline) { double("pipeline") }
before(:each) do
allow_any_instance_of(LogStash::Agent).to receive(:execute).and_return(true)
task = Stud::Task.new { 1 }
allow(pipeline).to receive(:run).and_return(task)
allow(pipeline).to receive(:shutdown)
@ -101,6 +110,8 @@ describe LogStash::Runner do
context "when :pipeline_workers is not defined by the user" do
it "should not pass the value to the pipeline" do
expect(LogStash::Pipeline).to receive(:new).once.with(pipeline_string, hash_excluding(:pipeline_workers)).and_return(pipeline)
expect(LogStash::Pipeline).to receive(:new).with(anything, hash_including(:pipeline_id => :metric)).and_return(pipeline)
args = ["-e", pipeline_string]
subject.run("bin/logstash", args)
end
@ -110,6 +121,10 @@ describe LogStash::Runner do
it "should pass the value to the pipeline" do
main_pipeline_settings[:pipeline_workers] = 2
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, hash_including(main_pipeline_settings)).and_return(pipeline)
base_pipeline_settings[:pipeline_workers] = 2
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, hash_including(:pipeline_id => "base", :pipeline_workers => 2, :metric => anything)).and_return(pipeline)
expect(LogStash::Pipeline).to receive(:new).with(anything, hash_including(:pipeline_id => :metric)).and_return(pipeline)
args = ["-w", "2", "-e", pipeline_string]
subject.run("bin/logstash", args)
end

View file

@ -0,0 +1,11 @@
# encoding: utf-8
require "logstash/util/duration_formatter"
require "spec_helper"
describe LogStash::Util::DurationFormatter do
let(:duration) { 3600 * 1000 } # in milliseconds
it "returns a human format" do
expect(subject.human_format(duration)).to eq("1h")
end
end

View file

@ -3,8 +3,18 @@ require 'spec_helper'
require "logstash/util"
class ClassNameTest
end
module TestingClassName
class TestKlass
end
end
describe LogStash::Util do
subject { described_class }
context "stringify_keys" do
it "should convert hash symbol keys to strings" do
expect(LogStash::Util.stringify_symbols({:a => 1, "b" => 2})).to eq({"a" => 1, "b" => 2})
@ -32,4 +42,22 @@ describe LogStash::Util do
expect(LogStash::Util.stringify_symbols([:a, [1, :b]])).to eq(["a", [1, "b"]])
end
end
describe ".class_name" do
context "when the class is a top level class" do
let(:klass) { ClassNameTest.new }
it "returns the name of the class" do
expect(subject.class_name(klass)).to eq("ClassNameTest")
end
end
context "when the class is nested inside modules" do
let(:klass) { TestingClassName::TestKlass.new }
it "returns the name of the class" do
expect(subject.class_name(klass)).to eq("TestKlass")
end
end
end
end

View file

@ -0,0 +1,30 @@
# encoding: utf-8
require "rspec"
require "rspec/expectations"
RSpec::Matchers.define :be_a_metric_event do |namespace, type, *args|
match do
namespace == Array(actual[0]).concat(Array(actual[1])) &&
type == actual[2] &&
args == actual[3..-1]
end
end
# Match to test `NullObject` pattern
RSpec::Matchers.define :implement_interface_of do
match do |actual|
all_instance_methods_implemented?
end
def missing_methods
expected.instance_methods.select { |method| !actual.instance_methods.include?(method) }
end
def all_instance_methods_implemented?
expected.instance_methods.all? { |method| actual.instance_methods.include?(method) }
end
failure_message do
"Expecting `#{expected}` to implements instance methods of `#{actual}`, missing methods: #{missing_methods.join(",")}"
end
end