Fixes #6128
This commit is contained in:
Tal Levy 2016-10-25 23:57:36 -07:00
parent e8f32f23fb
commit 19d3232873
13 changed files with 380 additions and 8 deletions

View file

@ -35,6 +35,49 @@ appender.json_rolling.layout.type = JSONLayout
appender.json_rolling.layout.compact = true
appender.json_rolling.layout.eventEol = true
rootLogger.level = ${sys:ls.log.level}
rootLogger.appenderRef.console.ref = ${sys:ls.log.format}_console
rootLogger.appenderRef.rolling.ref = ${sys:ls.log.format}_rolling
# Slowlog
appender.console_slowlog.type = Console
appender.console_slowlog.name = plain_console_slowlog
appender.console_slowlog.layout.type = PatternLayout
appender.console_slowlog.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.json_console_slowlog.type = Console
appender.json_console_slowlog.name = json_console_slowlog
appender.json_console_slowlog.layout.type = JSONLayout
appender.json_console_slowlog.layout.compact = true
appender.json_console_slowlog.layout.eventEol = true
appender.rolling_slowlog.type = RollingFile
appender.rolling_slowlog.name = plain_rolling_slowlog
appender.rolling_slowlog.fileName = ${sys:ls.logs}/logstash-slowlog-${sys:ls.log.format}.log
appender.rolling_slowlog.filePattern = ${sys:ls.logs}/logstash-slowlog-${sys:ls.log.format}-%d{yyyy-MM-dd}.log
appender.rolling_slowlog.policies.type = Policies
appender.rolling_slowlog.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling_slowlog.policies.time.interval = 1
appender.rolling_slowlog.policies.time.modulate = true
appender.rolling_slowlog.layout.type = PatternLayout
appender.rolling_slowlog.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %.10000m%n
appender.json_rolling_slowlog.type = RollingFile
appender.json_rolling_slowlog.name = json_rolling_slowlog
appender.json_rolling_slowlog.fileName = ${sys:ls.logs}/logstash-slowlog-${sys:ls.log.format}.log
appender.json_rolling_slowlog.filePattern = ${sys:ls.logs}/logstash-slowlog-${sys:ls.log.format}-%d{yyyy-MM-dd}.log
appender.json_rolling_slowlog.policies.type = Policies
appender.json_rolling_slowlog.policies.time.type = TimeBasedTriggeringPolicy
appender.json_rolling_slowlog.policies.time.interval = 1
appender.json_rolling_slowlog.policies.time.modulate = true
appender.json_rolling_slowlog.layout.type = JSONLayout
appender.json_rolling_slowlog.layout.compact = true
appender.json_rolling_slowlog.layout.eventEol = true
logger.slowlog.name = slowlog
logger.slowlog.level = trace
logger.slowlog.appenderRef.console_slowlog.ref = ${sys:ls.log.format}_console_slowlog
logger.slowlog.appenderRef.rolling_slowlog.ref = ${sys:ls.log.format}_rolling_slowlog
logger.slowlog.additivity = false

View file

@ -21,6 +21,38 @@ Logstash ships with a `log4j2.properties` file with out-of-the-box settings. You
rotation policy, type, and other https://logging.apache.org/log4j/2.x/manual/configuration.html#Loggers[log4j2 configuration].
You must restart Lostash to apply any changes that you make to this file.
==== Slowlog
Slow-log for Logstash adds the ability to log when a specific event takes an abnormal amount of time to make its way
through the pipeline. Just like the normal application log, you can find slow-logs in your `--path.logs` directory.
Slowlog is configured in the `logstash.yml` settings file with the following options:
------------------------------
[source]
slowlog.threshold.warn (default: -1)
slowlog.threshold.info (default: -1)
slowlog.threshold.debug (default: -1)
slowlog.threshold.trace (default: -1)
------------------------------
By default, these values are set to `-1nanos` to represent an infinite threshold where no slowlog will be invoked. These `slowlog.threshold`
fields are configured using a time-value format which enables a wide range of trigger intervals. The positive numeric ranges
can be specified using the following time units: `nanos` (nanoseconds), `micros` (microseconds), `ms` (milliseconds), `s` (second), `m` (minute),
`h` (hour), `d` (day).
Here is an example:
------------------------------
[source]
slowlog.threshold.warn: 2s
slowlog.threshold.info 1s
slowlog.threshold.debug 500ms
slowlog.threshold.trace 100ms
------------------------------
In the above configuration, events that take longer than two seconds to be processed within a filter will be logged.
The logs will include the full event and filter configuration that are responsible for the slowness.
==== Logging APIs
You could modify the `log4j2.properties` file and restart your Logstash, but that is both tedious and leads to unnecessary

View file

@ -43,6 +43,10 @@ module LogStash
Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
Setting::Bytes.new("queue.page_capacity", "250mb"),
Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Setting::TimeValue.new("slowlog.threshold.info", "-1"),
Setting::TimeValue.new("slowlog.threshold.debug", "-1"),
Setting::TimeValue.new("slowlog.threshold.trace", "-1")
].each {|setting| SETTINGS.register(setting) }
# Compute the default queue path based on `path.data`

View file

@ -139,6 +139,14 @@ class LogStash::Filters::Base < LogStash::Plugin
raise "#{self.class}#filter must be overidden"
end # def filter
public
def do_filter(event, &block)
time = java.lang.System.nanoTime
filter(event, &block)
@slow_logger.on_event("event processing time", @original_params, event, java.lang.System.nanoTime - time)
end
# in 1.5.0 multi_filter is meant to be used in the generated filter function in LogStash::Config::AST::Plugin only
# and is temporary until we refactor the filter method interface to accept events list and return events list,
# just list in multi_filter see https://github.com/elastic/logstash/issues/2872.
@ -153,7 +161,7 @@ class LogStash::Filters::Base < LogStash::Plugin
events.each do |event|
unless event.cancelled?
result << event
filter(event){|new_event| result << new_event}
do_filter(event){|new_event| result << new_event}
end
end
result
@ -161,7 +169,7 @@ class LogStash::Filters::Base < LogStash::Plugin
public
def execute(event, &block)
filter(event, &block)
do_filter(event, &block)
end # def execute
public

View file

@ -3,11 +3,12 @@ require "uri"
module LogStash
module Logging
java_import org.apache.logging.log4j.Level
java_import org.apache.logging.log4j.LogManager
java_import org.apache.logging.log4j.core.config.Configurator
java_import org.apache.logging.log4j.core.config.DefaultConfiguration
class Logger
java_import org.apache.logging.log4j.Level
java_import org.apache.logging.log4j.LogManager
java_import org.apache.logging.log4j.core.config.Configurator
java_import org.apache.logging.log4j.core.config.DefaultConfiguration
@@config_mutex = Mutex.new
@@logging_context = nil
@ -90,5 +91,37 @@ module LogStash
return @@logging_context
end
end
class SlowLogger
def initialize(name, warn_threshold, info_threshold, debug_threshold, trace_threshold)
slowlog_name = ["slowlog", name].join('.')
@slowlogger = LogManager.getLogger(slowlog_name)
@warn_threshold = warn_threshold
@info_threshold = info_threshold
@debug_threshold = debug_threshold
@trace_threshold = trace_threshold
end
def as_data(plugin_params, event, took_in_nanos)
{
:plugin_params => plugin_params,
:took_in_nanos => took_in_nanos,
:took_in_millis => took_in_nanos / 1000,
:event => event.to_json
}
end
def on_event(message, plugin_params, event, took_in_nanos)
if @warn_threshold >= 0 and took_in_nanos > @warn_threshold
@slowlogger.warn(message, as_data(plugin_params, event, took_in_nanos))
elsif @info_threshold >= 0 and took_in_nanos > @info_threshold
@slowlogger.info(message, as_data(plugin_params, event, took_in_nanos))
elsif @debug_threshold >= 0 and took_in_nanos > @debug_threshold
@slowlogger.debug(message, as_data(plugin_params, event, took_in_nanos))
elsif @trace_threshold >= 0 and took_in_nanos > @trace_threshold
@slowlogger.trace(message, as_data(plugin_params, event, took_in_nanos))
end
end
end
end
end

View file

@ -46,6 +46,12 @@ class LogStash::Plugin
def initialize(params=nil)
@logger = self.logger
# need to access settings statically because plugins are initialized in config_ast with no context.
settings = LogStash::SETTINGS
@slow_logger = self.slow_logger(settings.get("slowlog.threshold.warn"),
settings.get("slowlog.threshold.info"),
settings.get("slowlog.threshold.debug"),
settings.get("slowlog.threshold.trace"))
@params = LogStash::Util.deep_clone(params)
# The id should always be defined normally, but in tests that might not be the case
# In the future we may make this more strict in the Plugin API

View file

@ -2,6 +2,7 @@
require "logstash/util/loggable"
require "fileutils"
require "logstash/util/byte_value"
require "logstash/util/time_value"
module LogStash
class Settings
@ -463,8 +464,20 @@ module LogStash
end
end
end
class TimeValue < Coercible
def initialize(name, default, strict=true, &validator_proc)
super(name, ::Fixnum, default, strict, &validator_proc)
end
def coerce(value)
return value if value.is_a?(::Fixnum)
Util::TimeValue.from_value(value).to_nanos
end
end
end
SETTINGS = Settings.new
end

View file

@ -5,15 +5,27 @@ require "logstash/namespace"
module LogStash module Util
module Loggable
def self.included(klass)
def klass.logger
def klass.log4j_name
ruby_name = self.name || self.class.name || self.class.to_s
log4j_name = ruby_name.gsub('::', '.').downcase
ruby_name.gsub('::', '.').downcase
end
def klass.logger
@logger ||= LogStash::Logging::Logger.new(log4j_name)
end
def klass.slow_logger(warn_threshold, info_threshold, debug_threshold, trace_threshold)
@slow_logger ||= LogStash::Logging::SlowLogger.new(log4j_name, warn_threshold, info_threshold, debug_threshold, trace_threshold)
end
def logger
self.class.logger
end
def slow_logger(warn_threshold, info_threshold, debug_threshold, trace_threshold)
self.class.slow_logger(warn_threshold, info_threshold, debug_threshold, trace_threshold)
end
end
end
end; end

View file

@ -0,0 +1,70 @@
module LogStash
module Util
class TimeValue
def initialize(duration, time_unit)
@duration = duration
@time_unit = time_unit
end
def self.from_value(value)
if value.is_a?(TimeValue)
TimeValue.new(value.duration, value.time_unit)
elsif value.is_a?(::String)
normalized = value.downcase.strip
if normalized.end_with?("nanos")
TimeValue.new(parse(normalized, 5), :nanosecond)
elsif normalized.end_with?("micros")
TimeValue.new(parse(normalized, 6), :microsecond)
elsif normalized.end_with?("ms")
TimeValue.new(parse(normalized, 2), :millisecond)
elsif normalized.end_with?("s")
TimeValue.new(parse(normalized, 1), :second)
elsif normalized.end_with?("m")
TimeValue.new(parse(normalized, 1), :minute)
elsif normalized.end_with?("h")
TimeValue.new(parse(normalized, 1), :hour)
elsif normalized.end_with?("d")
TimeValue.new(parse(normalized, 1), :day)
elsif normalized =~ /^-0*1/
TimeValue.new(-1, :nanosecond)
else
raise ArgumentError.new("invalid time unit: \"#{value}\"")
end
else
raise ArgumentError.new("value is not a string: #{value} [#{value.class}]")
end
end
def to_nanos
case @time_unit
when :day
86400000000000 * @duration
when :hour
3600000000000 * @duration
when :minute
60000000000 * @duration
when :second
1000000000 * @duration
when :millisecond
1000000 * @duration
when :microsecond
1000 * @duration
when :nanosecond
@duration
end
end
def ==(other)
self.duration == other.duration and self.time_unit == other.time_unit
end
def self.parse(value, suffix)
Integer(value[0..(value.size - suffix - 1)].strip)
end
private_class_method :parse
attr_reader :duration
attr_reader :time_unit
end
end
end

View file

@ -0,0 +1,31 @@
# encoding: utf-8
require "spec_helper"
require "logstash/settings"
describe LogStash::Setting::TimeValue do
subject { described_class.new("option", "-1") }
describe "#set" do
it "should coerce the default correctly" do
expect(subject.value).to eq(LogStash::Util::TimeValue.new(-1, :nanosecond).to_nanos)
end
context "when a value is given outside of possible_values" do
it "should raise an ArgumentError" do
expect { subject.set("invalid") }.to raise_error(ArgumentError)
end
end
context "when a value is given as a time value" do
it "should set the value" do
subject.set("18m")
expect(subject.value).to eq(LogStash::Util::TimeValue.new(18, :minute).to_nanos)
end
end
context "when a value is given as a nanosecond" do
it "should set the value" do
subject.set(5)
expect(subject.value).to eq(LogStash::Util::TimeValue.new(5, :nanosecond).to_nanos)
end
end
end
end

View file

@ -0,0 +1,59 @@
# encoding: utf-8
require "logstash/util/time_value"
require "spec_helper"
RSpec.shared_examples "coercion example" do |value, expected|
let(:value) { value }
let(:expected) { expected }
it 'coerces correctly' do
expect(LogStash::Util::TimeValue.from_value(value)).to eq(expected)
end
end
module LogStash module Util
describe TimeValue do
it_behaves_like "coercion example", TimeValue.new(100, :hour), TimeValue.new(100, :hour)
it_behaves_like "coercion example", "18nanos", TimeValue.new(18, :nanosecond)
it_behaves_like "coercion example", "18micros", TimeValue.new(18, :microsecond)
it_behaves_like "coercion example", "18ms", TimeValue.new(18, :millisecond)
it_behaves_like "coercion example", "18s", TimeValue.new(18, :second)
it_behaves_like "coercion example", "18m", TimeValue.new(18, :minute)
it_behaves_like "coercion example", "18h", TimeValue.new(18, :hour)
it_behaves_like "coercion example", "18d", TimeValue.new(18, :day)
it "coerces with a space between the duration and the unit" do
expected = TimeValue.new(18, :hour)
actual = TimeValue.from_value("18 h")
expect(actual).to eq(expected)
end
it "fails to coerce non-ints" do
begin
a = TimeValue.from_value("f18 nanos")
fail "should not parse"
rescue ArgumentError => e
expect(e.message).to eq("invalid value for Integer(): \"f18\"")
end
end
it "fails to coerce invalid units" do
begin
a = TimeValue.from_value("18xyz")
fail "should not parse"
rescue ArgumentError => e
expect(e.message).to eq("invalid time unit: \"18xyz\"")
end
end
it "fails to coerce invalid value types" do
begin
a = TimeValue.from_value(32)
fail "should not parse"
rescue ArgumentError => e
expect(e.message).to eq("value is not a string: 32 [Fixnum]")
end
end
end
end
end

View file

@ -0,0 +1,15 @@
---
services:
- logstash
config: |-
input {
generator {
count => 4
}
}
filter {
sleep { time => 1 every => 2 }
}
output {
null {}
}

View file

@ -0,0 +1,46 @@
require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require_relative '../framework/helpers'
require "logstash/devutils/rspec/spec_helper"
require "yaml"
describe "Test Logstash Slowlog" do
before(:all) {
@fixture = Fixture.new(__FILE__)
# used in multiple LS tests
@ls = @fixture.get_service("logstash")
}
after(:all) {
@fixture.teardown
}
before(:each) {
# backup the application settings file -- logstash.yml
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
}
after(:each) {
@ls.teardown
# restore the application settings file -- logstash.yml
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
}
let(:temp_dir) { Stud::Temporary.directory("logstash-slowlog-test") }
let(:config) { @fixture.config("root") }
it "should write logs to a new dir" do
settings = {
"path.logs" => temp_dir,
"slowlog.threshold.warn" => "500ms"
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-e", config)
@ls.wait_for_logstash
sleep 1 until @ls.exited?
slowlog_file = "#{temp_dir}/logstash-slowlog-plain.log"
expect(File.exists?(slowlog_file)).to be true
expect(IO.read(slowlog_file).split("\n").size).to eq(2)
end
end