1.5 pipeline performance regressions fixes

rework output function init

removed double logger.debug

use event constant

fix LogStash::SHUTDOWN to actually be the shutdown signal event push in the pipeline

fix outputs conditionals handling

code gen methods not lambdas, remove intermediate arrays, base multi_filter to simplify code gen

remove block from filter_func

force_encoding at compile time

commented multi_filter method

typo

DRY & cleanups

optimizations: string constants, timestamp initialization, useless logger instance

specs for signal events

Filters::Base interface and multi_filter method

closes #2870

Fixes #2869
This commit is contained in:
Colin Surprenant 2015-03-19 14:46:06 -07:00 committed by Jordan Sissel
parent 4a3c9e7be5
commit dbe4154aa6
7 changed files with 93 additions and 56 deletions

View file

@ -81,16 +81,15 @@ module LogStash; module Config; module AST
["filter", "output"].each do |type|
# defines @filter_func and @output_func
definitions << "@#{type}_func = lambda do |event, &block|"
definitions << " events = [event]"
definitions << "def #{type}_func(event)"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"
sections.select { |s| s.plugin_type.text_value == type }.each do |s|
definitions << s.compile.split("\n", -1).map { |e| " #{e}" }
end
if type == "filter"
definitions << " events.flatten.each{|e| block.call(e) }"
end
definitions << " events" if type == "filter"
definitions << "end"
end
@ -211,13 +210,7 @@ module LogStash; module Config; module AST
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
events = events.flat_map do |event|
next [] if event.cancelled?
new_events = []
#{variable_name}.filter(event){|new_event| new_events << new_event}
event.cancelled? ? new_events : new_events.unshift(event)
end
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
@ -287,7 +280,7 @@ module LogStash; module Config; module AST
module Unicode
def self.wrap(text)
return "(" + text.inspect + ".force_encoding(Encoding::UTF_8)" + ")"
return "(" + text.force_encoding(Encoding::UTF_8).inspect + ")"
end
end
@ -364,12 +357,8 @@ module LogStash; module Config; module AST
# at the end, events is returned to handle the case where no branch match and no branch code is executed
# so we must make sure to return the current event.
return <<-CODE
events = events.flat_map do |event|
events = [event]
#{super}
end
events
<<-CODE
#{super}
end
CODE
end

View file

@ -16,6 +16,13 @@ require "logstash/json"
class LogStash::ShutdownEvent; end
class LogStash::FlushEvent; end
module LogStash
FLUSH = LogStash::FlushEvent.new
# LogStash::SHUTDOWN is used by plugins
SHUTDOWN = LogStash::ShutdownEvent.new
end
# the logstash event object.
#
# An event is simply a tuple of (timestamp, data).
@ -48,25 +55,26 @@ class LogStash::Event
TIMESTAMP_FAILURE_TAG = "_timestampparsefailure"
TIMESTAMP_FAILURE_FIELD = "_@timestamp"
METADATA = "@metadata".freeze
METADATA_BRACKETS = "[#{METADATA}]".freeze
# Floats outside of these upper and lower bounds are forcibly converted
# to scientific notation by Float#to_s
MIN_FLOAT_BEFORE_SCI_NOT = 0.0001
MAX_FLOAT_BEFORE_SCI_NOT = 1000000000000000.0
LOGGER = Cabin::Channel.get(LogStash)
public
def initialize(data = {})
@logger = Cabin::Channel.get(LogStash)
@cancelled = false
@data = data
@accessors = LogStash::Util::Accessors.new(data)
@data[VERSION] ||= VERSION_ONE
@data[TIMESTAMP] = init_timestamp(@data[TIMESTAMP])
ts = @data[TIMESTAMP]
@data[TIMESTAMP] = ts ? init_timestamp(ts) : LogStash::Timestamp.now
@metadata = if @data.include?("@metadata")
@data.delete("@metadata")
else
{}
end
@metadata = @data.delete(METADATA) || {}
@metadata_accessors = LogStash::Util::Accessors.new(@metadata)
end # def initialize
@ -113,9 +121,6 @@ class LogStash::Event
raise DeprecatedMethod
end # def unix_timestamp
# field-related access
METADATA = "@metadata".freeze
METADATA_BRACKETS = "[#{METADATA}]".freeze
public
def [](fieldref)
if fieldref.start_with?(METADATA_BRACKETS)
@ -267,12 +272,12 @@ class LogStash::Event
def init_timestamp(o)
begin
timestamp = o ? LogStash::Timestamp.coerce(o) : LogStash::Timestamp.now
timestamp = LogStash::Timestamp.coerce(o)
return timestamp if timestamp
@logger.warn("Unrecognized #{TIMESTAMP} value, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD}field", :value => o.inspect)
LOGGER.warn("Unrecognized #{TIMESTAMP} value, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD}field", :value => o.inspect)
rescue LogStash::TimestampParserError => e
@logger.warn("Error parsing #{TIMESTAMP} string, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect, :exception => e.message)
LOGGER.warn("Error parsing #{TIMESTAMP} string, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect, :exception => e.message)
end
@data["tags"] ||= []
@ -287,7 +292,7 @@ class LogStash::Event
if @metadata.nil?
to_hash
else
to_hash.merge("@metadata" => @metadata)
to_hash.merge(METADATA => @metadata)
end
end

View file

@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/event"
require "logstash/logging"
require "logstash/plugin"
require "logstash/config/mixin"
@ -145,6 +146,24 @@ class LogStash::Filters::Base < LogStash::Plugin
raise "#{self.class}#filter must be overidden"
end # def filter
# in 1.5.0 multi_filter is meant to be used in the generated filter function in LogStash::Config::AST::Plugin only
# and is temporary until we refactor the filter method interface to accept events list and return events list,
# just list in multi_filter see https://github.com/elastic/logstash/issues/2872.
# refactoring the filter method will mean updating all plugins which we want to avoid doing for 1.5.0.
#
# @param events [Array<LogStash::Event] list of events to filter
# @return [Array<LogStash::Event] filtered events and any new events generated by the filter
public
def multi_filter(events)
result = []
events.each do |event|
result << event
filter(event){|new_event| result << new_event}
end
result
end
public
def execute(event, &block)
filter(event, &block)

View file

@ -12,6 +12,4 @@ module LogStash
module Util; end
module PluginMixins; end
module PluginManager; end
SHUTDOWN = :shutdown
end # module LogStash

View file

@ -11,8 +11,6 @@ require "logstash/outputs/base"
class LogStash::Pipeline
FLUSH_EVENT = LogStash::FlushEvent.new
def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
grammar = LogStashConfigParser.new
@ -113,7 +111,7 @@ class LogStash::Pipeline
def shutdown_filters
@flusher_lock.synchronize { @flusher_thread.kill }
@input_to_filter.push(LogStash::ShutdownEvent.new)
@input_to_filter.push(LogStash::SHUTDOWN)
end
def wait_filters
@ -122,7 +120,7 @@ class LogStash::Pipeline
def shutdown_outputs
# nothing, filters will do this
@filter_to_output.push(LogStash::ShutdownEvent.new)
@filter_to_output.push(LogStash::SHUTDOWN)
end
def wait_outputs
@ -154,7 +152,7 @@ class LogStash::Pipeline
end
@flusher_lock = Mutex.new
@flusher_thread = Thread.new { Stud.interval(5) { @flusher_lock.synchronize { @input_to_filter.push(FLUSH_EVENT) } } }
@flusher_thread = Thread.new { Stud.interval(5) { @flusher_lock.synchronize { @input_to_filter.push(LogStash::FLUSH) } } }
end
def start_outputs
@ -203,11 +201,7 @@ class LogStash::Pipeline
case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@ -231,8 +225,8 @@ class LogStash::Pipeline
while true
event = @filter_to_output.pop
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
break if event == LogStash::SHUTDOWN
output_func(event)
end # while true
@outputs.each do |output|
@ -271,12 +265,9 @@ class LogStash::Pipeline
return klass.new(*args)
end
# for backward compatibility in devutils for the rspec helpers
def filter(event, &block)
@filter_func.call(event, &block)
end
def output(event)
@output_func.call(event)
filter_func(event).each { |e| block.call(e) }
end
# perform filters flush and yeild flushed event to the passed block

View file

@ -289,8 +289,7 @@ describe LogStash::Event do
it "should tag and warn for invalid value" do
ts = LogStash::Timestamp.now
expect(LogStash::Timestamp).to receive(:now).twice.and_return(ts)
expect(Cabin::Channel).to receive(:get).twice.and_return(logger)
expect(logger).to receive(:warn).twice
expect(LogStash::Event::LOGGER).to receive(:warn).twice
event = LogStash::Event.new("@timestamp" => :foo)
expect(event.timestamp.to_i).to eq(ts.to_i)
@ -306,8 +305,7 @@ describe LogStash::Event do
it "should tag and warn for invalid string format" do
ts = LogStash::Timestamp.now
expect(LogStash::Timestamp).to receive(:now).and_return(ts)
expect(Cabin::Channel).to receive(:get).and_return(logger)
expect(logger).to receive(:warn)
expect(LogStash::Event::LOGGER).to receive(:warn)
event = LogStash::Event.new("@timestamp" => "foo")
expect(event.timestamp.to_i).to eq(ts.to_i)
@ -400,7 +398,15 @@ describe LogStash::Event do
expect(subject["foo"]).to eq("bar")
end
end
end
context "signal events" do
it "should define the shutdown event" do
# the SHUTDOWN and FLUSH constants are part of the plugin API contract
# if they are changed, all plugins must be updated
expect(LogStash::SHUTDOWN).to be_a(LogStash::ShutdownEvent)
expect(LogStash::FLUSH).to be_a(LogStash::FlushEvent)
end
end
end

View file

@ -14,6 +14,35 @@ class LogStash::Filters::NOOP < LogStash::Filters::Base
end
end
describe LogStash::Filters::Base do
subject {LogStash::Filters::Base.new({})}
it "should provide method interfaces to override" do
expect{subject.register}.to raise_error(RuntimeError)
expect{subject.filter(:foo)}.to raise_error(RuntimeError)
end
it "should provide class public API" do
[:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method|
expect(subject).to respond_to(method)
end
end
it "should multi_filter without new events" do
allow(subject).to receive(:filter) do |event, &block|
nil
end
expect(subject.multi_filter([:foo])).to eq([:foo])
end
it "should multi_filter with new events" do
allow(subject).to receive(:filter) do |event, &block|
block.call(:bar)
end
expect(subject.multi_filter([:foo])).to eq([:foo, :bar])
end
end
describe LogStash::Filters::NOOP do
describe "adding multiple values to one field" do