PERFORMANCE: LIR Pipeline Execution

Fixes #8357
This commit is contained in:
Armin 2017-10-05 22:19:08 +02:00 committed by Armin Braun
parent 116ed1d540
commit 982c44bc1f
33 changed files with 4129 additions and 167 deletions

View file

@ -101,12 +101,15 @@ module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSC
else
[k,v]
end
}.reduce({}) do |hash,kv|
k,v = kv
hash[k] = v
}.reduce({}) do |hash, kv|
k, v = kv
if hash[k].nil?
hash[k] = v
else
hash[k] += v
end
hash
end
end
end
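The net effect of this hunk is that repeated attribute keys now accumulate instead of the last occurrence silently winning. A standalone Ruby sketch of the new merge semantics (sample data assumed, not from this commit):

pairs = [[:add_tag, ["foo"]], [:add_tag, ["bar"]], [:id, "m1"]]
merged = pairs.reduce({}) do |hash, (k, v)|
  if hash[k].nil?
    hash[k] = v
  else
    hash[k] += v # Array#+ concatenates the duplicate key's values
  end
  hash
end
merged # => {:add_tag => ["foo", "bar"], :id => "m1"}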
@ -327,8 +330,12 @@ module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSC
case op
when :and
return jdsl.eAnd(left, right);
when :nand
return jdsl.eNand(left, right);
when :or
return jdsl.eOr(left, right);
when :xor
return jdsl.eXor(left, right);
else
raise "Unknown op #{jop}"
end
@ -511,8 +518,12 @@ module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSC
case self.text_value
when "and"
AND_METHOD
when "nand"
NAND_METHOD
when "or"
OR_METHOD
when "xor"
XOR_METHOD
else
raise "Unknown operator #{self.text_value}"
end
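The boolean semantics the new nand and xor operators are expected to follow (matching the conditional specs added later in this commit) can be stated as plain Ruby functions; a minimal sketch:

def nand(a, b)
  !(a && b) # false only when both operands are truthy
end

def xor(a, b)
  (a || b) && !(a && b) # true when exactly one operand is truthy
end

nand(true, true)  # => false
xor(true, false)  # => true
xor(true, true)   # => false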

View file

@ -50,6 +50,8 @@ module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSC
end
AND_METHOD = jdsl.method(:eAnd)
NAND_METHOD = jdsl.method(:eNand)
OR_METHOD = jdsl.method(:eOr)
XOR_METHOD = jdsl.method(:eXor)
end
end; end; end; end; end
end; end; end; end; end
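AND_METHOD and its siblings are bound Ruby Method objects, so the compiler can store and invoke them uniformly later. A pure-Ruby illustration of that calling convention (the Dsl class here is a stand-in, not the grammar code):

class Dsl
  def e_and(left, right)
    left && right
  end
end

AND_EXAMPLE = Dsl.new.method(:e_and) # a bound Method object, like AND_METHOD above
AND_EXAMPLE.call(true, false)        # => false, same as calling e_and directly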

View file

@ -40,6 +40,7 @@ module LogStash
Setting::PositiveInteger.new("pipeline.batch.size", 125),
Setting::Numeric.new("pipeline.batch.delay", 5), # in milliseconds
Setting::Boolean.new("pipeline.unsafe_shutdown", false),
Setting::Boolean.new("pipeline.java_execution", false),
Setting::Boolean.new("pipeline.reloadable", true),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),

View file

@ -0,0 +1,75 @@
# encoding: utf-8
#
module LogStash
class JavaFilterDelegator
include org.logstash.config.ir.compiler.RubyIntegration::Filter
extend Forwardable
DELEGATED_METHODS = [
:register,
:close,
:threadsafe?,
:do_close,
:do_stop,
:periodic_flush,
:reloadable?
]
def_delegators :@filter, *DELEGATED_METHODS
attr_reader :id
def initialize(logger, klass, metric, execution_context, plugin_args)
@logger = logger
@klass = klass
@id = plugin_args["id"]
@filter = klass.new(plugin_args)
# Scope the metrics to the plugin
namespaced_metric = metric.namespace(@id.to_sym)
@filter.metric = namespaced_metric
@filter.execution_context = execution_context
@metric_events = namespaced_metric.namespace(:events)
@metric_events_in = @metric_events.counter(:in)
@metric_events_out = @metric_events.counter(:out)
@metric_events_time = @metric_events.counter(:duration_in_millis)
namespaced_metric.gauge(:name, config_name)
# Not all filters do buffering
@flushes = @filter.respond_to?(:flush)
end
def config_name
@klass.config_name
end
def multi_filter(events)
@metric_events_in.increment(events.size)
start_time = java.lang.System.nano_time
new_events = @filter.multi_filter(events)
@metric_events_time.increment((java.lang.System.nano_time - start_time) / 1_000_000)
# There is no guarantee in the context of a filter
# that EVENTS_IN == EVENTS_OUT; see, for example, the aggregate
# and split filters
c = new_events.count { |event| !event.cancelled? }
@metric_events_out.increment(c) if c > 0
new_events
end
def has_flush
@flushes
end
def flush(options = {})
# we also need to trace the number of events
# coming from a specific filter.
new_events = @filter.flush(options)
# Filter plugins that do buffering or spooling of events, like
# `logstash-filter-aggregate`, can return `nil` and will flush on the next flush tick.
@metric_events_out.increment(new_events.size) if new_events && new_events.size > 0
new_events
end
end
end
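A hypothetical usage sketch for the delegator above (assumes a running Logstash; logger, metric, execution_context, and the mutate filter class are stand-ins, not part of this commit):

delegator = LogStash::JavaFilterDelegator.new(
  logger, LogStash::Filters::Mutate, metric, execution_context,
  "id" => "my-mutate", "add_tag" => ["seen"]
)
out = delegator.multi_filter([LogStash::Event.new("message" => "hello")])
delegator.flush if delegator.has_flush # flush any buffered events

Each multi_filter call increments the :in counter, times the call into :duration_in_millis, and counts only non-cancelled results into :out.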

View file

@ -0,0 +1,744 @@
# encoding: utf-8
require "thread"
require "stud/interval"
require "concurrent"
require "logstash/namespace"
require "logstash/errors"
require "logstash-core/logstash-core"
require "logstash/event"
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/shutdown_watcher"
require "logstash/pipeline_reporter"
require "logstash/instrument/metric"
require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/dead_letter_queue_manager"
require "logstash/output_delegator"
require "logstash/java_filter_delegator"
require "logstash/queue_factory"
require "logstash/compiler"
require "logstash/execution_context"
require "securerandom"
java_import org.logstash.common.DeadLetterQueueFactory
java_import org.logstash.common.SourceWithMetadata
java_import org.logstash.common.io.DeadLetterQueueWriter
java_import org.logstash.config.ir.CompiledPipeline
module LogStash; class JavaBasePipeline
include org.logstash.config.ir.compiler.RubyIntegration::Pipeline
include LogStash::Util::Loggable
attr_reader :settings, :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir, :execution_context, :ephemeral_id
attr_reader :pipeline_config
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@logger = self.logger
@mutex = Mutex.new
@ephemeral_id = SecureRandom.uuid
@pipeline_config = pipeline_config
@config_str = pipeline_config.config_string
@settings = pipeline_config.settings
@config_hash = Digest::SHA1.hexdigest(@config_str)
@lir = compile_lir
# Every time #plugin is invoked this is incremented to give each plugin
# a unique id when auto-generating plugin ids
@plugin_counter ||= 0
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
# A list of plugins indexed by id
@plugins_by_id = {}
@agent = agent
@dlq_writer = dlq_writer
@lir_execution = CompiledPipeline.new(@lir, self)
if settings.get_value("config.debug") && @logger.debug?
@logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string))
end
@inputs = @lir_execution.inputs
@filters = @lir_execution.filters
@outputs = @lir_execution.outputs
end
def dlq_writer
if settings.get_value("dead_letter_queue.enable")
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes"))
else
@dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
end
end
def close_dlq_writer
@dlq_writer.close
if settings.get_value("dead_letter_queue.enable")
DeadLetterQueueFactory.release(pipeline_id)
end
end
def compile_lir
sources_with_metadata = [
SourceWithMetadata.new("str", "pipeline", 0, 0, self.config_str)
]
LogStash::Compiler.compile_sources(sources_with_metadata, @settings)
end
def buildOutput(name, line, column, *args)
plugin("output", name, line, column, *args)
end
def buildFilter(name, line, column, *args)
plugin("filter", name, line, column, *args)
end
def buildInput(name, line, column, *args)
plugin("input", name, line, column, *args)
end
def buildCodec(name, *args)
plugin("codec", name, 0, 0, *args)
end
def plugin(plugin_type, name, line, column, *args)
@plugin_counter += 1
# Collapse the array of arguments into a single merged hash
args = args.reduce({}, &:merge)
if plugin_type == "codec"
id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here
else
# Pull the ID from LIR to keep IDs consistent between the two representations
id = lir.graph.vertices.filter do |v|
v.source_with_metadata &&
v.source_with_metadata.line == line &&
v.source_with_metadata.column == column
end.findFirst.get.id
end
args["id"] = id # some code pulls the id out of the args
if !id
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}"
end
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
@plugins_by_id[id] = true
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
metric = @metric || Instrument::NullMetric.new
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
# Scope plugins of type 'input' to 'inputs'
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
klass = Plugin.lookup(plugin_type, name)
execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer)
if plugin_type == "output"
OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
elsif plugin_type == "filter"
JavaFilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args)
else # input
input_plugin = klass.new(args)
scoped_metric = type_scoped_metric.namespace(id.to_sym)
scoped_metric.gauge(:name, input_plugin.config_name)
input_plugin.metric = scoped_metric
input_plugin.execution_context = execution_context
input_plugin
end
end
def reloadable?
configured_as_reloadable? && reloadable_plugins?
end
def configured_as_reloadable?
settings.get("pipeline.reloadable")
end
def reloadable_plugins?
non_reloadable_plugins.empty?
end
def non_reloadable_plugins
(inputs + filters + outputs).select { |plugin| !plugin.reloadable? }
end
private
def default_logging_keys(other_keys = {})
{ :pipeline_id => pipeline_id }.merge(other_keys)
end
end; end
module LogStash; class JavaPipeline < JavaBasePipeline
attr_reader \
:worker_threads,
:events_consumed,
:events_filtered,
:reporter,
:started_at,
:thread,
:settings,
:metric,
:filter_queue_client,
:input_queue_client,
:queue
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@settings = pipeline_config.settings
# This needs to be configured before we call super, which will evaluate the code, to make
# sure the metric instance is correctly sent to the plugins and the namespace scoping works
@metric = if namespaced_metric
settings.get("metric.collect") ? namespaced_metric : Instrument::NullMetric.new(namespaced_metric.collector)
else
Instrument::NullMetric.new
end
@ephemeral_id = SecureRandom.uuid
@settings = settings
@reporter = PipelineReporter.new(@logger, self)
@worker_threads = []
super
begin
@queue = LogStash::QueueFactory.create(settings)
rescue => e
@logger.error("Logstash failed to create queue", default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
raise e
end
@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
@signal_queue = java.util.concurrent.LinkedBlockingQueue.new
# Note that @inflight_batches as a central mechanism for tracking inflight
# batches will fail if we have multiple read clients here.
@filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = @settings.get_value("queue.drain")
@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)
@input_threads = []
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@ready = Concurrent::AtomicBoolean.new(false)
@running = Concurrent::AtomicBoolean.new(false)
@flushing = Concurrent::AtomicReference.new(false)
@outputs_registered = Concurrent::AtomicBoolean.new(false)
@finished_execution = Concurrent::AtomicBoolean.new(false)
end # def initialize
def ready?
@ready.value
end
def safe_pipeline_worker_count
default = @settings.get_default("pipeline.workers")
pipeline_workers = @settings.get("pipeline.workers") #override from args "-w 8" or config
safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)
plugins = unsafe_filters.collect { |f| f.config_name }
return pipeline_workers if unsafe_filters.empty?
if @settings.set?("pipeline.workers")
if pipeline_workers > 1
@logger.warn("Warning: Manual override - there are filters that might not work with multiple worker threads", default_logging_keys(:worker_threads => pipeline_workers, :filters => plugins))
end
else
# user did not specify a worker thread count
# warn if the default is multiple
if default > 1
@logger.warn("Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads",
default_logging_keys(:count_was => default, :filters => plugins))
return 1 # can't allow the default value to propagate if there are unsafe filters
end
end
pipeline_workers
end
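The decision rule above, restated as a standalone sketch (hypothetical helper, not part of this commit):

def safe_worker_count(requested, default, unsafe_filters, user_set)
  return requested if unsafe_filters.empty? # every filter is threadsafe
  return requested if user_set              # explicit -w override wins, with a warning
  default > 1 ? 1 : default                 # clamp a multi-worker default to 1
end

safe_worker_count(8, 8, [], true)        # => 8
safe_worker_count(8, 8, ["grok"], false) # => 1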
def filters?
@filters.any?
end
def start
# Since we are starting, let's assume that the metric namespace is cleared;
# this is useful in the context of pipeline reloading
collect_stats
collect_dlq_stats
@logger.debug("Starting pipeline", default_logging_keys)
@finished_execution.make_false
@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
run
@finished_execution.make_true
rescue => e
close
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
end
end
status = wait_until_started
if status
logger.debug("Pipeline started successfully", default_logging_keys(:pipeline_id => pipeline_id))
end
status
end
def wait_until_started
while true do
# This should be changed with an appropriate FSM
# It's an edge case: if we have a pipeline with
# a generator { count => 1 } it's possible that `Thread#alive?` doesn't return true
# because the execution of the thread was successful and complete
if @finished_execution.true?
return true
elsif thread.nil? || !thread.alive?
return false
elsif running?
return true
else
sleep 0.01
end
end
end
def run
@started_at = Time.now
@thread = Thread.current
Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
start_workers
@logger.info("Pipeline started", "pipeline.id" => @pipeline_id)
# Block until all inputs have stopped
# Generally this happens if SIGINT is sent and `shutdown` is called from an external thread
transition_to_running
start_flusher # Launches a non-blocking thread for flush events
wait_inputs
transition_to_stopped
@logger.debug("Input plugins stopped! Will shutdown filter/output workers.", default_logging_keys)
shutdown_flusher
shutdown_workers
close
@logger.debug("Pipeline has been shutdown", default_logging_keys)
# exit code
return 0
end # def run
def close
@filter_queue_client.close
@queue.close
close_dlq_writer
end
def transition_to_running
@running.make_true
end
def transition_to_stopped
@running.make_false
end
def running?
@running.true?
end
def stopped?
@running.false?
end
def system?
settings.get_value("pipeline.system")
end
# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
registered = []
plugins.each { |plugin| registered << @lir_execution.registerPlugin(plugin) }
rescue => e
registered.each(&:do_close)
raise e
end
def start_workers
@worker_threads.clear # In case we're restarting the pipeline
@outputs_registered.make_false
begin
maybe_setup_out_plugins
pipeline_workers = safe_pipeline_worker_count
batch_size = @settings.get("pipeline.batch.size")
batch_delay = @settings.get("pipeline.batch.delay")
max_inflight = batch_size * pipeline_workers
config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config])
config_metric.gauge(:workers, pipeline_workers)
config_metric.gauge(:batch_size, batch_size)
config_metric.gauge(:batch_delay, batch_delay)
config_metric.gauge(:config_reload_automatic, @settings.get("config.reload.automatic"))
config_metric.gauge(:config_reload_interval, @settings.get("config.reload.interval"))
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
config_metric.gauge(:dead_letter_queue_path, @dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
@logger.info("Starting pipeline", default_logging_keys(
"pipeline.workers" => pipeline_workers,
"pipeline.batch.size" => batch_size,
"pipeline.batch.delay" => batch_delay,
"pipeline.max_inflight" => max_inflight))
if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
end
@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
pipeline_workers.times do |t|
batched_execution = @lir_execution.buildExecution
thread = Thread.new(self, batched_execution) do |_pipeline, _batched_execution|
_pipeline.worker_loop(_batched_execution)
end
thread.name="[#{pipeline_id}]>worker#{t}"
@worker_threads << thread
end
# inputs should be started last, after all workers
begin
start_inputs
rescue => e
# if there is any exception in starting inputs, make sure we shutdown workers.
# the exception will already be logged in start_inputs
shutdown_workers
raise e
end
ensure
# it is important to guarantee @ready to be true after the startup sequence has been completed
# to potentially unblock the shutdown method which may be waiting on @ready to proceed
@ready.make_true
end
end
def dlq_enabled?
@settings.get("dead_letter_queue.enable")
end
# Main body of what a worker thread does
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batched_execution)
shutdown_requested = false
while true
signal = @signal_queue.poll || NO_SIGNAL
shutdown_requested |= signal.shutdown? # latch on shutdown signal
batch = @filter_queue_client.read_batch # metrics are started in read_batch
@events_consumed.increment(batch.size)
execute_batch(batched_execution, batch, signal.flush?)
@filter_queue_client.close_batch(batch)
# keep the break at the end of the loop, after the read_batch operation; some pipeline specs rely on this "final read_batch" before shutdown.
break if (shutdown_requested && !draining_queue?)
end
# we are shutting down; the queue has been drained if that was required, so now perform a final flush.
# for this we need to create a new empty batch to contain the final flushed events
batch = @filter_queue_client.new_batch
@filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we don't do a read_batch here
batched_execution.compute(batch, true, true)
@filter_queue_client.close_batch(batch)
end
def wait_inputs
@input_threads.each(&:join)
end
def start_inputs
moreinputs = []
@inputs.each do |input|
if input.threadable && input.threads > 1
(input.threads - 1).times do |i|
moreinputs << input.clone
end
end
end
@inputs += moreinputs
# first make sure we can register all input plugins
register_plugins(@inputs)
# then after all input plugins are successfully registered, start them
@inputs.each { |input| start_input(input) }
end
def start_input(plugin)
@input_threads << Thread.new { inputworker(plugin) }
end
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
begin
input_queue_client = wrapped_write_client(plugin)
plugin.run(input_queue_client)
rescue => e
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace))
return
end
# otherwise, report error and restart
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
default_logging_keys(
:plugin => plugin.inspect,
:error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n"))))
# Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again
sleep(1)
retry
ensure
plugin.do_close
end
end # def inputworker
# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
# @param before_stop [Proc] code block called before performing stop operation on input plugins
def shutdown(&before_stop)
# shutdown can only start once the pipeline has completed its startup.
# avoid potential race condition between the startup sequence and this
# shutdown method which can be called from another thread at any time
sleep(0.1) while !ready?
# TODO: should we also check against calling shutdown multiple times concurrently?
before_stop.call if block_given?
stop_inputs
# We make this call blocking, so we know for sure that when the method returns the shutdown is
# complete
wait_for_workers
clear_pipeline_metrics
@logger.info("Pipeline terminated", "pipeline.id" => @pipeline_id)
end # def shutdown
def wait_for_workers
@logger.debug("Closing inputs", default_logging_keys)
@worker_threads.map(&:join)
@logger.debug("Worker closed", default_logging_keys)
end
def stop_inputs
@logger.debug("Closing inputs", default_logging_keys)
@inputs.each(&:do_stop)
@logger.debug("Closed inputs", default_logging_keys)
end
# After `shutdown` is called from an external thread this is called from the main thread to
# tell the worker threads to stop and then block until they've fully stopped
# This also stops all filter and output plugins
def shutdown_workers
# Each worker thread will receive this exactly once!
@worker_threads.each do |t|
@logger.debug("Pushing shutdown", default_logging_keys(:thread => t.inspect))
@signal_queue.put(SHUTDOWN)
end
@worker_threads.each do |t|
@logger.debug("Shutdown waiting for worker thread" , default_logging_keys(:thread => t.inspect))
t.join
end
@filters.each(&:do_close)
@outputs.each(&:do_close)
end
# for backward compatibility in devutils for the rspec helpers; this method is not used
# anymore and is just here to avoid breaking TestPipeline, which inherits from this class.
def filter(event, &block)
end
# for backward compatibility in devutils for the rspec helpers; this method is not used
# anymore and is just here to avoid breaking TestPipeline, which inherits from this class.
def flush_filters(options = {}, &block)
end
def start_flusher
# Invariant to help detect improper initialization
raise "Attempted to start flusher on a stopped pipeline!" if stopped?
@flusher_thread = Thread.new do
while Stud.stoppable_sleep(5, 0.1) { stopped? }
flush
break if stopped?
end
end
end
def shutdown_flusher
@flusher_thread.join
end
def flush
if @flushing.compare_and_set(false, true)
@logger.debug? && @logger.debug("Pushing flush onto pipeline", default_logging_keys)
@signal_queue.put(FLUSH)
end
end
# Calculate the uptime in milliseconds
#
# @return [Fixnum] Uptime in milliseconds, 0 if the pipeline is not started
def uptime
return 0 if started_at.nil?
((Time.now.to_f - started_at.to_f) * 1000.0).to_i
end
def plugin_threads_info
input_threads = @input_threads.select {|t| t.alive? }
worker_threads = @worker_threads.select {|t| t.alive? }
(input_threads + worker_threads).map {|t| Util.thread_info(t) }
end
def stalling_threads_info
plugin_threads_info
.reject {|t| t["blocked_on"] } # known benign blocking statuses
.each {|t| t.delete("backtrace") }
.each {|t| t.delete("blocked_on") }
.each {|t| t.delete("status") }
end
def collect_dlq_stats
if dlq_enabled?
dlq_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :dlq])
dlq_metric.gauge(:queue_size_in_bytes, @dlq_writer.get_current_queue_size)
end
end
def collect_stats
pipeline_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
pipeline_metric.gauge(:type, settings.get("queue.type"))
if @queue.is_a?(LogStash::Util::WrappedAckedQueue) && @queue.queue.is_a?(LogStash::AckedQueue)
queue = @queue.queue
dir_path = queue.dir_path
file_store = Files.get_file_store(Paths.get(dir_path))
pipeline_metric.namespace([:capacity]).tap do |n|
n.gauge(:page_capacity_in_bytes, queue.page_capacity)
n.gauge(:max_queue_size_in_bytes, queue.max_size_in_bytes)
n.gauge(:max_unread_events, queue.max_unread_events)
n.gauge(:queue_size_in_bytes, queue.persisted_size_in_bytes)
end
pipeline_metric.namespace([:data]).tap do |n|
n.gauge(:free_space_in_bytes, file_store.get_unallocated_space)
n.gauge(:storage_type, file_store.type)
n.gauge(:path, dir_path)
end
pipeline_metric.gauge(:events, queue.unread_count)
end
end
def clear_pipeline_metrics
# TODO(ph): I think the metric should also proxy that call correctly to the collector
# this will simplify everything since the null metric would then simply be a no-op
collector = @metric.collector
unless collector.nil?
# selectively reset metrics we don't wish to keep after reloading
# these include metrics about the plugins and number of processed events
# we want to keep other metrics like reload counts and error messages
collector.clear("stats/pipelines/#{pipeline_id}/plugins")
collector.clear("stats/pipelines/#{pipeline_id}/events")
end
end
# Sometimes we log stuff that will dump the pipeline, which may contain
# sensitive information (like the raw syntax tree, which can contain passwords).
# We want to hide most of what's in here
def inspect
{
:pipeline_id => @pipeline_id,
:settings => @settings.inspect,
:ready => @ready,
:running => @running,
:flushing => @flushing
}
end
private
def execute_batch(batched_execution, batch, flush)
batched_execution.compute(batch, flush, false)
@filter_queue_client.add_output_metrics(batch)
@filter_queue_client.add_filtered_metrics(batch)
@events_filtered.increment(batch.size)
rescue Exception => e
# Plugin authors should manage their own exceptions in the plugin code,
# but if an exception is raised up to the worker thread it is considered
# fatal and Logstash will not recover from this situation.
#
# Users need to check their configuration or see if there is a bug in the
# plugin.
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
raise e
end
def maybe_setup_out_plugins
if @outputs_registered.make_true
register_plugins(@outputs)
register_plugins(@filters)
end
end
def default_logging_keys(other_keys = {})
keys = super
keys[:thread] ||= thread.inspect if thread
keys
end
def draining_queue?
@drain_queue ? !@filter_queue_client.empty? : false
end
def wrapped_write_client(plugin)
# need to ensure that metrics are initialized one plugin at a time, else a race condition can exist.
@mutex.synchronize do
LogStash::Instrument::WrappedWriteClient.new(@input_queue_client, self, metric, plugin)
end
end
end; end
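A hypothetical lifecycle sketch for the class above (method names from this file; pipeline_config, namespaced_metric, and agent are stand-ins):

pipeline = LogStash::JavaPipeline.new(pipeline_config, namespaced_metric, agent)
if pipeline.start   # blocks until the pipeline is running, false if it crashed
  # ... later, from a controlling thread:
  pipeline.shutdown # stops inputs, joins worker threads, clears pipeline metrics
end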

View file

@ -5,6 +5,7 @@ require "logstash/output_delegator_strategies/single"
require "logstash/output_delegator_strategies/legacy"
module LogStash; class OutputDelegator
include org.logstash.config.ir.compiler.RubyIntegration::Output
attr_reader :metric, :metric_events, :strategy, :namespaced_metric, :id
def initialize(logger, output_class, metric, execution_context, strategy_registry, plugin_args)
@ -44,11 +45,12 @@ module LogStash class OutputDelegator
end
def multi_receive(events)
@in_counter.increment(events.length)
count = events.size
@in_counter.increment(count)
start_time = java.lang.System.nano_time
@strategy.multi_receive(events)
@time_metric.increment((java.lang.System.nano_time - start_time) / 1_000_000)
@out_counter.increment(events.length)
@out_counter.increment(count)
end
def do_close

View file

@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/pipeline_action/base"
require "logstash/pipeline"
require "logstash/java_pipeline"
require "logstash/converge_result"
require "logstash/util/loggable"
@ -32,8 +33,13 @@ module LogStash module PipelineAction
# The execute method assumes that thread-safe access to the pipeline
# is managed by the caller.
def execute(agent, pipelines)
pipeline = LogStash::Pipeline.new(@pipeline_config, @metric, agent)
pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
status = pipeline.start # block until the pipeline is correctly started or crashed
if status

View file

@ -27,7 +27,12 @@ module LogStash module PipelineAction
end
begin
pipeline_validator = LogStash::BasePipeline.new(@pipeline_config)
pipeline_validator =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaBasePipeline.new(@pipeline_config)
else
LogStash::BasePipeline.new(@pipeline_config)
end
rescue => e
return LogStash::ConvergeResult::FailedAction.from_exception(e)
end

View file

@ -14,6 +14,7 @@ module LogStash
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
"metric.collect",
"pipeline.java_execution",
"path.config",
"path.dead_letter_queue",
"path.queue",

View file

@ -93,6 +93,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.workers",
:default => LogStash::SETTINGS.get_default("pipeline.workers")
option ["--experimental-java-execution"], :flag,
I18n.t("logstash.runner.flag.experimental-java-execution"),
:attribute_name => "pipeline.java_execution",
:default => LogStash::SETTINGS.get_default("pipeline.java_execution")
option ["-b", "--pipeline.batch.size"], "SIZE",
I18n.t("logstash.runner.flag.pipeline-batch-size"),
:attribute_name => "pipeline.batch.size",

View file

@ -238,6 +238,7 @@ module LogStash; module Util
end
class ReadBatch
include org.logstash.config.ir.compiler.RubyIntegration::Batch
def initialize(queue, size, wait)
@queue = queue
@size = size

View file

@ -155,6 +155,7 @@ module LogStash; module Util
end
class ReadBatch
include org.logstash.config.ir.compiler.RubyIntegration::Batch
def initialize(queue, size, wait)
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
# @cancelled = Hash.new

View file

@ -264,6 +264,8 @@ en:
http_port: Web API http port
pipeline-workers: |+
Sets the number of pipeline workers to run.
experimental-java-execution: |+
(Experimental) Use new Java execution engine.
pipeline-batch-size: |+
Size of batches the pipeline is to work in.
pipeline-batch-delay: |+

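The new flag maps onto the pipeline.java_execution setting registered in logstash/environment.rb above, so the engine can be enabled either with bin/logstash --experimental-java-execution or programmatically. A hedged sketch, assuming the Settings#set_value API that mirrors the get_value calls used throughout this commit:

LogStash::SETTINGS.set_value("pipeline.java_execution", true)
LogStash::SETTINGS.get_value("pipeline.java_execution") # => true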
View file

@ -1,7 +1,9 @@
# encoding: utf-8
require 'spec_helper'
require 'support/pipeline/pipeline_helpers'
module ConditionalFanciness
include PipelineHelpers
def description
return self.metadata[:description]
end
@ -76,21 +78,21 @@ describe "conditionals in filter" do
}
CONFIG
sample({"foo" => "bar"}) do
sample_one({"foo" => "bar"}) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to eq("world")
expect(subject.get("fancy")).to be_nil
expect(subject.get("free")).to be_nil
end
sample({"notfoo" => "bar"}) do
sample_one({"notfoo" => "bar"}) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to be_nil
expect(subject.get("fancy")).to be_nil
expect(subject.get("free")).to eq("hugs")
end
sample({"bar" => "baz"}) do
sample_one({"bar" => "baz"}) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to be_nil
expect(subject.get("fancy")).to eq("pants")
@ -114,28 +116,28 @@ describe "conditionals in filter" do
}
CONFIG
sample("foo" => "bar", "nest" => 124) do
sample_one("foo" => "bar", "nest" => 124) do
expect(subject.get("always")).to be_nil
expect(subject.get("hello")).to be_nil
expect(subject.get("fancy")).to be_nil
expect(subject.get("free")).to be_nil
end
sample("foo" => "bar", "nest" => 123) do
sample_one("foo" => "bar", "nest" => 123) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to eq("world")
expect(subject.get("fancy")).to be_nil
expect(subject.get("free")).to be_nil
end
sample("notfoo" => "bar", "nest" => 123) do
sample_one("notfoo" => "bar", "nest" => 123) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to be_nil
expect(subject.get("fancy")).to be_nil
expect(subject.get("free")).to eq("hugs")
end
sample("bar" => "baz", "nest" => 123) do
sample_one("bar" => "baz", "nest" => 123) do
expect(subject.get("always")).to eq("awesome")
expect(subject.get("hello")).to be_nil
expect(subject.get("fancy")).to eq("pants")
@ -152,7 +154,7 @@ describe "conditionals in filter" do
}
CONFIG
sample("foo" => 123, "bar" => 123) do
sample_one("foo" => 123, "bar" => 123) do
expect(subject.get("tags") ).to include("woot")
end
end
@ -181,7 +183,7 @@ describe "conditionals in filter" do
}
CONFIG
sample("foo" => "foo", "foobar" => "foobar", "greeting" => "hello world") do
sample_one("foo" => "foo", "foobar" => "foobar", "greeting" => "hello world") do
expect(subject.get("tags")).to include("field in field")
expect(subject.get("tags")).to include("field in string")
expect(subject.get("tags")).to include("string in field")
@ -203,7 +205,7 @@ describe "conditionals in filter" do
}
CONFIG
sample("foo" => "foo", "somelist" => [ "one", "two" ], "foobar" => "foobar", "greeting" => "hello world", "tags" => [ "fancypantsy" ]) do
sample_one("foo" => "foo", "somelist" => [ "one", "two" ], "foobar" => "foobar", "greeting" => "hello world", "tags" => [ "fancypantsy" ]) do
# verify the original exists
expect(subject.get("tags")).to include("fancypantsy")
@ -218,94 +220,156 @@ describe "conditionals in filter" do
describe "operators" do
conditional "[message] == 'sample'" do
sample("sample") { expect(subject.get("tags") ).to include("success") }
sample("different") { expect(subject.get("tags") ).to include("failure") }
sample_one("sample") { expect(subject.get("tags") ).to include("success") }
sample_one("different") { expect(subject.get("tags") ).to include("failure") }
end
conditional "'sample' == [message]" do
sample_one("sample") {expect(subject.get("tags")).to include("success")}
sample_one("different") {expect(subject.get("tags")).to include("failure")}
end
conditional "'value' == 'value'" do
sample_one("sample") {expect(subject.get("tags")).to include("success")}
end
conditional "'value' == 'other'" do
sample_one("sample") {expect(subject.get("tags")).to include("failure")}
end
conditional "[message] != 'sample'" do
sample("sample") { expect(subject.get("tags") ).to include("failure") }
sample("different") { expect(subject.get("tags") ).to include("success") }
sample_one("sample") { expect(subject.get("tags") ).to include("failure") }
sample_one("different") { expect(subject.get("tags") ).to include("success") }
end
conditional "[message] < 'sample'" do
sample("apple") { expect(subject.get("tags") ).to include("success") }
sample("zebra") { expect(subject.get("tags") ).to include("failure") }
sample_one("apple") { expect(subject.get("tags") ).to include("success") }
sample_one("zebra") { expect(subject.get("tags") ).to include("failure") }
end
conditional "[message] > 'sample'" do
sample("zebra") { expect(subject.get("tags") ).to include("success") }
sample("apple") { expect(subject.get("tags") ).to include("failure") }
sample_one("zebra") { expect(subject.get("tags") ).to include("success") }
sample_one("apple") { expect(subject.get("tags") ).to include("failure") }
end
conditional "[message] <= 'sample'" do
sample("apple") { expect(subject.get("tags") ).to include("success") }
sample("zebra") { expect(subject.get("tags") ).to include("failure") }
sample("sample") { expect(subject.get("tags") ).to include("success") }
sample_one("apple") { expect(subject.get("tags") ).to include("success") }
sample_one("zebra") { expect(subject.get("tags") ).to include("failure") }
sample_one("sample") { expect(subject.get("tags") ).to include("success") }
end
conditional "[message] >= 'sample'" do
sample("zebra") { expect(subject.get("tags") ).to include("success") }
sample("sample") { expect(subject.get("tags") ).to include("success") }
sample("apple") { expect(subject.get("tags") ).to include("failure") }
sample_one("zebra") { expect(subject.get("tags") ).to include("success") }
sample_one("sample") { expect(subject.get("tags") ).to include("success") }
sample_one("apple") { expect(subject.get("tags") ).to include("failure") }
end
conditional "[message] == 5" do
sample_one("message" => 5) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("failure")}
end
conditional "5 == [message]" do
sample_one("message" => 5) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("failure")}
end
conditional "7 == 7" do
sample_one("message" => 7) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("success")}
end
conditional "5 == 7" do
sample_one("message" => 3) {expect(subject.get("tags")).to include("failure")}
sample_one("message" => 2) {expect(subject.get("tags")).to include("failure")}
end
conditional "[message] != 5" do
sample_one("message" => 5) {expect(subject.get("tags")).to include("failure")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("success")}
end
conditional "[message] < 5" do
sample_one("message" => 3) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 5) {expect(subject.get("tags")).to include("failure")}
sample_one("message" => 9) {expect(subject.get("tags")).to include("failure")}
end
conditional "[message] > 5" do
sample_one("message" => 9) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 5) {expect(subject.get("tags")).to include("failure")}
sample_one("message" => 4) {expect(subject.get("tags")).to include("failure")}
end
conditional "[message] <= 5" do
sample_one("message" => 9) {expect(subject.get("tags")).to include("failure")}
sample_one("message" => 5) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("success")}
end
conditional "[message] >= 5" do
sample_one("message" => 5) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 7) {expect(subject.get("tags")).to include("success")}
sample_one("message" => 3) {expect(subject.get("tags")).to include("failure")}
end
conditional "[message] =~ /sample/" do
sample("apple") { expect(subject.get("tags") ).to include("failure") }
sample("sample") { expect(subject.get("tags") ).to include("success") }
sample("some sample") { expect(subject.get("tags") ).to include("success") }
sample_one("apple") { expect(subject.get("tags") ).to include("failure") }
sample_one("sample") { expect(subject.get("tags") ).to include("success") }
sample_one("some sample") { expect(subject.get("tags") ).to include("success") }
end
conditional "[message] !~ /sample/" do
sample("apple") { expect(subject.get("tags")).to include("success") }
sample("sample") { expect(subject.get("tags")).to include("failure") }
sample("some sample") { expect(subject.get("tags")).to include("failure") }
sample_one("apple") { expect(subject.get("tags")).to include("success") }
sample_one("sample") { expect(subject.get("tags")).to include("failure") }
sample_one("some sample") { expect(subject.get("tags")).to include("failure") }
end
end
describe "negated expressions" do
conditional "!([message] == 'sample')" do
sample("sample") { expect(subject.get("tags")).not_to include("success") }
sample("different") { expect(subject.get("tags")).not_to include("failure") }
sample_one("sample") { expect(subject.get("tags")).not_to include("success") }
sample_one("different") { expect(subject.get("tags")).not_to include("failure") }
end
conditional "!([message] != 'sample')" do
sample("sample") { expect(subject.get("tags")).not_to include("failure") }
sample("different") { expect(subject.get("tags")).not_to include("success") }
sample_one("sample") { expect(subject.get("tags")).not_to include("failure") }
sample_one("different") { expect(subject.get("tags")).not_to include("success") }
end
conditional "!([message] < 'sample')" do
sample("apple") { expect(subject.get("tags")).not_to include("success") }
sample("zebra") { expect(subject.get("tags")).not_to include("failure") }
sample_one("apple") { expect(subject.get("tags")).not_to include("success") }
sample_one("zebra") { expect(subject.get("tags")).not_to include("failure") }
end
conditional "!([message] > 'sample')" do
sample("zebra") { expect(subject.get("tags")).not_to include("success") }
sample("apple") { expect(subject.get("tags")).not_to include("failure") }
sample_one("zebra") { expect(subject.get("tags")).not_to include("success") }
sample_one("apple") { expect(subject.get("tags")).not_to include("failure") }
end
conditional "!([message] <= 'sample')" do
sample("apple") { expect(subject.get("tags")).not_to include("success") }
sample("zebra") { expect(subject.get("tags")).not_to include("failure") }
sample("sample") { expect(subject.get("tags")).not_to include("success")}
sample_one("apple") { expect(subject.get("tags")).not_to include("success") }
sample_one("zebra") { expect(subject.get("tags")).not_to include("failure") }
sample_one("sample") { expect(subject.get("tags")).not_to include("success")}
end
conditional "!([message] >= 'sample')" do
sample("zebra") { expect(subject.get("tags")).not_to include("success") }
sample("sample") { expect(subject.get("tags")).not_to include("success") }
sample("apple") { expect(subject.get("tags")).not_to include("failure") }
sample_one("zebra") { expect(subject.get("tags")).not_to include("success") }
sample_one("sample") { expect(subject.get("tags")).not_to include("success") }
sample_one("apple") { expect(subject.get("tags")).not_to include("failure") }
end
conditional "!([message] =~ /sample/)" do
sample("apple") { expect(subject.get("tags")).not_to include("failure") }
sample("sample") { expect(subject.get("tags")).not_to include("success") }
sample("some sample") { expect(subject.get("tags")).not_to include("success") }
sample_one("apple") { expect(subject.get("tags")).not_to include("failure") }
sample_one("sample") { expect(subject.get("tags")).not_to include("success") }
sample_one("some sample") { expect(subject.get("tags")).not_to include("success") }
end
conditional "!([message] !~ /sample/)" do
sample("apple") { expect(subject.get("tags")).not_to include("success") }
sample("sample") { expect(subject.get("tags")).not_to include("failure") }
sample("some sample") { expect(subject.get("tags")).not_to include("failure") }
sample_one("apple") { expect(subject.get("tags")).not_to include("success") }
sample_one("sample") { expect(subject.get("tags")).not_to include("failure") }
sample_one("some sample") { expect(subject.get("tags")).not_to include("failure") }
end
end
@ -313,66 +377,96 @@ describe "conditionals in filter" do
describe "value as an expression" do
# testing that a field has a value should be true.
conditional "[message]" do
sample("apple") { expect(subject.get("tags")).to include("success") }
sample("sample") { expect(subject.get("tags")).to include("success") }
sample("some sample") { expect(subject.get("tags")).to include("success") }
sample_one("apple") { expect(subject.get("tags")).to include("success") }
sample_one("sample") { expect(subject.get("tags")).to include("success") }
sample_one("some sample") { expect(subject.get("tags")).to include("success") }
end
# testing that a missing field has a value should be false.
conditional "[missing]" do
sample("apple") { expect(subject.get("tags")).to include("failure") }
sample("sample") { expect(subject.get("tags")).to include("failure") }
sample("some sample") { expect(subject.get("tags")).to include("failure") }
sample_one("apple") { expect(subject.get("tags")).to include("failure") }
sample_one("sample") { expect(subject.get("tags")).to include("failure") }
sample_one("some sample") { expect(subject.get("tags")).to include("failure") }
end
end
describe "logic operators" do
describe "and" do
conditional "[message] and [message]" do
sample("whatever") { expect(subject.get("tags")).to include("success") }
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "[message] and ![message]" do
sample("whatever") { expect(subject.get("tags")).to include("failure") }
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
conditional "![message] and [message]" do
sample("whatever") { expect(subject.get("tags")).to include("failure") }
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
conditional "![message] and ![message]" do
sample("whatever") { expect(subject.get("tags")).to include("failure") }
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
end
describe "nand" do
conditional "[message] nand [message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
conditional "[message] nand ![message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] nand [message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] nand ![message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
end
describe "xor" do
conditional "[message] xor [message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
conditional "[message] xor ![message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] xor [message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] xor ![message]" do
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
end
describe "or" do
conditional "[message] or [message]" do
sample("whatever") { expect(subject.get("tags")).to include("success") }
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "[message] or ![message]" do
sample("whatever") { expect(subject.get("tags")).to include("success") }
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] or [message]" do
sample("whatever") { expect(subject.get("tags")).to include("success") }
sample_one("whatever") { expect(subject.get("tags")).to include("success") }
end
conditional "![message] or ![message]" do
sample("whatever") { expect(subject.get("tags")).to include("failure") }
sample_one("whatever") { expect(subject.get("tags")).to include("failure") }
end
end
end
describe "field references" do
conditional "[field with space]" do
sample("field with space" => "hurray") do
sample_one("field with space" => "hurray") do
expect(subject.get("tags")).to include("success")
end
end
conditional "[field with space] == 'hurray'" do
sample("field with space" => "hurray") do
sample_one("field with space" => "hurray") do
expect(subject.get("tags")).to include("success")
end
end
conditional "[nested field][reference with][some spaces] == 'hurray'" do
sample({"nested field" => { "reference with" => { "some spaces" => "hurray" } } }) do
sample_one({"nested field" => { "reference with" => { "some spaces" => "hurray" } } }) do
expect(subject.get("tags")).to include("success")
end
end
@ -394,15 +488,16 @@ describe "conditionals in filter" do
}
CONFIG
sample({"type" => "original"}) do
sample_one({"type" => "original"}) do
expect(subject).to be_an(Array)
expect(subject.length).to eq(2)
subject.sort! {|a, b| a.get("type") <=> b.get("type")}
expect(subject[0].get("type")).to eq("original")
expect(subject[0].get("cond1")).to eq("true")
expect(subject[0].get("cond2")).to eq(nil)
expect(subject[1].get("type")).to eq("original")
expect(subject[1].get("cond1")).to eq("true")
expect(subject[1].get("cond2")).to eq(nil)
expect(subject[1].get("type")).to eq("clone")
expect(subject[0].get("type")).to eq("clone")
# expect(subject[1].get("cond1")).to eq(nil)
# expect(subject[1].get("cond2")).to eq("true")
end
@ -424,18 +519,78 @@ describe "conditionals in filter" do
}
CONFIG
sample({"type" => "original"}) do
# puts subject.inspect
expect(subject[0].get("cond1")).to eq(nil)
sample_one({"type" => "original"}) do
expect(subject.length).to eq(3)
subject.sort! {|a, b| a.get("type") <=> b.get("type")}
expect(subject[0].get("type")).to eq("clone1")
expect(subject[0].get("cond1")).to eq("true")
expect(subject[0].get("cond2")).to eq(nil)
expect(subject[1].get("type")).to eq("clone1")
expect(subject[1].get("cond1")).to eq("true")
expect(subject[1].get("cond2")).to eq(nil)
expect(subject[1].get("type")).to eq("clone2")
expect(subject[1].get("cond1")).to eq(nil)
expect(subject[1].get("cond2")).to eq("true")
expect(subject[2].get("type")).to eq("clone2")
expect(subject[2].get("type")).to eq("original")
expect(subject[2].get("cond1")).to eq(nil)
expect(subject[2].get("cond2")).to eq("true")
expect(subject[2].get("cond2")).to eq(nil)
end
end
describe "complex case" do
config <<-CONFIG
filter {
if ("foo" in [tags]) {
mutate { id => addbar add_tag => bar }
if ("bar" in [tags]) {
mutate { id => addbaz add_tag => baz }
}
if ("baz" in [tags]) {
mutate { id => addbot add_tag => bot }
if ("bot" in [tags]) {
mutate { id => addbonk add_tag => bonk }
}
}
}
if ("bot" in [tags]) {
mutate { id => addwat add_tag => wat }
}
mutate { id => addprev add_tag => prev }
mutate { id => addfinal add_tag => final }
}
CONFIG
sample_one("tags" => ["bot"]) do
tags = subject.get("tags")
expect(tags[0]).to eq("bot")
expect(tags[1]).to eq("wat")
expect(tags[2]).to eq("prev")
expect(tags[3]).to eq("final")
end
sample_one("tags" => ["foo"]) do
tags = subject.get("tags")
expect(tags[0]).to eq("foo")
expect(tags[1]).to eq("bar")
expect(tags[2]).to eq("baz")
expect(tags[3]).to eq("bot")
expect(tags[4]).to eq("bonk")
expect(tags[5]).to eq("wat")
expect(tags[6]).to eq("prev")
expect(tags[7]).to eq("final")
end
sample_one("type" => "original") do
tags = subject.get("tags")
expect(tags[0]).to eq("prev")
expect(tags[1]).to eq("final")
end
end
end

View file

@ -1,6 +1,7 @@
# encoding: utf-8
require "spec_helper"
require "logstash/json"
require 'support/pipeline/pipeline_helpers'
# use a dummy NOOP filter to test Filters::Base
class LogStash::Filters::NOOP < LogStash::Filters::Base
@ -50,6 +51,7 @@ describe LogStash::Filters::Base do
end
describe LogStash::Filters::NOOP do
extend PipelineHelpers
describe "adding multiple values to one field" do
config <<-CONFIG
@ -61,7 +63,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample "example" do
sample_one("example") do
insist { subject.get("new_field") } == ["new_value", "new_value_2"]
end
end
@ -75,7 +77,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop") do
sample_one("type" => "noop") do
insist { subject.get("tags") } == ["test"]
end
end
@ -89,11 +91,11 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop") do
sample_one("type" => "noop") do
insist { subject.get("tags") } == ["test"]
end
sample("type" => "noop", "tags" => ["t1", "t2"]) do
sample_one("type" => "noop", "tags" => ["t1", "t2"]) do
insist { subject.get("tags") } == ["t1", "t2", "test"]
end
end
@ -107,11 +109,11 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop") do
sample_one("type" => "noop") do
insist { subject.get("tags") } == ["bar"]
end
sample("type" => "noop", "tags" => "foo") do
sample_one("type" => "noop", "tags" => "foo") do
insist { subject.get("tags") } == ["foo", "bar"]
end
end
@ -125,7 +127,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "tags" => "foo") do
sample_one("type" => "noop", "tags" => "foo") do
# this is completely weird but seems to be already expected in other specs
insist { subject.get("tags") } == ["foo", "foo"]
end
@ -140,19 +142,19 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop") do
sample_one("type" => "noop") do
insist { subject.get("tags") } == ["test"]
end
sample("type" => "noop", "tags" => ["t1"]) do
sample_one("type" => "noop", "tags" => ["t1"]) do
insist { subject.get("tags") } == ["t1", "test"]
end
sample("type" => "noop", "tags" => ["t1", "t2"]) do
sample_one("type" => "noop", "tags" => ["t1", "t2"]) do
insist { subject.get("tags") } == ["t1", "t2", "test"]
end
sample("type" => "noop", "tags" => ["t1", "t2", "t3"]) do
sample_one("type" => "noop", "tags" => ["t1", "t2", "t3"]) do
insist { subject.get("tags") } == ["t1", "t2", "t3", "test"]
end
end
@ -166,39 +168,39 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "tags" => "foo") do
sample_one("type" => "noop", "tags" => "foo") do
insist { subject.get("tags") } == ["foo"]
end
sample("type" => "noop", "tags" => "t2") do
sample_one("type" => "noop", "tags" => "t2") do
insist { subject.get("tags") } == []
end
sample("type" => "noop", "tags" => ["t2"]) do
sample_one("type" => "noop", "tags" => ["t2"]) do
insist { subject.get("tags") } == []
end
sample("type" => "noop", "tags" => ["t4"]) do
sample_one("type" => "noop", "tags" => ["t4"]) do
insist { subject.get("tags") } == ["t4"]
end
sample("type" => "noop", "tags" => ["t1", "t2", "t3"]) do
sample_one("type" => "noop", "tags" => ["t1", "t2", "t3"]) do
insist { subject.get("tags") } == ["t1"]
end
# also test from Json deserialized data to test the handling of native Java collections by JrJackson
# see https://github.com/elastic/logstash/issues/2261
sample(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"t2\", \"t3\"]}")) do
sample_one(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"t2\", \"t3\"]}")) do
insist { subject.get("tags") } == ["t1"]
end
sample("type" => "noop", "tags" => ["t1", "t2"]) do
sample_one("type" => "noop", "tags" => ["t1", "t2"]) do
insist { subject.get("tags") } == ["t1"]
end
# also test from Json deserialized data to test the handling of native Java collections by JrJackson
# see https://github.com/elastic/logstash/issues/2261
sample(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"t2\"]}")) do
sample_one(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"t2\"]}")) do
insist { subject.get("tags") } == ["t1"]
end
end
@ -212,13 +214,13 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "tags" => ["t1", "goaway", "t3"], "blackhole" => "goaway") do
sample_one("type" => "noop", "tags" => ["t1", "goaway", "t3"], "blackhole" => "goaway") do
insist { subject.get("tags") } == ["t1", "t3"]
end
# also test from Json deserialized data to test the handling of native Java collections by JrJackson
# see https://github.com/elastic/logstash/issues/2261
sample(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"goaway\", \"t3\"], \"blackhole\":\"goaway\"}")) do
sample_one(LogStash::Json.load("{\"type\":\"noop\", \"tags\":[\"t1\", \"goaway\", \"t3\"], \"blackhole\":\"goaway\"}")) do
insist { subject.get("tags") } == ["t1", "t3"]
end
end
@ -232,17 +234,17 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "t4" => "four") do
sample_one("type" => "noop", "t4" => "four") do
insist { subject }.include?("t4")
end
sample("type" => "noop", "t1" => "one", "t2" => "two", "t3" => "three") do
sample_one("type" => "noop", "t1" => "one", "t2" => "two", "t3" => "three") do
insist { subject }.include?("t1")
reject { subject }.include?("t2")
reject { subject }.include?("t3")
end
sample("type" => "noop", "t1" => "one", "t2" => "two") do
sample_one("type" => "noop", "t1" => "one", "t2" => "two") do
insist { subject }.include?("t1")
reject { subject }.include?("t2")
end
@ -257,7 +259,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("tags" => "foo") do
sample_one("tags" => "foo") do
reject { subject }.include?("tags")
end
end
@ -271,7 +273,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "t1" => {"t2" => "two", "t3" => "three"}) do
sample_one("type" => "noop", "t1" => {"t2" => "two", "t3" => "three"}) do
insist { subject }.include?("t1")
reject { subject }.include?("[t1][t2]")
insist { subject }.include?("[t1][t3]")
@ -287,7 +289,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "t1" => ["t2", "t3"]) do
sample_one("type" => "noop", "t1" => ["t2", "t3"]) do
insist { subject }.include?("t1")
insist { subject.get("[t1][0]") } == "t3"
end
@ -302,7 +304,7 @@ describe LogStash::Filters::NOOP do
}
CONFIG
sample("type" => "noop", "blackhole" => "go", "go" => "away") do
sample_one("type" => "noop", "blackhole" => "go", "go" => "away") do
insist { subject }.include?("blackhole")
reject { subject }.include?("go")
end

View file

@ -0,0 +1,176 @@
# encoding: utf-8
require "spec_helper"
require "logstash/java_filter_delegator"
require "logstash/instrument/null_metric"
require "logstash/event"
require "logstash/execution_context"
require "support/shared_contexts"
describe LogStash::JavaFilterDelegator do
class MockGauge
def increment(_)
end
end
include_context "execution_context"
let(:logger) { double(:logger) }
let(:filter_id) { "my-filter" }
let(:config) do
{ "host" => "127.0.0.1", "id" => filter_id }
end
let(:collector) { [] }
let(:counter_in) { MockGauge.new }
let(:counter_out) { MockGauge.new }
let(:counter_time) { MockGauge.new }
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) }
let(:events) { [LogStash::Event.new, LogStash::Event.new] }
before :each do
allow(pipeline).to receive(:id).and_return(pipeline_id)
allow(metric).to receive(:namespace).with(anything).and_return(metric)
allow(metric).to receive(:counter).with(:in).and_return(counter_in)
allow(metric).to receive(:counter).with(:out).and_return(counter_out)
allow(metric).to receive(:counter).with(:duration_in_millis).and_return(counter_time)
end
let(:plugin_klass) do
Class.new(LogStash::Filters::Base) do
config_name "super_plugin"
config :host, :validate => :string
def register; end
end
end
subject { described_class.new(logger, plugin_klass, metric, execution_context, config) }
it "create a plugin with the passed options" do
expect(plugin_klass).to receive(:new).with(config).and_return(plugin_klass.new(config))
described_class.new(logger, plugin_klass, metric, execution_context, config)
end
context "when the plugin support flush" do
let(:plugin_klass) do
Class.new(LogStash::Filters::Base) do
config_name "super_plugin"
config :host, :validate => :string
def register; end
def flush(options = {}); @events ; end
def filter(event)
@events ||= []
@events << event
event.cancel
end
end
end
it "defines a flush method" do
expect(subject.respond_to?(:flush)).to be_truthy
end
context "when the flush return events" do
it "increments the out" do
subject.multi_filter([LogStash::Event.new])
expect(counter_out).to receive(:increment).with(1)
subject.flush({})
end
end
context "when the flush doesn't return anything" do
it "doesnt increment the out" do
expect(metric).not_to receive(:increment)
subject.flush({})
end
end
context "when the filter buffer events" do
before do
allow(metric).to receive(:increment).with(anything, anything)
end
it "has incremented :in" do
expect(counter_in).to receive(:increment).with(events.size)
subject.multi_filter(events)
end
it "has not incremented :out" do
expect(counter_out).not_to receive(:increment).with(anything)
subject.multi_filter(events)
end
end
context "when the filter create more events" do
let(:plugin_klass) do
Class.new(LogStash::Filters::Base) do
config_name "super_plugin"
config :host, :validate => :string
def register; end
def flush(options = {}); @events ; end
# naive split filter implementation
def filter(event)
event.cancel
2.times { yield LogStash::Event.new }
end
end
end
before do
allow(metric).to receive(:increment).with(anything, anything)
end
it "increments the in/out of the metric" do
expect(counter_in).to receive(:increment).with(events.size)
expect(counter_out).to receive(:increment).with(events.size * 2)
subject.multi_filter(events)
end
end
end
context "when the plugin doesnt support flush" do
let(:plugin_klass) do
Class.new(LogStash::Filters::Base) do
config_name "super_plugin"
config :host, :validate => :string
def register; end
def filter(event)
event
end
end
end
before do
allow(metric).to receive(:increment).with(anything, anything)
end
it "doesnt define a flush method" do
expect(subject.has_flush).to be_falsey
end
it "increments the in/out of the metric" do
expect(counter_in).to receive(:increment).with(events.size)
expect(counter_out).to receive(:increment).with(events.size)
subject.multi_filter(events)
end
end
context "#config_name" do
it "proxy the config_name to the class method" do
expect(subject.config_name).to eq("super_plugin")
end
end
context "delegate methods to the original plugin" do
# I am not testing the behavior of these methods
# this is done in the plugin tests. I just want to make sure
# the proxy delegates the methods.
LogStash::JavaFilterDelegator::DELEGATED_METHODS.each do |method|
it "delegate method: `#{method}` to the filter" do
expect(subject.respond_to?(method))
end
end
end
end
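The delegator spec above pins down the metrics contract: events in and out are counted separately because filters may drop or create events, and wall-clock time is accumulated around the delegated call. Here is a minimal standalone Java sketch of that wrapping pattern; all names (TimedFilter, the string-list "events") are hypothetical and not part of this commit.

import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.UnaryOperator;

final class TimedFilter implements UnaryOperator<List<String>> {
    private final UnaryOperator<List<String>> inner;
    final LongAdder in = new LongAdder();
    final LongAdder out = new LongAdder();
    final LongAdder millis = new LongAdder();

    TimedFilter(final UnaryOperator<List<String>> inner) {
        this.inner = inner;
    }

    @Override
    public List<String> apply(final List<String> events) {
        in.add(events.size());
        final long start = System.nanoTime();
        final List<String> result = inner.apply(events);
        millis.add((System.nanoTime() - start) / 1_000_000);
        out.add(result.size()); // in == out is NOT guaranteed (split/aggregate-style filters)
        return result;
    }

    public static void main(final String[] args) {
        final TimedFilter f = new TimedFilter(evts -> evts.subList(0, 1)); // drops all but one event
        f.apply(List.of("a", "b", "c"));
        System.out.printf("in=%d out=%d%n", f.in.sum(), f.out.sum()); // in=3 out=1
    }
}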

View file

@ -0,0 +1,933 @@
# encoding: utf-8
require "spec_helper"
require "logstash/inputs/generator"
require "logstash/filters/drop"
require_relative "../support/mocks_classes"
require_relative "../support/helpers"
require_relative "../logstash/pipeline_reporter_spec" # for DummyOutput class
require 'support/pipeline/pipeline_helpers'
require "stud/try"
require 'timeout'
class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"
milestone 2
def register
end
def run(queue)
end
def close
end
end
class DummyInputGenerator < LogStash::Inputs::Base
config_name "dummyinputgenerator"
milestone 2
def register
end
def run(queue)
queue << LogStash::Event.new while !stop?
end
def close
end
end
class DummyCodec < LogStash::Codecs::Base
config_name "dummycodec"
milestone 2
config :format, :validate => :string
def decode(data)
data
end
def encode(event)
event
end
def close
end
end
class DummyOutputMore < ::LogStash::Outputs::DummyOutput
config_name "dummyoutputmore"
end
class DummyFilter < LogStash::Filters::Base
config_name "dummyfilter"
milestone 2
def register() end
def filter(event) end
def threadsafe?() false; end
def close() end
end
class DummySafeFilter < LogStash::Filters::Base
config_name "dummysafefilter"
milestone 2
def register() end
def filter(event) end
def threadsafe?() true; end
def close() end
end
class DummyFlushingFilter < LogStash::Filters::Base
config_name "dummyflushingfilter"
milestone 2
def register() end
def filter(event) end
def periodic_flush
true
end
def flush(options)
[::LogStash::Event.new("message" => "dummy_flush")]
end
def close() end
end
class DummyFlushingFilterPeriodic < DummyFlushingFilter
config_name "dummyflushingfilterperiodic"
def flush(options)
# Don't generate events on the shutdown flush to make sure we actually test the
# periodic flush.
options[:final] ? [] : [::LogStash::Event.new("message" => "dummy_flush")]
end
end
class JavaTestPipeline < LogStash::JavaPipeline
attr_reader :outputs, :settings
end
describe LogStash::JavaPipeline do
let(:worker_thread_count) { 5 }
let(:safe_thread_count) { 1 }
let(:override_thread_count) { 42 }
let(:dead_letter_queue_enabled) { false }
let(:dead_letter_queue_path) { }
let(:pipeline_settings_obj) { LogStash::SETTINGS.clone }
let(:pipeline_settings) { {} }
let(:max_retry) {10} #times
let(:timeout) {120} #seconds
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("pipeline.workers")
allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)
dlq_enabled_setting = LogStash::SETTINGS.get_setting("dead_letter_queue.enable")
allow(dlq_enabled_setting).to receive(:value).and_return(dead_letter_queue_enabled)
dlq_path_setting = LogStash::SETTINGS.get_setting("path.dead_letter_queue")
allow(dlq_path_setting).to receive(:value).and_return(dead_letter_queue_path)
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
describe "#ephemeral_id" do
it "creates an ephemeral_id at creation time" do
pipeline = mock_java_pipeline_from_string("input { generator { count => 1 } } output { null {} }")
expect(pipeline.ephemeral_id).to_not be_nil
pipeline.close
second_pipeline = mock_java_pipeline_from_string("input { generator { count => 1 } } output { null {} }")
expect(second_pipeline.ephemeral_id).not_to eq(pipeline.ephemeral_id)
second_pipeline.close
end
end
describe "event cancellation" do
# test harness for https://github.com/elastic/logstash/issues/6055
let(:output) { LogStash::Outputs::DummyOutputWithEventsArray.new }
before do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputwitheventsarray").and_return(LogStash::Outputs::DummyOutputWithEventsArray)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "drop").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("filter", "mutate").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original
allow(LogStash::Outputs::DummyOutputWithEventsArray).to receive(:new).with(any_args).and_return(output)
end
let(:config) do
<<-CONFIG
input {
generator {
lines => ["1", "2", "END"]
count => 1
}
}
filter {
if [message] == "1" {
drop {}
}
mutate { add_tag => ["notdropped"] }
}
output { dummyoutputwitheventsarray {} }
CONFIG
end
it "should not propagate cancelled events from filter to output" do
abort_on_exception_state = Thread.abort_on_exception
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(3).for do
# give us a bit of time to flush the events
# puts("*****" + output.events.map{|e| e.message}.to_s)
output.events.map{|e| e.get("message")}.include?("END")
end.to be_truthy
end
expect(output.events.size).to eq(2)
expect(output.events[0].get("tags")).to eq(["notdropped"])
expect(output.events[1].get("tags")).to eq(["notdropped"])
pipeline.shutdown
t.join
Thread.abort_on_exception = abort_on_exception_state
end
end
describe "defaulting the pipeline workers based on thread safety" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter)
end
context "when there are some not threadsafe filters" do
let(:test_config_with_filters) {
<<-eos
input {
dummyinput {}
}
filter {
dummyfilter {}
}
output {
dummyoutput {}
}
eos
}
describe "debug compiled" do
let(:logger) { double("pipeline logger").as_null_object }
before do
expect(::LogStash::JavaPipeline).to receive(:logger).and_return(logger)
allow(logger).to receive(:debug?).and_return(true)
end
it "should not receive a debug message with the compiled code" do
pipeline_settings_obj.set("config.debug", false)
expect(logger).not_to receive(:debug).with(/Compiled pipeline/, anything)
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
pipeline.close
end
it "should print the compiled code if config.debug is set to true" do
pipeline_settings_obj.set("config.debug", true)
expect(logger).to receive(:debug).with(/Compiled pipeline/, anything)
pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
pipeline.close
end
end
context "when there is no command line -w N set" do
it "starts one filter thread" do
msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads"
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
expect(pipeline.logger).to receive(:warn).with(msg,
hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
expect(pipeline.worker_threads.size).to eq(safe_thread_count)
pipeline.shutdown
end
end
context "when there is command line -w N set" do
let(:pipeline_settings) { {"pipeline.workers" => override_thread_count } }
it "starts multiple filter thread" do
msg = "Warning: Manual override - there are filters that might" +
" not work with multiple worker threads"
pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
expect(pipeline.worker_threads.size).to eq(override_thread_count)
pipeline.shutdown
end
end
end
context "when there are threadsafe filters only" do
let(:test_config_with_filters) {
<<-eos
input {
dummyinput {}
}
filter {
dummysafefilter {}
}
output {
dummyoutput {}
}
eos
}
it "starts multiple filter threads" do
skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245")
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
pipeline.run
expect(pipeline.worker_threads.size).to eq(worker_thread_count)
pipeline.shutdown
end
end
end
context "close" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:test_config_without_output_workers) {
<<-eos
input {
dummyinput {}
}
output {
dummyoutput {}
}
eos
}
let(:test_config_with_output_workers) {
<<-eos
input {
dummyinput {}
}
output {
dummyoutput {
workers => 2
}
}
eos
}
context "output close" do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first }
before do
allow(output).to receive(:do_close)
end
after do
pipeline.shutdown
end
it "should call close of output without output-workers" do
pipeline.run
expect(output).to have_received(:do_close).once
end
end
end
context "with no explicit ids declared" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:config) { "input { dummyinput { codec => plain { format => 'something' } } } filter { dummyfilter {} } output { dummyoutput {} }"}
let(:pipeline) { mock_java_pipeline_from_string(config) }
after do
# If you don't start/stop the pipeline it won't release the queue lock and will
# cause the suite to fail :(
pipeline.close
end
it "should use LIR provided IDs" do
expect(pipeline.inputs.first.id).to eq(pipeline.lir.input_plugin_vertices.first.id)
expect(pipeline.filters.first.id).to eq(pipeline.lir.filter_plugin_vertices.first.id)
expect(pipeline.outputs.first.id).to eq(pipeline.lir.output_plugin_vertices.first.id)
end
end
context "compiled flush function" do
extend PipelineHelpers
describe "flusher thread" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:config) { "input { dummyinput {} } output { dummyoutput {} }"}
it "should start the flusher thread only after the pipeline is running" do
pipeline = mock_java_pipeline_from_string(config)
expect(pipeline).to receive(:transition_to_running).ordered.and_call_original
expect(pipeline).to receive(:start_flusher).ordered.and_call_original
pipeline.run
pipeline.shutdown
end
end
context "cancelled events should not propagate down the filters" do
config <<-CONFIG
filter {
drop {}
}
CONFIG
sample_one("hello") do
expect(subject).to eq(nil)
end
end
context "new events should propagate down the filters" do
config <<-CONFIG
filter {
clone {
clones => ["clone1"]
}
}
CONFIG
sample_one(["foo", "bar"]) do
expect(subject.size).to eq(4)
end
end
end
describe "max inflight warning" do
let(:config) { "input { dummyinput {} } output { dummyoutput {} }" }
let(:batch_size) { 1 }
let(:pipeline_settings) { { "pipeline.batch.size" => batch_size, "pipeline.workers" => 1 } }
let(:pipeline) { mock_java_pipeline_from_string(config, pipeline_settings_obj) }
let(:logger) { pipeline.logger }
let(:warning_prefix) { Regexp.new("CAUTION: Recommended inflight events max exceeded!") }
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(logger).to receive(:warn)
# pipeline must first be called outside the thread context because it lazily initializes and will create a
# race condition if called in the thread
p = pipeline
t = Thread.new { p.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
pipeline.shutdown
t.join
end
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
expect(logger).not_to have_received(:warn).with(warning_prefix)
end
context "with a too large inflight count" do
let(:batch_size) { LogStash::JavaPipeline::MAX_INFLIGHT_WARN_THRESHOLD + 1 }
it "should raise a max inflight warning if the max_inflight count is exceeded" do
expect(logger).to have_received(:warn).with(warning_prefix, hash_including(:pipeline_id => anything))
end
end
end
context "compiled filter functions" do
context "new events should propagate down the filters" do
extend PipelineHelpers
config <<-CONFIG
filter {
clone {
clones => ["clone1", "clone2"]
}
mutate {
add_field => {"foo" => "bar"}
}
}
CONFIG
sample_one("hello") do
expect(subject.size).to eq(3)
expect(subject[0].get("message")).to eq("hello")
expect(subject[0].get("type")).to be_nil
expect(subject[0].get("foo")).to eq("bar")
expect(subject[1].get("message")).to eq("hello")
expect(subject[1].get("type")).to eq("clone1")
expect(subject[1].get("foo")).to eq("bar")
expect(subject[2].get("message")).to eq("hello")
expect(subject[2].get("type")).to eq("clone2")
expect(subject[2].get("foo")).to eq("bar")
end
end
end
context "metrics" do
config = "input { } filter { } output { }"
let(:settings) { LogStash::SETTINGS.clone }
subject { mock_java_pipeline_from_string(config, settings, metric) }
after :each do
subject.close
end
context "when metric.collect is disabled" do
before :each do
settings.set("metric.collect", false)
end
context "if namespaced_metric is nil" do
let(:metric) { nil }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
end
context "if namespaced_metric is a Metric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::Metric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
context "if namespaced_metric is a NullMetric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::NullMetric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(::LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
end
context "when metric.collect is enabled" do
before :each do
settings.set("metric.collect", true)
end
context "if namespaced_metric is nil" do
let(:metric) { nil }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
end
context "if namespaced_metric is a Metric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::Metric.new(collector) }
it "uses a `Metric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::Metric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
context "if namespaced_metric is a NullMetric object" do
let(:collector) { ::LogStash::Instrument::Collector.new }
let(:metric) { ::LogStash::Instrument::NullMetric.new(collector) }
it "uses a `NullMetric` object" do
expect(subject.metric).to be_a(LogStash::Instrument::NullMetric)
end
it "uses the same collector" do
expect(subject.metric.collector).to be(collector)
end
end
end
end
context "Periodic Flush" do
let(:config) do
<<-EOS
input {
dummy_input {}
}
filter {
dummy_flushing_filter {}
}
output {
dummy_output {}
}
EOS
end
let(:output) { ::LogStash::Outputs::DummyOutput.new }
before do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
end
it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
# give us a bit of time to flush the events
output.events.empty?
end.to be_falsey
end
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
pipeline.shutdown
t.join
end
end
context "#started_at" do
# use a run limiting count to shutdown the pipeline automatically
let(:config) do
<<-EOS
input {
generator { count => 10 }
}
EOS
end
subject { mock_java_pipeline_from_string(config) }
context "when the pipeline is not started" do
after :each do
subject.close
end
it "returns nil when the pipeline isnt started" do
expect(subject.started_at).to be_nil
end
end
it "return when the pipeline started working" do
subject.run
expect(subject.started_at).to be < Time.now
subject.shutdown
end
end
context "#uptime" do
let(:config) do
<<-EOS
input {
generator {}
}
EOS
end
subject { mock_java_pipeline_from_string(config) }
context "when the pipeline is not started" do
after :each do
subject.close
end
it "returns 0" do
expect(subject.uptime).to eq(0)
end
end
context "when the pipeline is started" do
it "return the duration in milliseconds" do
# subject must be first call outside the thread context because of lazy initialization
s = subject
t = Thread.new { s.run }
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
end
Timeout.timeout(timeout) do
sleep(0.1)
end
expect(subject.uptime).to be > 0
subject.shutdown
t.join
end
end
end
context "when collecting metrics in the pipeline" do
let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
subject { mock_java_pipeline_from_string(config, pipeline_settings_obj, metric) }
let(:pipeline_settings) { { "pipeline.id" => pipeline_id } }
let(:pipeline_id) { "main" }
let(:number_of_events) { 420 }
let(:dummy_id) { "my-multiline" }
let(:dummy_id_other) { "my-multiline_other" }
let(:dummy_output_id) { "my-dummyoutput" }
let(:generator_id) { "my-generator" }
let(:config) do
<<-EOS
input {
generator {
count => #{number_of_events}
id => "#{generator_id}"
}
}
filter {
dummyfilter {
id => "#{dummy_id}"
}
dummyfilter {
id => "#{dummy_id_other}"
}
}
output {
dummyoutput {
id => "#{dummy_output_id}"
}
}
EOS
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(LogStash::Filters::DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
pipeline_thread
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
end
# make sure we have received all the generated events
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(3).for do
# give us a bit of time to flush the events
dummyoutput.events.size >= number_of_events
end.to be_truthy
end
end
after :each do
subject.shutdown
pipeline_thread.join
end
context "global metric" do
let(:collected_metric) { metric_store.get_with_path("stats/events") }
it "populates the different metrics" do
expect(collected_metric[:stats][:events][:duration_in_millis].value).not_to be_nil
expect(collected_metric[:stats][:events][:in].value).to eq(number_of_events)
expect(collected_metric[:stats][:events][:filtered].value).to eq(number_of_events)
expect(collected_metric[:stats][:events][:out].value).to eq(number_of_events)
end
end
context "pipelines" do
let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
it "populates the pipelines core metrics" do
expect(collected_metric[:stats][:pipelines][:main][:events][:duration_in_millis].value).not_to be_nil
expect(collected_metric[:stats][:pipelines][:main][:events][:in].value).to eq(number_of_events)
expect(collected_metric[:stats][:pipelines][:main][:events][:filtered].value).to eq(number_of_events)
expect(collected_metric[:stats][:pipelines][:main][:events][:out].value).to eq(number_of_events)
end
it "populates the filter metrics" do
[dummy_id, dummy_id_other].map(&:to_sym).each do |id|
[:in, :out].each do |metric_key|
plugin_name = id.to_sym
expect(collected_metric[:stats][:pipelines][:main][:plugins][:filters][plugin_name][:events][metric_key].value).to eq(number_of_events)
end
end
end
it "populates the output metrics" do
plugin_name = dummy_output_id.to_sym
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:events][:in].value).to eq(number_of_events)
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:events][:out].value).to eq(number_of_events)
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:events][:duration_in_millis].value).not_to be_nil
end
it "populates the name of the output plugin" do
plugin_name = dummy_output_id.to_sym
expect(collected_metric[:stats][:pipelines][:main][:plugins][:outputs][plugin_name][:name].value).to eq(::LogStash::Outputs::DummyOutput.config_name)
end
it "populates the name of the filter plugin" do
[dummy_id, dummy_id_other].map(&:to_sym).each do |id|
plugin_name = id.to_sym
expect(collected_metric[:stats][:pipelines][:main][:plugins][:filters][plugin_name][:name].value).to eq(LogStash::Filters::DummyFilter.config_name)
end
end
context 'when dlq is disabled' do
let (:collect_stats) { subject.collect_dlq_stats}
let (:collected_stats) { collected_metric[:stats][:pipelines][:main][:dlq]}
let (:available_stats) {[:path, :queue_size_in_bytes]}
it 'should not show any dlq stats' do
collect_stats
expect(collected_stats).to be_nil
end
end
context 'when dlq is enabled' do
let (:dead_letter_queue_enabled) { true }
let (:dead_letter_queue_path) { Stud::Temporary.directory }
let (:pipeline_dlq_path) { "#{dead_letter_queue_path}/#{pipeline_id}"}
let (:collect_stats) { subject.collect_dlq_stats }
let (:collected_stats) { collected_metric[:stats][:pipelines][:main][:dlq]}
it 'should show dlq stats' do
collect_stats
# A newly created dead letter queue with no entries will have a size of 1 (the version 'header')
expect(collected_stats[:queue_size_in_bytes].value).to eq(1)
end
end
end
end
context "Pipeline object" do
before do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:pipeline1) { mock_java_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
let(:pipeline2) { mock_java_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") }
# multiple pipelines cannot be instantiated using the same PQ settings, force memory queue
before :each do
pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type")
allow(pipeline_workers_setting).to receive(:value).and_return("memory")
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
end
it "should not add ivars" do
expect(pipeline1.instance_variables).to eq(pipeline2.instance_variables)
end
end
context "#system" do
after do
pipeline.close # close the queue
end
context "when the pipeline is a system pipeline" do
let(:pipeline) { mock_java_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.system" => true)) }
it "returns true" do
expect(pipeline.system?).to be_truthy
end
end
context "when the pipeline is not a system pipeline" do
let(:pipeline) { mock_java_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.system" => false)) }
it "returns true" do
expect(pipeline.system?).to be_falsey
end
end
end
context "#reloadable?" do
after do
pipeline.close # close the queue
end
context "when all plugins are reloadable and pipeline is configured as reloadable" do
let(:pipeline) { mock_java_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) }
it "returns true" do
expect(pipeline.reloadable?).to be_truthy
end
end
context "when the plugins are not reloadable and pipeline is configured as reloadable" do
let(:pipeline) { mock_java_pipeline_from_string("input { stdin {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) }
it "returns true" do
expect(pipeline.reloadable?).to be_falsey
end
end
context "when all plugins are reloadable and pipeline is configured as non-reloadable" do
let(:pipeline) { mock_java_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => false)) }
it "returns true" do
expect(pipeline.reloadable?).to be_falsey
end
end
end
end

View file

@ -54,6 +54,11 @@ def mock_pipeline_from_string(config_string, settings = LogStash::SETTINGS, metr
LogStash::Pipeline.new(pipeline_config, metric)
end
def mock_java_pipeline_from_string(config_string, settings = LogStash::SETTINGS, metric = nil)
pipeline_config = mock_pipeline_config(settings.get("pipeline.id"), config_string, settings)
LogStash::JavaPipeline.new(pipeline_config, metric)
end
def mock_pipeline_config(pipeline_id, config_string = nil, settings = {})
config_string = "input { stdin { id => '#{pipeline_id}' }}" if config_string.nil?

View file

@ -0,0 +1,97 @@
require "logstash/agent"
require "logstash/java_pipeline"
require "logstash/event"
require "stud/try"
require "rspec/expectations"
require "thread"
java_import org.logstash.common.SourceWithMetadata
module PipelineHelpers
class SpecSamplerInput < LogStash::Inputs::Base
config_name "spec_sampler_input"
def register
end
def run(queue)
unless @@event.nil?
queue.push_batch(@@event)
@@event = nil
end
end
def close
@@event = nil
end
def self.set_event(event)
@@event = event
end
end
class SpecSamplerOutput < LogStash::Outputs::Base
config_name "spec_sampler_output"
def register
@@seen = []
end
def multi_receive(events)
@@seen += events
end
def self.seen
@@seen
end
end
def sample_one(sample_event, &block)
name = sample_event.is_a?(String) ? sample_event : LogStash::Json.dump(sample_event)
name = name[0..50] + "..." if name.length > 50
before do
LogStash::PLUGIN_REGISTRY.add(:input, "spec_sampler_input", SpecSamplerInput)
LogStash::PLUGIN_REGISTRY.add(:output, "spec_sampler_output", SpecSamplerOutput)
end
describe "\"#{name}\"" do
let(:pipeline) do
settings = ::LogStash::SETTINGS.clone
settings.set_value("queue.drain", true)
settings.set_value("pipeline.workers", 1)
LogStash::JavaPipeline.new(
LogStash::Config::PipelineConfig.new(
LogStash::Config::Source::Local, :main,
SourceWithMetadata.new(
"config_string", "config_string",
"input { spec_sampler_input {} }\n" + config + "\noutput { spec_sampler_output {} }"
), settings
)
)
end
let(:event) do
sample_event = [sample_event] unless sample_event.is_a?(Array)
next sample_event.collect do |e|
e = { "message" => e } if e.is_a?(String)
next LogStash::Event.new(e)
end
end
let(:results) do
SpecSamplerInput.set_event event
pipeline.run
SpecSamplerOutput.seen
end
after do
pipeline.close
end
subject {results.length > 1 ? results : results.first}
it("when processed", &block)
end
end
end

View file

@ -27,6 +27,7 @@ import org.jruby.RubyNil;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.ext.bigdecimal.RubyBigDecimal;
import org.jruby.util.ByteList;
import org.logstash.ext.JrubyTimestampExtLibrary;
public final class ObjectMappers {
@ -44,6 +45,7 @@ public final class ObjectMappers {
private static final SimpleModule CBOR_DESERIALIZERS =
new SimpleModule("CborRubyDeserializers")
.addDeserializer(RubyString.class, new RubyStringDeserializer())
.addDeserializer(RubyNil.class, new RubyNilDeserializer());
public static final ObjectMapper JSON_MAPPER =
@ -85,8 +87,7 @@ public final class ObjectMappers {
* Serializer for {@link RubyString} since Jackson can't handle that type natively, so we
* simply serialize it as if it were a {@link String}.
*/
private static final class RubyStringSerializer
extends ObjectMappers.NonTypedScalarSerializer<RubyString> {
private static final class RubyStringSerializer extends StdSerializer<RubyString> {
RubyStringSerializer() {
super(RubyString.class);
@ -98,6 +99,30 @@ public final class ObjectMappers {
throws IOException {
generator.writeString(value.asJavaString());
}
@Override
public void serializeWithType(final RubyString value, final JsonGenerator jgen,
final SerializerProvider serializers, final TypeSerializer typeSer) throws IOException {
final WritableTypeId typeId =
typeSer.typeId(value, RubyString.class, JsonToken.VALUE_STRING);
typeSer.writeTypePrefix(jgen, typeId);
final ByteList bytes = value.getByteList();
jgen.writeBinary(bytes.getUnsafeBytes(), 0, bytes.length());
typeSer.writeTypeSuffix(jgen, typeId);
}
}
public static final class RubyStringDeserializer extends StdDeserializer<RubyString> {
RubyStringDeserializer() {
super(RubyString.class);
}
@Override
public RubyString deserialize(final JsonParser p, final DeserializationContext ctxt)
throws IOException {
return RubyString.newString(RubyUtil.RUBY, p.getBinaryValue());
}
}
/**
@ -113,8 +138,7 @@ public final class ObjectMappers {
@Override
public void serialize(final RubySymbol value, final JsonGenerator generator,
final SerializerProvider provider)
throws IOException {
final SerializerProvider provider) throws IOException {
generator.writeString(value.asJavaString());
}
}
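The ObjectMappers change above switches RubyString to a byte-level round trip: serializeWithType writes the raw ByteList via writeBinary and the new RubyStringDeserializer reads it back with getBinaryValue, so no charset conversion can corrupt non-UTF-8 payloads. Below is a self-contained sketch of the same Jackson pattern, assuming only jackson-databind 2.x on the classpath; "ByteString" is a hypothetical stand-in for RubyString, not a type from this commit.

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;

final class ByteString {
    final byte[] bytes;
    ByteString(final byte[] bytes) { this.bytes = bytes; }
}

final class ByteStringSerializer extends StdSerializer<ByteString> {
    ByteStringSerializer() { super(ByteString.class); }
    @Override
    public void serialize(final ByteString value, final JsonGenerator gen,
        final SerializerProvider provider) throws IOException {
        gen.writeBinary(value.bytes, 0, value.bytes.length); // base64 in JSON, raw bytes in CBOR
    }
}

final class ByteStringDeserializer extends StdDeserializer<ByteString> {
    ByteStringDeserializer() { super(ByteString.class); }
    @Override
    public ByteString deserialize(final JsonParser p, final DeserializationContext ctxt)
        throws IOException {
        return new ByteString(p.getBinaryValue()); // decodes the base64/binary payload losslessly
    }
}

public final class RoundTrip {
    public static void main(final String[] args) throws IOException {
        final ObjectMapper mapper = new ObjectMapper().registerModule(
            new SimpleModule("ByteStrings")
                .addSerializer(ByteString.class, new ByteStringSerializer())
                .addDeserializer(ByteString.class, new ByteStringDeserializer()));
        final String json = mapper.writeValueAsString(new ByteString(new byte[] {1, 2, 3}));
        final ByteString back = mapper.readValue(json, ByteString.class);
        System.out.println(json + " -> " + back.bytes.length + " bytes");
    }
}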

View file

@ -0,0 +1,382 @@
package org.logstash.config.ir;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jruby.RubyHash;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.compiler.Dataset;
import org.logstash.config.ir.compiler.EventCondition;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.graph.IfVertex;
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.config.ir.imperative.PluginStatement;
import org.logstash.ext.JrubyEventExtLibrary;
/**
* <h3>Compiled Logstash Pipeline Configuration.</h3>
* This class represents an executable pipeline, compiled from the configured topology that is
* learnt from {@link PipelineIR}.
* Each compiled pipeline consists of a graph of {@link Dataset}s that each represent either a
* {@code filter}, {@code output} or an {@code if} condition.
*/
public final class CompiledPipeline {
/**
* Configured inputs.
*/
private final Collection<IRubyObject> inputs;
/**
* Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
*/
private final Map<String, RubyIntegration.Filter> filters;
/**
* Immutable collection of filters that flush on shutdown.
*/
private final Collection<RubyIntegration.Filter> shutdownFlushes;
/**
* Immutable collection of filters that flush periodically.
*/
private final Collection<RubyIntegration.Filter> periodicFlushes;
/**
* Configured outputs.
*/
private final Map<String, RubyIntegration.Output> outputs;
/**
* Parsed pipeline configuration graph.
*/
private final PipelineIR pipelineIR;
/**
* Ruby pipeline object.
*/
private final RubyIntegration.Pipeline pipeline;
public CompiledPipeline(final PipelineIR pipelineIR, final RubyIntegration.Pipeline pipeline) {
this.pipelineIR = pipelineIR;
this.pipeline = pipeline;
inputs = setupInputs();
filters = setupFilters();
outputs = setupOutputs();
shutdownFlushes = Collections.unmodifiableList(
filters.values().stream().filter(RubyIntegration.Filter::hasFlush)
.collect(Collectors.toList())
);
periodicFlushes = Collections.unmodifiableList(
shutdownFlushes.stream().filter(RubyIntegration.Filter::periodicFlush)
.collect(Collectors.toList())
);
}
public Collection<RubyIntegration.Filter> shutdownFlushers() {
return shutdownFlushes;
}
public Collection<RubyIntegration.Filter> periodicFlushers() {
return periodicFlushes;
}
public Collection<RubyIntegration.Output> outputs() {
return Collections.unmodifiableCollection(outputs.values());
}
public Collection<RubyIntegration.Filter> filters() {
return Collections.unmodifiableCollection(filters.values());
}
public Collection<IRubyObject> inputs() {
return inputs;
}
public RubyIntegration.Plugin registerPlugin(final RubyIntegration.Plugin plugin) {
plugin.register();
return plugin;
}
/**
* This method contains the actual compilation of the {@link Dataset} representing the
* underlying pipeline from the Queue to the outputs.
* @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
*/
public Dataset buildExecution() {
return new CompiledPipeline.CompiledExecution().toDataset();
}
/**
* Sets up all Ruby outputs learnt from {@link PipelineIR}.
*/
private Map<String, RubyIntegration.Output> setupOutputs() {
final Collection<PluginVertex> outs = pipelineIR.getOutputPluginVertices();
final Map<String, RubyIntegration.Output> res = new HashMap<>(outs.size());
outs.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
res.put(v.getId(), pipeline.buildOutput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
));
});
return res;
}
/**
* Sets up all Ruby filters learnt from {@link PipelineIR}.
*/
private Map<String, RubyIntegration.Filter> setupFilters() {
final Collection<PluginVertex> filterPlugins = pipelineIR.getFilterPluginVertices();
final Map<String, RubyIntegration.Filter> res =
new HashMap<>(filterPlugins.size(), 1.0F);
for (final PluginVertex plugin : filterPlugins) {
res.put(plugin.getId(), buildFilter(plugin));
}
return res;
}
/**
* Sets up all Ruby inputs learnt from {@link PipelineIR}.
*/
private Collection<IRubyObject> setupInputs() {
final Collection<PluginVertex> vertices = pipelineIR.getInputPluginVertices();
final Collection<IRubyObject> nodes = new HashSet<>(vertices.size());
vertices.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
nodes.add(pipeline.buildInput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
));
});
return nodes;
}
/**
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
* Ruby types.
* @param def PluginDefinition as provided by {@link PipelineIR}
* @return RubyHash of plugin arguments as understood by {@link RubyIntegration.Pipeline}
* methods
*/
private RubyHash convertArgs(final PluginDefinition def) {
final RubyHash converted = RubyHash.newHash(RubyUtil.RUBY);
for (final Map.Entry<String, Object> entry : def.getArguments().entrySet()) {
final Object value = entry.getValue();
final String key = entry.getKey();
final Object toput;
if (value instanceof PluginStatement) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
toput = pipeline.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments())
);
} else {
toput = value;
}
converted.put(key, toput);
}
return converted;
}
/**
* Compiles a {@link RubyIntegration.Filter} from a given {@link PluginVertex}.
* @param vertex Filter {@link PluginVertex}
* @return Compiled {@link RubyIntegration.Filter}
*/
private RubyIntegration.Filter buildFilter(final PluginVertex vertex) {
final PluginDefinition def = vertex.getPluginDefinition();
final SourceWithMetadata source = vertex.getSourceWithMetadata();
return pipeline.buildFilter(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
);
}
/**
* Checks if a certain {@link Vertex} represents a {@link RubyIntegration.Filter}.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents a {@link RubyIntegration.Filter}
*/
private boolean isFilter(final Vertex vertex) {
return filters.containsKey(vertex.getId());
}
/**
* Checks if a certain {@link Vertex} represents a {@link RubyIntegration.Output}.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents a {@link RubyIntegration.Output}
*/
private boolean isOutput(final Vertex vertex) {
return outputs.containsKey(vertex.getId());
}
/**
* Compiles an {@link IfVertex} into an {@link EventCondition}.
* @param iff IfVertex to build condition for
* @return EventCondition for given {@link IfVertex}
*/
private static EventCondition buildCondition(final IfVertex iff) {
return EventCondition.Compiler.buildCondition(iff.getBooleanExpression());
}
/**
* Instances of this class represent a fully compiled pipeline execution. Note that this class
* has a separate lifecycle from {@link CompiledPipeline} because it holds per-worker-thread
* state and thus needs to be instantiated once per thread.
*/
private final class CompiledExecution {
/**
* Compiled {@link IfVertex}s, indexed by their ID as returned by {@link Vertex#getId()}.
*/
private final Map<String, Dataset.SplitDataset> iffs = new HashMap<>(5);
/**
* Cached {@link Dataset} compiled from {@link PluginVertex} indexed by their ID as returned
* by {@link Vertex#getId()} to avoid duplicate computations.
*/
private final Map<String, Dataset> plugins = new HashMap<>(5);
private final Dataset compiled;
CompiledExecution() {
compiled = compile();
}
Dataset toDataset() {
return compiled;
}
/**
* Instantiates the graph of compiled {@link Dataset}.
* @return Compiled {@link Dataset} representing the pipeline.
*/
private Dataset compile() {
final Collection<Dataset> datasets = new ArrayList<>();
pipelineIR.getGraph()
.allLeaves()
.filter(CompiledPipeline.this::isOutput)
.forEach(leaf -> datasets.add(
outputDataset(leaf.getId(), flatten(Dataset.ROOT_DATASETS, leaf))
)
);
return Dataset.TerminalDataset.from(datasets);
}
/**
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given filter.
* @param vertex Vertex Id of the filter to create this {@link Dataset} for
* @param datasets All the datasets that pass through this filter
* @return Filter {@link Dataset}
*/
private Dataset filterDataset(final String vertex, final Collection<Dataset> datasets) {
return plugins.computeIfAbsent(vertex, v -> {
final Dataset filter;
final RubyIntegration.Filter ruby = filters.get(v);
if (ruby.hasFlush()) {
if (ruby.periodicFlush()) {
filter = new Dataset.FilteredFlushableDataset(datasets, ruby);
} else {
filter = new Dataset.FilteredShutdownFlushableDataset(datasets, ruby);
}
} else {
filter = new Dataset.FilteredDataset(datasets, ruby);
}
return filter;
});
}
/**
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given output.
* @param vertexId Vertex Id of the output to create this {@link Dataset} for
* @param datasets All the datasets that are passed into this output
* @return Output {@link Dataset}
*/
private Dataset outputDataset(final String vertexId, final Collection<Dataset> datasets) {
return plugins.computeIfAbsent(
vertexId, v -> new Dataset.OutputDataset(datasets, outputs.get(v))
);
}
/**
* Split the given {@link Dataset}s and return the {@link Dataset} holding the half of their
* elements ({@link JrubyEventExtLibrary.RubyEvent}s) that fulfils the given {@link EventCondition}.
* @param datasets Datasets to split
* @param condition Condition that must be fulfilled
* @param index Vertex id to cache the resulting {@link Dataset} under
* @return The half of the datasets' contents that fulfils the condition
*/
private Dataset.SplitDataset split(final Collection<Dataset> datasets,
final EventCondition condition, final String index) {
return iffs
.computeIfAbsent(index, ind -> new Dataset.SplitDataset(datasets, condition));
}
/**
* Compiles the next level of the execution from the given {@link Vertex} or simply returns
* the given {@link Dataset} at the previous level if the starting {@link Vertex} cannot
* be expanded any further (i.e. doesn't have any more incoming vertices that are either
* a {@code filter} or an {@code if} statement).
* @param datasets Nodes from the last already compiled level
* @param start Vertex to compile children for
* @return Datasets originating from given {@link Vertex}
*/
private Collection<Dataset> flatten(final Collection<Dataset> datasets,
final Vertex start) {
final Collection<Vertex> dependencies = start.incomingVertices()
.filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
.collect(Collectors.toList());
return dependencies.isEmpty() ? datasets
: compileDependencies(start, datasets, dependencies);
}
/**
* Compiles all child vertices for a given vertex.
* @param datasets Datasets from previous stage
* @param start Start Vertex that got expanded
* @param dependencies Dependencies of {@code start}
* @return Datasets compiled from vertex children
*/
private Collection<Dataset> compileDependencies(final Vertex start,
final Collection<Dataset> datasets, final Collection<Vertex> dependencies) {
return dependencies.stream().map(
dependency -> {
final Collection<Dataset> transientDependencies = flatten(datasets, dependency);
if (isFilter(dependency)) {
return filterDataset(dependency.getId(), transientDependencies);
} else if (isOutput(dependency)) {
return outputDataset(dependency.getId(), transientDependencies);
} else {
// We know that it's an if vertex since the input children are either
// output, filter or if in type.
final IfVertex ifvert = (IfVertex) dependency;
final EventCondition iff = buildCondition(ifvert);
final String index = ifvert.getId();
// It is important that we double check that we are actually dealing with the
// positive/left branch of the if condition
if (ifvert.getOutgoingBooleanEdgesByType(true).stream()
.anyMatch(edge -> Objects.equals(edge.getTo(), start))) {
return split(transientDependencies, iff, index);
} else {
return split(transientDependencies, iff, index).right();
}
}
}).collect(Collectors.toList());
}
}
}
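CompiledPipeline.CompiledExecution compiles bottom-up: it starts from each output (a leaf of the graph), recursively flattens the incoming filter/if vertices, and memoizes compiled nodes by vertex id so a subgraph shared by several outputs is compiled exactly once. The following toy model illustrates that strategy with all Logstash types stripped away; the names (ToyCompiler, the string "datasets") are illustrative only, not part of this commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ToyCompiler {
    // toy graph: vertex id -> ids of its incoming (upstream) vertices
    private final Map<String, List<String>> incoming;
    // memoized "datasets" keyed by vertex id, mirroring CompiledExecution#plugins
    private final Map<String, String> compiled = new HashMap<>();

    ToyCompiler(final Map<String, List<String>> incoming) {
        this.incoming = incoming;
    }

    String compile(final String vertex) {
        final String memo = compiled.get(vertex);
        if (memo != null) {
            return memo; // shared subgraphs are compiled exactly once
        }
        final List<String> deps = incoming.getOrDefault(vertex, List.of());
        final String inputs = deps.isEmpty()
            ? "ROOT" // no upstream vertices left: attach to the root datasets
            : deps.stream().map(this::compile).collect(Collectors.joining(", "));
        final String result = vertex + "(" + inputs + ")";
        compiled.put(vertex, result);
        return result;
    }

    public static void main(final String[] args) {
        // one filter feeding two outputs, i.e. a shared subgraph
        final Map<String, List<String>> graph = Map.of(
            "filterA", List.of(),
            "output1", List.of("filterA"),
            "output2", List.of("filterA"));
        final ToyCompiler compiler = new ToyCompiler(graph);
        System.out.println(compiler.compile("output1")); // output1(filterA(ROOT))
        System.out.println(compiler.compile("output2")); // output2(filterA(ROOT)), filterA reused
    }
}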

View file

@ -1,18 +1,36 @@
package org.logstash.config.ir;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.expression.*;
import org.logstash.config.ir.expression.binary.*;
import org.logstash.config.ir.expression.BooleanExpression;
import org.logstash.config.ir.expression.EventValueExpression;
import org.logstash.config.ir.expression.Expression;
import org.logstash.config.ir.expression.RegexValueExpression;
import org.logstash.config.ir.expression.ValueExpression;
import org.logstash.config.ir.expression.binary.And;
import org.logstash.config.ir.expression.binary.Eq;
import org.logstash.config.ir.expression.binary.Gt;
import org.logstash.config.ir.expression.binary.Gte;
import org.logstash.config.ir.expression.binary.In;
import org.logstash.config.ir.expression.binary.Lt;
import org.logstash.config.ir.expression.binary.Lte;
import org.logstash.config.ir.expression.binary.Neq;
import org.logstash.config.ir.expression.binary.Or;
import org.logstash.config.ir.expression.binary.RegexEq;
import org.logstash.config.ir.expression.unary.Not;
import org.logstash.config.ir.expression.unary.Truthy;
import org.logstash.config.ir.graph.Graph;
import org.logstash.config.ir.graph.IfVertex;
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.imperative.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.logstash.config.ir.imperative.ComposedParallelStatement;
import org.logstash.config.ir.imperative.ComposedSequenceStatement;
import org.logstash.config.ir.imperative.ComposedStatement;
import org.logstash.config.ir.imperative.IfStatement;
import org.logstash.config.ir.imperative.NoopStatement;
import org.logstash.config.ir.imperative.PluginStatement;
import org.logstash.config.ir.imperative.Statement;
/**
* Created by andrewvc on 9/15/16.
@ -107,6 +125,10 @@ public class DSL {
return new And(null, left, right);
}
public static Not eNand(Expression left, Expression right) throws InvalidIRException {
return eNot(eAnd(left, right));
}
public static Or eOr(SourceWithMetadata meta, Expression left, Expression right) {
return new Or(meta, left, right);
}
@ -115,6 +137,10 @@ public class DSL {
return new Or(null, left, right);
}
public static Or eXor(Expression left, Expression right) throws InvalidIRException {
return eOr(eAnd(eNot(left), right), eAnd(left, eNot(right)));
}
public static RegexEq eRegexEq(SourceWithMetadata meta, Expression left, ValueExpression right) throws InvalidIRException {
return new RegexEq(meta, left, right);
}
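The two DSL additions desugar the new operators into existing expression nodes: eNand(l, r) becomes eNot(eAnd(l, r)) and eXor(l, r) becomes eOr(eAnd(eNot(l), r), eAnd(l, eNot(r))). A standalone check of those boolean identities (illustrative only, not part of the commit):

public final class BooleanIdentities {
    public static void main(final String[] args) {
        final boolean[] values = {false, true};
        for (final boolean l : values) {
            for (final boolean r : values) {
                final boolean nand = !(l && r);             // eNand == eNot(eAnd(l, r))
                final boolean xor = (!l && r) || (l && !r); // eXor == eOr(eAnd(eNot(l), r), eAnd(l, eNot(r)))
                if (nand != !(l & r) || xor != (l ^ r)) {
                    throw new AssertionError("identity violated");
                }
                System.out.printf("l=%-5b r=%-5b nand=%-5b xor=%-5b%n", l, r, nand, xor);
            }
        }
    }
}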

View file

@ -1,19 +1,19 @@
package org.logstash.config.ir;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.logstash.common.Util;
import org.logstash.config.ir.graph.Graph;
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.graph.QueueVertex;
import org.logstash.config.ir.graph.Vertex;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created by andrewvc on 9/20/16.
*/
public class PipelineIR implements Hashable {
public final class PipelineIR implements Hashable {
private String uniqueHash;
public Graph getGraph() {

View file

@ -11,7 +11,7 @@ import org.logstash.common.SourceWithMetadata;
/**
* Created by andrewvc on 9/20/16.
*/
public class PluginDefinition implements SourceComponent, HashableWithSource {
public final class PluginDefinition implements SourceComponent, HashableWithSource {
@Override
public String hashSource() {

View file

@ -0,0 +1,469 @@
package org.logstash.config.ir.compiler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
/**
* <p>A data structure backed by a {@link RubyArray} that represents one step of execution flow of a
* batch is lazily filled with {@link JrubyEventExtLibrary.RubyEvent} computed from its dependent
* {@link Dataset}.</p>
* <p>Each {@link Dataset} either represents a filter, output or one branch of an {@code if}
* statement in a Logstash configuration file.</p>
* <p>Note: It does seem more natural to use the {@code clear} invocation to set the next
* batch of input data. For now this is intentionally not implemented since we want to clear
* the data stored in the Dataset tree as early as possible and before the output operations
* finish. Once the whole execution tree including the output operation is implemented in
* Java, this API can be adjusted as such.</p>
*/
public interface Dataset {
/**
* Compute the actual contents of the backing {@link RubyArray} and cache them.
* Repeated invocations will be effectively free.
* @param batch Input {@link JrubyEventExtLibrary.RubyEvent} received at the root
* of the execution
* @param flush True if flushing flushable nodes while traversing the execution
* @param shutdown True if this is the last call to this instance's compute method because
* the pipeline it belongs to is shut down
* @return Computed {@link RubyArray} of {@link JrubyEventExtLibrary.RubyEvent}
*/
Collection<JrubyEventExtLibrary.RubyEvent> compute(RubyIntegration.Batch batch,
boolean flush, boolean shutdown);
/**
* Removes all data from the instance and all of its parents, making the instance ready for
* use with a new set of input data.
*/
void clear();
/**
* Root {@link Dataset}s at the beginning of the execution tree that simply pass through
* the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state.
*/
Collection<Dataset> ROOT_DATASETS = Collections.singleton(
new Dataset() {
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(
final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
return batch.to_a();
}
@Override
public void clear() {
}
}
);
/**
* <p>{@link Dataset} that contains all {@link JrubyEventExtLibrary.RubyEvent} instances from
* all of its dependencies and is assumed to be at the end of an execution.</p>
* This dataset does not require an explicit call to {@link Dataset#clear()} since it will
* automatically {@code clear} all of its parents.
*/
final class TerminalDataset implements Dataset {
/**
* Empty {@link Collection} returned by this class's
* {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} implementation.
*/
private static final Collection<JrubyEventExtLibrary.RubyEvent> EMPTY_RETURN =
Collections.emptyList();
/**
* Trivial {@link Dataset} that simply returns an empty collection of elements.
*/
private static final Dataset EMPTY_DATASET = new Dataset() {
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(
final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
return EMPTY_RETURN;
}
@Override
public void clear() {
}
};
private final Collection<Dataset> parents;
/**
* <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
* <p>If the given set of parent {@link Dataset}s is empty, the sum is defined as the
* trivial dataset that does not invoke any computation whatsoever and whose
* {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} always returns
* {@link Collections#emptyList()}.</p>
* @param parents Parent {@link Dataset} to sum and terminate
* @return Dataset representing the sum of given parent {@link Dataset}
*/
public static Dataset from(final Collection<Dataset> parents) {
final int count = parents.size();
final Dataset result;
if (count > 0) {
result = new Dataset.TerminalDataset(parents);
} else {
result = EMPTY_DATASET;
}
return result;
}
private TerminalDataset(final Collection<Dataset> parents) {
this.parents = parents;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
parents.forEach(dataset -> dataset.compute(batch, flush, shutdown));
this.clear();
return EMPTY_RETURN;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
}
}
/**
* {@link Dataset} that results from the {@code if} branch of its backing
* {@link EventCondition} being applied to all of its dependencies.
*/
final class SplitDataset implements Dataset {
private final Collection<Dataset> parents;
private final EventCondition func;
private final Collection<JrubyEventExtLibrary.RubyEvent> trueData;
private final Collection<JrubyEventExtLibrary.RubyEvent> falseData;
private final Dataset opposite;
private boolean done;
public SplitDataset(final Collection<Dataset> parents,
final EventCondition eventCondition) {
this.parents = parents;
this.func = eventCondition;
done = false;
trueData = new ArrayList<>(5);
falseData = new ArrayList<>(5);
opposite = new Dataset.SplitDataset.Complement(this, falseData);
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
if (done) {
return trueData;
}
for (final Dataset set : parents) {
for (final JrubyEventExtLibrary.RubyEvent event
: set.compute(batch, flush, shutdown)) {
if (func.fulfilled(event)) {
trueData.add(event);
} else {
falseData.add(event);
}
}
}
done = true;
return trueData;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
trueData.clear();
falseData.clear();
done = false;
}
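/**
 * @return Complementary {@link Dataset} holding those events from the parents that did
 * not fulfil the backing {@link EventCondition}, i.e. the negative branch
 */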
public Dataset right() {
return opposite;
}
/**
* Complementary {@link Dataset} to a {@link Dataset.SplitDataset} representing the
* negative branch of the {@code if} statement.
*/
private static final class Complement implements Dataset {
/**
* Positive branch of underlying {@code if} statement.
*/
private final Dataset parent;
/**
* This collection is shared with {@link Dataset.SplitDataset.Complement#parent} and
* mutated when calling its {@code compute} method. This class does not directly compute
* it.
*/
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
private boolean done;
/**
* Ctor.
* @param left Positive Branch {@link Dataset.SplitDataset}
* @param complement Collection of {@link JrubyEventExtLibrary.RubyEvent}s that did
* not match {@code left}
*/
private Complement(
final Dataset left, final Collection<JrubyEventExtLibrary.RubyEvent> complement) {
this.parent = left;
data = complement;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(
final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
if (done) {
return data;
}
parent.compute(batch, flush, shutdown);
done = true;
return data;
}
@Override
public void clear() {
parent.clear();
done = false;
}
}
}
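/*
 * A sketch of how a conditional maps onto this class (variable names hypothetical):
 * both branches are backed by a single pass over the parents, so the negative branch
 * never re-evaluates the condition; it merely waits for the positive branch's
 * compute() to populate the shared falseData collection.
 *
 *   SplitDataset ifTrue = new SplitDataset(parents, condition);
 *   Dataset ifFalse = ifTrue.right();
 *   ifTrue.compute(batch, false, false);  // events fulfilling the condition
 *   ifFalse.compute(batch, false, false); // remaining events, from the same pass
 */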
/**
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} to all
* dependent {@link Dataset}s.
*/
final class FilteredDataset implements Dataset {
private final Collection<Dataset> parents;
private final RubyIntegration.Filter func;
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
private boolean done;
public FilteredDataset(Collection<Dataset> parents, final RubyIntegration.Filter func) {
this.parents = parents;
this.func = func;
data = new ArrayList<>(5);
buffer = new ArrayList<>(5);
done = false;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
if (done) {
return data;
}
for (final Dataset set : parents) {
buffer.addAll(set.compute(batch, flush, shutdown));
}
done = true;
data.addAll(func.multiFilter(buffer));
buffer.clear();
return data;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
data.clear();
done = false;
}
}
/**
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} that flushes
* periodically to all dependent {@link Dataset}s.
*/
final class FilteredFlushableDataset implements Dataset {
public static final RubyHash FLUSH_FINAL = flushOpts(true);
private static final RubyHash FLUSH_NOT_FINAL = flushOpts(false);
private final Collection<Dataset> parents;
private final RubyIntegration.Filter func;
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
private boolean done;
public FilteredFlushableDataset(Collection<Dataset> parents,
final RubyIntegration.Filter func) {
this.parents = parents;
this.func = func;
data = new ArrayList<>(5);
buffer = new ArrayList<>(5);
done = false;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
if (done) {
return data;
}
for (final Dataset set : parents) {
buffer.addAll(set.compute(batch, flush, shutdown));
}
done = true;
data.addAll(func.multiFilter(buffer));
if (flush) {
data.addAll(func.flush(shutdown ? FLUSH_FINAL : FLUSH_NOT_FINAL));
}
buffer.clear();
return data;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
data.clear();
done = false;
}
private static RubyHash flushOpts(final boolean fin) {
final RubyHash res = RubyHash.newHash(RubyUtil.RUBY);
res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin));
return res;
}
}
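/*
 * For reference, flushOpts(true) builds the Ruby options hash handed to a filter's
 * flush method, equivalent to the Ruby literal { :final => true }; buffering filters
 * can inspect it to tell a periodic flush apart from the final flush at shutdown.
 */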
/**
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} that
* flushes, but only on shutdown, to all dependent {@link Dataset}s.
*/
final class FilteredShutdownFlushableDataset implements Dataset {
private final Collection<Dataset> parents;
private final RubyIntegration.Filter func;
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
private boolean done;
public FilteredShutdownFlushableDataset(Collection<Dataset> parents,
final RubyIntegration.Filter func) {
this.parents = parents;
this.func = func;
data = new ArrayList<>(5);
buffer = new ArrayList<>(5);
done = false;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
if (done) {
return data;
}
for (final Dataset set : parents) {
buffer.addAll(set.compute(batch, flush, shutdown));
}
done = true;
data.addAll(func.multiFilter(buffer));
if (flush && shutdown) {
data.addAll(func.flush(FilteredFlushableDataset.FLUSH_FINAL));
}
buffer.clear();
return data;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
data.clear();
done = false;
}
}
/**
* Output {@link Dataset} that passes all its {@link JrubyEventExtLibrary.RubyEvent}
* to the underlying {@link RubyIntegration.Output#multiReceive(Collection)}.
*/
final class OutputDataset implements Dataset {
/**
* Empty {@link Collection} returned by this class's
* {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} implementation.
*/
private static final Collection<JrubyEventExtLibrary.RubyEvent> EMPTY_RETURN =
Collections.emptyList();
private final Collection<Dataset> parents;
private final RubyIntegration.Output output;
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
private boolean done;
public OutputDataset(Collection<Dataset> parents, final RubyIntegration.Output output) {
this.parents = parents;
this.output = output;
buffer = new ArrayList<>(5);
done = false;
}
@Override
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
final boolean flush, final boolean shutdown) {
if (!done) {
for (final Dataset set : parents) {
for (final JrubyEventExtLibrary.RubyEvent event
: set.compute(batch, flush, shutdown)) {
if (!event.getEvent().isCancelled()) {
buffer.add(event);
}
}
}
output.multiReceive(buffer);
done = true;
buffer.clear();
}
return EMPTY_RETURN;
}
@Override
public void clear() {
for (final Dataset parent : parents) {
parent.clear();
}
done = false;
}
}
}
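Taken together, these implementations compose into a per-batch execution tree. A minimal
wiring sketch for one filter, one conditional and one output (the filter, condition and
output variables stand in for compiled plugin delegators and are hypothetical):

    Dataset filtered = new Dataset.FilteredDataset(Dataset.ROOT_DATASETS, filter);
    Dataset.SplitDataset branch =
        new Dataset.SplitDataset(Collections.<Dataset>singleton(filtered), condition);
    Dataset thenOut = new Dataset.OutputDataset(Collections.<Dataset>singleton(branch), output);
    Dataset elseOut = new Dataset.OutputDataset(Collections.singleton(branch.right()), output);
    Dataset terminal = Dataset.TerminalDataset.from(Arrays.asList(thenOut, elseOut));
    terminal.compute(batch, false, false); // runs the whole tree exactly once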

View file

@ -0,0 +1,754 @@
package org.logstash.config.ir.compiler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.jruby.RubyInteger;
import org.jruby.RubyNumeric;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.logstash.ConvertedList;
import org.logstash.ConvertedMap;
import org.logstash.FieldReference;
import org.logstash.PathCache;
import org.logstash.RubyUtil;
import org.logstash.Valuefier;
import org.logstash.config.ir.expression.BinaryBooleanExpression;
import org.logstash.config.ir.expression.BooleanExpression;
import org.logstash.config.ir.expression.EventValueExpression;
import org.logstash.config.ir.expression.Expression;
import org.logstash.config.ir.expression.ValueExpression;
import org.logstash.config.ir.expression.binary.And;
import org.logstash.config.ir.expression.binary.Eq;
import org.logstash.config.ir.expression.binary.Gt;
import org.logstash.config.ir.expression.binary.Gte;
import org.logstash.config.ir.expression.binary.In;
import org.logstash.config.ir.expression.binary.Lt;
import org.logstash.config.ir.expression.binary.Lte;
import org.logstash.config.ir.expression.binary.Neq;
import org.logstash.config.ir.expression.binary.Or;
import org.logstash.config.ir.expression.binary.RegexEq;
import org.logstash.config.ir.expression.unary.Not;
import org.logstash.config.ir.expression.unary.Truthy;
import org.logstash.ext.JrubyEventExtLibrary;
/**
* A pipeline execution "if" condition, compiled from the {@link BooleanExpression} of an
* {@link org.logstash.config.ir.graph.IfVertex}.
*/
public interface EventCondition {
/**
* Checks if {@link JrubyEventExtLibrary.RubyEvent} fulfils the condition.
* @param event RubyEvent to check
* @return True iff event fulfils condition
*/
boolean fulfilled(JrubyEventExtLibrary.RubyEvent event);
/**
* <h3>EventCondition Compiler.</h3>
* Compiles {@link BooleanExpression} into {@link EventCondition}, globally ensuring that no
* duplicate {@link EventCondition}s are generated by strict synchronization on the internal
* compiler cache {@link EventCondition.Compiler#CACHE}.
*/
final class Compiler {
/**
* {@link EventCondition} that is always {@code true}.
*/
private static final EventCondition TRUE = event -> true;
/**
* {@link EventCondition} that is always {@code false}.
*/
private static final EventCondition FALSE = event -> false;
/**
* Cache of all compiled {@link EventCondition}.
*/
private static final Map<String, EventCondition> CACHE = new HashMap<>(10);
private Compiler() {
//Utility Class.
}
/**
* Compiles a {@link BooleanExpression} into an {@link EventCondition}.
* All compilation is globally {@code synchronized} on {@link EventCondition.Compiler#CACHE}
* to minimize code size by avoiding the compilation of logically equivalent expressions into
* more than one instance.
* @param expression BooleanExpression to compile
* @return Compiled {@link EventCondition}
*/
public static EventCondition buildCondition(final BooleanExpression expression) {
synchronized (CACHE) {
final String cachekey = expression.toRubyString();
final EventCondition cached = CACHE.get(cachekey);
if (cached != null) {
return cached;
}
final EventCondition condition;
if (expression instanceof Eq) {
condition = eq((Eq) expression);
} else if (expression instanceof RegexEq) {
condition = regex((RegexEq) expression);
} else if (expression instanceof In) {
condition = in((In) expression);
} else if (expression instanceof Or) {
condition = or(booleanPair((BinaryBooleanExpression) expression));
} else if (expression instanceof Truthy) {
condition = truthy((Truthy) expression);
} else if (expression instanceof Not) {
condition = not((Not) expression);
} else if (expression instanceof Gt) {
condition = gt((Gt) expression);
} else if (expression instanceof Gte) {
condition = gte((Gte) expression);
} else if (expression instanceof Lt) {
condition = lt((Lt) expression);
} else if (expression instanceof Lte) {
condition = lte((Lte) expression);
} else if (expression instanceof And) {
condition = and(booleanPair((BinaryBooleanExpression) expression));
} else if (expression instanceof Neq) {
condition = neq((Neq) expression);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(expression);
}
CACHE.put(cachekey, condition);
return condition;
}
}
/**
* Checks if a {@link BinaryBooleanExpression} consists of a {@link ValueExpression} on the
* left and an {@link EventValueExpression} on the right.
* @param expression Expression to check type for
* @return True if the left branch of the {@link BinaryBooleanExpression} is a
* {@link ValueExpression} and its right side is an {@link EventValueExpression}.
*/
private static boolean vAndE(final BinaryBooleanExpression expression) {
return expression.getLeft() instanceof ValueExpression &&
expression.getRight() instanceof EventValueExpression;
}
private static boolean vAndV(final BinaryBooleanExpression expression) {
return expression.getLeft() instanceof ValueExpression &&
expression.getRight() instanceof ValueExpression;
}
private static boolean eAndV(final BinaryBooleanExpression expression) {
return expression.getLeft() instanceof EventValueExpression &&
expression.getRight() instanceof ValueExpression;
}
private static boolean eAndE(final BinaryBooleanExpression expression) {
return expression.getLeft() instanceof EventValueExpression &&
expression.getRight() instanceof EventValueExpression;
}
private static EventCondition neq(final Neq neq) {
final EventCondition condition;
final Expression uleft = neq.getLeft();
final Expression uright = neq.getRight();
if (eAndV(neq)) {
condition = not(eq((EventValueExpression) uleft, (ValueExpression) uright));
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(uleft, uright);
}
return condition;
}
private static EventCondition truthy(final Truthy truthy) {
final EventCondition condition;
final Expression inner = truthy.getExpression();
if (inner instanceof EventValueExpression) {
condition = truthy((EventValueExpression) inner);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(inner);
}
return condition;
}
private static EventCondition regex(final RegexEq regex) {
final EventCondition condition;
final Expression uleft = regex.getLeft();
final Expression uright = regex.getRight();
if (eAndV(regex)) {
condition = new EventCondition.Compiler.FieldMatches(
((EventValueExpression) uleft).getFieldName(),
((ValueExpression) uright).get().toString()
);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(uleft, uright);
}
return condition;
}
private static EventCondition not(final Not not) {
final EventCondition condition;
final Expression inner = not.getExpression();
if (inner instanceof BooleanExpression) {
condition = not(buildCondition((BooleanExpression) inner));
} else if (inner instanceof EventValueExpression) {
condition = not(truthy((EventValueExpression) inner));
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(inner);
}
return condition;
}
private static EventCondition gte(final Gte gte) {
final EventCondition condition;
final Expression uleft = gte.getLeft();
final Expression uright = gte.getRight();
if (eAndV(gte)) {
final EventValueExpression left = (EventValueExpression) uleft;
final ValueExpression right = (ValueExpression) uright;
condition = or(gt(left, right), eq(left, right));
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(uleft, uright);
}
return condition;
}
private static EventCondition lte(final Lte lte) {
final EventCondition condition;
final Expression uleft = lte.getLeft();
final Expression uright = lte.getRight();
if (eAndV(lte)) {
condition = not(gt((EventValueExpression) uleft, (ValueExpression) uright));
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(uleft, uright);
}
return condition;
}
private static EventCondition lt(final Lt lt) {
final EventCondition condition;
final Expression uleft = lt.getLeft();
final Expression uright = lt.getRight();
if (eAndV(lt)) {
final EventValueExpression left = (EventValueExpression) uleft;
final ValueExpression right = (ValueExpression) uright;
condition = not(or(gt(left, right), eq(left, right)));
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(uleft, uright);
}
return condition;
}
private static EventCondition in(final In in) {
final Expression left = in.getLeft();
final Expression right = in.getRight();
final EventCondition condition;
if (eAndV(in) && isScalar((ValueExpression) in.getRight())) {
condition = new EventCondition.Compiler.FieldInConstantScalar(
PathCache.cache(((EventValueExpression) left).getFieldName()),
((ValueExpression) right).get().toString()
);
} else if (vAndE(in) && isScalar((ValueExpression) in.getLeft())) {
final Object leftv = ((ValueExpression) left).get();
final FieldReference rfield =
PathCache.cache(((EventValueExpression) right).getFieldName());
if (leftv instanceof String) {
condition = new EventCondition.Compiler.ConstantStringInField(
rfield, (String) leftv
);
} else {
condition = new EventCondition.Compiler.ConstantScalarInField(rfield, leftv);
}
} else if (eAndV(in) && listValueRight(in)) {
condition = in(
(EventValueExpression) left, (List<?>) ((ValueExpression) right).get()
);
} else if (eAndE(in)) {
condition = in((EventValueExpression) right, (EventValueExpression) left);
} else if (vAndV(in)) {
condition = in((ValueExpression) left, (ValueExpression) right);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(left, right);
}
return condition;
}
private static EventCondition in(final EventValueExpression left, final List<?> right) {
return new EventCondition.Compiler.FieldInConstantList(
PathCache.cache(left.getFieldName()), right
);
}
/**
* Compiles a conditional that is constant (both of its sides being constant
* {@link ValueExpression}s) into either {@link EventCondition.Compiler#TRUE} or
* {@link EventCondition.Compiler#FALSE}.
* @param left Constant left side {@link ValueExpression}
* @param right Constant right side {@link ValueExpression}
* @return Either {@link EventCondition.Compiler#TRUE} or
* {@link EventCondition.Compiler#FALSE}
*/
private static EventCondition in(final ValueExpression left, final ValueExpression right) {
final Object found = right.get();
final Object other = left.get();
if (found instanceof ConvertedList && other instanceof RubyString) {
return ((ConvertedList) found).stream().anyMatch(item -> item.toString()
.equals(other.toString())) ? TRUE : FALSE;
} else if (found instanceof RubyString && other instanceof RubyString) {
return found.toString().contains(other.toString()) ? TRUE : FALSE;
} else if (found instanceof RubyString && other instanceof ConvertedList) {
return ((ConvertedList) other).stream()
.anyMatch(item -> item.toString().equals(found.toString())) ? TRUE : FALSE;
} else {
return found != null && other != null && found.equals(other) ? TRUE : FALSE;
}
}
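// e.g. a constant conditional such as `if "foo" in ["foo", "bar"]` is folded to TRUE
// here at compile time, so the resulting condition never inspects the event at runtime.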
private static boolean listValueRight(final In in) {
return ((ValueExpression) in.getRight()).get() instanceof List;
}
private static boolean isScalar(final ValueExpression expression) {
final Object value = expression.get();
return value instanceof String || value instanceof Number;
}
private static EventCondition in(final EventValueExpression left,
final EventValueExpression right) {
return new EventCondition.Compiler.FieldInField(
PathCache.cache(left.getFieldName()), PathCache.cache(right.getFieldName())
);
}
private static EventCondition eq(final EventValueExpression evalE,
final ValueExpression valE) {
final Object value = valE.get();
final String field = evalE.getFieldName();
if (value instanceof String) {
return new EventCondition.Compiler.FieldEqualsString(field, (String) value);
} else if (value instanceof Long || value instanceof Integer ||
value instanceof Short) {
return new EventCondition.Compiler.FieldEqualsLong(
field, ((Number) value).longValue()
);
}
throw new EventCondition.Compiler.UnexpectedTypeException(value);
}
private static EventCondition eq(final Eq equals) {
final Expression left = equals.getLeft();
final Expression right = equals.getRight();
final EventCondition condition;
if (eAndV(equals)) {
condition = eq((EventValueExpression) left, (ValueExpression) right);
} else if (vAndE(equals)) {
condition = eq((EventValueExpression) right, (ValueExpression) left);
} else if (eAndE(equals)) {
condition = eq((EventValueExpression) left, (EventValueExpression) right);
} else if (vAndV(equals)) {
condition = ((ValueExpression) left).get()
.equals(((ValueExpression) right).get()) ? TRUE : FALSE;
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(left, right);
}
return condition;
}
private static EventCondition eq(final EventValueExpression first,
final EventValueExpression second) {
return new EventCondition.Compiler.FieldEqualsField(
PathCache.cache(first.getFieldName()), PathCache.cache(second.getFieldName())
);
}
private static EventCondition gt(final Gt greater) {
final EventCondition condition;
final Expression left = greater.getLeft();
final Expression right = greater.getRight();
if (eAndV(greater)) {
condition = gt((EventValueExpression) left, (ValueExpression) right);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(left, right);
}
return condition;
}
private static EventCondition gt(final EventValueExpression left,
final ValueExpression right) {
final Object value = right.get();
final String field = left.getFieldName();
if (value instanceof String) {
return new EventCondition.Compiler.FieldGreaterThanString(field, (String) value);
} else if (value instanceof Long || value instanceof Integer ||
value instanceof Short) {
return new FieldGreaterThanNumber(
field, RubyUtil.RUBY.newFixnum(((Number) value).longValue())
);
}
throw new EventCondition.Compiler.UnexpectedTypeException(value);
}
private static EventCondition truthy(final EventValueExpression evalE) {
return new EventCondition.Compiler.FieldTruthy(PathCache.cache(evalE.getFieldName()));
}
private static EventCondition[] booleanPair(final BinaryBooleanExpression expression) {
final Expression left = expression.getLeft();
final Expression right = expression.getRight();
final EventCondition first;
final EventCondition second;
if (left instanceof BooleanExpression && right instanceof BooleanExpression) {
first = buildCondition((BooleanExpression) left);
second = buildCondition((BooleanExpression) right);
} else if (eAndE(expression)) {
first = truthy((EventValueExpression) left);
second = truthy((EventValueExpression) right);
} else if (left instanceof BooleanExpression && right instanceof EventValueExpression) {
first = buildCondition((BooleanExpression) left);
second = truthy((EventValueExpression) right);
} else if (right instanceof BooleanExpression &&
left instanceof EventValueExpression) {
first = truthy((EventValueExpression) left);
second = buildCondition((BooleanExpression) right);
} else {
throw new EventCondition.Compiler.UnexpectedTypeException(left, right);
}
return new EventCondition[]{first, second};
}
private static EventCondition not(final EventCondition condition) {
return new EventCondition.Compiler.Negated(condition);
}
private static EventCondition or(final EventCondition... conditions) {
return new EventCondition.Compiler.OrCondition(conditions[0], conditions[1]);
}
private static EventCondition and(final EventCondition... conditions) {
return new EventCondition.Compiler.AndCondition(conditions[0], conditions[1]);
}
/**
* Containment check using Ruby-equivalent comparison logic.
* @param list List to find value in
* @param value Value to find in list
* @return True iff value is in list
*/
private static boolean contains(final ConvertedList list, final Object value) {
for (final Object element : list) {
if (value.equals(element)) {
return true;
}
}
return false;
}
private static final class Negated implements EventCondition {
private final EventCondition condition;
Negated(final EventCondition condition) {
this.condition = condition;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return !condition.fulfilled(event);
}
}
private static final class AndCondition implements EventCondition {
private final EventCondition first;
private final EventCondition second;
AndCondition(final EventCondition first, final EventCondition second) {
this.first = first;
this.second = second;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return first.fulfilled(event) && second.fulfilled(event);
}
}
private static final class OrCondition implements EventCondition {
private final EventCondition first;
private final EventCondition second;
OrCondition(final EventCondition first, final EventCondition second) {
this.first = first;
this.second = second;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return first.fulfilled(event) || second.fulfilled(event);
}
}
private static final class FieldGreaterThanString implements EventCondition {
private final FieldReference field;
private final RubyString value;
private FieldGreaterThanString(final String field, final String value) {
this.field = PathCache.cache(field);
this.value = RubyUtil.RUBY.newString(value);
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return value.compareTo(
(IRubyObject) event.getEvent().getUnconvertedField(field)
) < 0;
}
}
private static final class FieldGreaterThanNumber implements EventCondition {
private final FieldReference field;
private final RubyNumeric value;
private FieldGreaterThanNumber(final String field, final RubyNumeric value) {
this.field = PathCache.cache(field);
this.value = value;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return value.compareTo(
(IRubyObject) event.getEvent().getUnconvertedField(field)
) < 0;
}
}
private static final class FieldEqualsString implements EventCondition {
private final FieldReference field;
private final RubyString value;
private FieldEqualsString(final String field, final String value) {
this.field = PathCache.cache(field);
this.value = RubyUtil.RUBY.newString(value);
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object val = event.getEvent().getUnconvertedField(field);
return value.equals(val);
}
}
private static final class FieldEqualsLong implements EventCondition {
private final FieldReference field;
private final long value;
private FieldEqualsLong(final String field, final long value) {
this.field = PathCache.cache(field);
this.value = value;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object val = event.getEvent().getUnconvertedField(field);
return val instanceof RubyInteger && ((RubyInteger) val).getLongValue() == value;
}
}
private static final class FieldEqualsField implements EventCondition {
private final FieldReference one;
private final FieldReference other;
private FieldEqualsField(final FieldReference one, final FieldReference other) {
this.one = one;
this.other = other;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
return event.getEvent().getUnconvertedField(one)
.equals(event.getEvent().getUnconvertedField(other));
}
}
private static final class FieldMatches implements EventCondition {
private final FieldReference field;
private final RubyString regex;
private FieldMatches(final String field, final String regex) {
this.field = PathCache.cache(field);
this.regex = RubyUtil.RUBY.newString(regex);
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object tomatch = event.getEvent().getUnconvertedField(field);
return tomatch instanceof RubyString &&
!((RubyString) tomatch).match(RubyUtil.RUBY.getCurrentContext(), regex).isNil();
}
}
private static final class ConstantStringInField implements EventCondition {
private final FieldReference field;
private final ByteList bytes;
private final RubyString string;
private ConstantStringInField(final FieldReference field, final String value) {
this.field = field;
this.string = RubyUtil.RUBY.newString(value);
bytes = string.getByteList();
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object found = event.getEvent().getUnconvertedField(field);
return found instanceof RubyString &&
((RubyString) found).getByteList().indexOf(bytes) > -1
|| found instanceof ConvertedList && contains((ConvertedList) found, string);
}
}
private static final class ConstantScalarInField implements EventCondition {
private final FieldReference field;
private final Object value;
private ConstantScalarInField(final FieldReference field, final Object value) {
this.field = field;
this.value = Valuefier.convert(value);
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object found = event.getEvent().getUnconvertedField(field);
return found instanceof ConvertedList && contains((ConvertedList) found, value)
|| Objects.equals(found, value);
}
}
private static final class FieldInConstantScalar implements EventCondition {
private final FieldReference field;
private final ByteList value;
private FieldInConstantScalar(final FieldReference field, final String value) {
this.field = field;
this.value = RubyUtil.RUBY.newString(value).getByteList();
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object found = event.getEvent().getUnconvertedField(field);
return found instanceof RubyString &&
value.indexOf(((RubyString) found).getByteList()) > -1;
}
}
private static final class FieldInField implements EventCondition {
private final FieldReference left;
private final FieldReference right;
private FieldInField(final FieldReference left, final FieldReference right) {
this.left = left;
this.right = right;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object lfound = event.getEvent().getUnconvertedField(left);
final Object rfound = event.getEvent().getUnconvertedField(right);
if (lfound instanceof ConvertedList || lfound instanceof ConvertedMap) {
return false;
} else if (lfound instanceof RubyString && rfound instanceof RubyString) {
return ((RubyString) lfound).getByteList()
.indexOf(((RubyString) rfound).getByteList()) > -1;
} else if (rfound instanceof ConvertedList) {
return contains((ConvertedList) rfound, lfound);
} else {
return lfound != null && rfound != null && lfound.equals(rfound);
}
}
}
private static final class FieldInConstantList implements EventCondition {
private final FieldReference field;
private final List<?> value;
private FieldInConstantList(final FieldReference field, final List<?> value) {
this.field = field;
this.value = value;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object found = event.getEvent().getUnconvertedField(field);
return found != null &&
value.stream().anyMatch(val -> val.toString().equals(found.toString()));
}
}
private static final class FieldTruthy implements EventCondition {
private final FieldReference field;
private FieldTruthy(final FieldReference field) {
this.field = field;
}
@Override
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
final Object object = event.getEvent().getUnconvertedField(field);
if (object == null) {
return false;
}
final String other = object.toString();
return other != null && !other.isEmpty() &&
!Boolean.toString(false).equals(other);
}
}
/**
* This {@link IllegalArgumentException} is thrown when the inputs to an {@code if}
* condition do not conform to the expected types. If it is thrown, it always indicates
* a bug in Logstash.
*/
private static final class UnexpectedTypeException extends IllegalArgumentException {
private static final long serialVersionUID = 1L;
UnexpectedTypeException(final Expression left, final Expression right) {
super(
String.format("Unexpected input types %s %s", left.getClass(), right.getClass())
);
}
UnexpectedTypeException(final Object inner) {
super(String.format("Unexpected input type %s", inner.getClass()));
}
}
}
}
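As an illustration of the caching above, compiling two logically equivalent conditionals
yields the identical EventCondition instance (expr1 and expr2 are hypothetical
BooleanExpressions that both represent, say, [status] == 200):

    EventCondition first = EventCondition.Compiler.buildCondition(expr1);
    EventCondition second = EventCondition.Compiler.buildCondition(expr2);
    assert first == second; // served from CACHE, keyed on toRubyString()
    boolean matched = first.fulfilled(event); // event: a JrubyEventExtLibrary.RubyEvent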

View file

@ -0,0 +1,97 @@
package org.logstash.config.ir.compiler;
import java.util.Collection;
import org.jruby.RubyHash;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ext.JrubyEventExtLibrary;
/**
* This class holds interfaces implemented by Ruby concrete classes.
*/
public final class RubyIntegration {
private RubyIntegration() {
//Utility Class.
}
/**
* A Ruby Plugin.
*/
public interface Plugin {
void register();
}
/**
* A Ruby Filter. Currently, this interface is implemented only by the Ruby class
* {@code FilterDelegator}.
*/
public interface Filter extends RubyIntegration.Plugin {
/**
* Same as {@code FilterDelegator}'s {@code multi_filter}.
* @param events Events to Filter
* @return Filtered {@link JrubyEventExtLibrary.RubyEvent}
*/
Collection<JrubyEventExtLibrary.RubyEvent> multiFilter(
Collection<JrubyEventExtLibrary.RubyEvent> events
);
Collection<JrubyEventExtLibrary.RubyEvent> flush(RubyHash options);
/**
* Checks if this filter has a flush method.
* @return True iff this filter has a flush method
*/
boolean hasFlush();
/**
* Checks if this filter does periodic flushing.
* @return True iff this filter uses periodic flushing
*/
boolean periodicFlush();
}
/**
* A Ruby Output. Currently, this interface is implemented only by the Ruby class
* {@code OutputDelegator}.
*/
public interface Output extends RubyIntegration.Plugin {
void multiReceive(Collection<JrubyEventExtLibrary.RubyEvent> events);
}
/**
* The Main Ruby Pipeline Class. Currently, this interface is implemented only by the Ruby class
* {@code BasePipeline}.
*/
public interface Pipeline {
IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
RubyIntegration.Output buildOutput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
RubyIntegration.Filter buildFilter(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
RubyIntegration.Filter buildCodec(RubyString name, IRubyObject args);
}
/**
* A Ruby {@code ReadBatch} implemented by {@code WrappedSynchronousQueue::ReadClient::ReadBatch}
* and {@code WrappedAckedQueue::ReadClient::ReadBatch}.
*/
public interface Batch {
/**
* Retrieves all {@link JrubyEventExtLibrary.RubyEvent} from the batch that are not
* cancelled.
* @return Collection of all {@link JrubyEventExtLibrary.RubyEvent} in the batch that
* are not cancelled
*/
Collection<JrubyEventExtLibrary.RubyEvent> to_a();
}
}
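Since Batch only needs to hand back its non-cancelled events, a minimal Java-side test
double (a hypothetical helper, not part of this commit) could look like:

    final class ListBatch implements RubyIntegration.Batch {
        private final Collection<JrubyEventExtLibrary.RubyEvent> events;
        ListBatch(final Collection<JrubyEventExtLibrary.RubyEvent> events) {
            this.events = events;
        }
        @Override
        public Collection<JrubyEventExtLibrary.RubyEvent> to_a() {
            // mirror the Ruby ReadBatch contract: drop cancelled events
            final Collection<JrubyEventExtLibrary.RubyEvent> result = new java.util.ArrayList<>();
            for (final JrubyEventExtLibrary.RubyEvent event : events) {
                if (!event.getEvent().isCancelled()) {
                    result.add(event);
                }
            }
            return result;
        }
    }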

View file

@ -1,10 +1,7 @@
package org.logstash.config.ir.expression;
import org.jruby.RubyInstanceConfig;
import org.jruby.embed.AttributeName;
import org.jruby.embed.ScriptingContainer;
import org.logstash.config.ir.BaseSourceComponent;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.BaseSourceComponent;
import org.logstash.config.ir.HashableWithSource;
/*
@ -16,19 +13,11 @@ import org.logstash.config.ir.HashableWithSource;
* Created by andrewvc on 9/6/16.
*/
public abstract class Expression extends BaseSourceComponent implements HashableWithSource {
private ScriptingContainer container;
public Expression(SourceWithMetadata meta) {
super(meta);
}
public void compile() {
container = new ScriptingContainer();
container.setCompileMode(RubyInstanceConfig.CompileMode.JIT);
container.setAttribute(AttributeName.SHARING_VARIABLES, false);
container.runScriptlet("def start(event)\n" + this.toString() + "\nend");
}
@Override
public String toString(int indent) {
return toString();

View file

@ -1,18 +1,14 @@
package org.logstash.config.ir.expression;
import org.joni.Option;
import org.joni.Regex;
import org.logstash.config.ir.SourceComponent;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.common.SourceWithMetadata;
import java.nio.charset.StandardCharsets;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.SourceComponent;
/**
* Created by andrewvc on 9/15/16.
*/
public class RegexValueExpression extends ValueExpression {
private final Regex regex;
private final String regex;
public RegexValueExpression(SourceWithMetadata meta, Object value) throws InvalidIRException {
super(meta, value);
@ -21,8 +17,7 @@ public class RegexValueExpression extends ValueExpression {
throw new InvalidIRException("Regex value expressions can only take strings!");
}
byte[] patternBytes = getSource().getBytes(StandardCharsets.UTF_8);
this.regex = new Regex(patternBytes, 0, patternBytes.length, Option.NONE);
this.regex = getSource();
}
@Override

View file

@ -1,11 +1,11 @@
package org.logstash.config.ir.expression;
import org.logstash.config.ir.SourceComponent;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.common.SourceWithMetadata;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.SourceComponent;
/**
* Created by andrewvc on 9/13/16.
@ -25,7 +25,7 @@ public class ValueExpression extends Expression {
value instanceof BigDecimal ||
value instanceof String ||
value instanceof List ||
value instanceof java.time.Instant
value instanceof Instant
)) {
// This *should* be caught by the treetop grammar, but we need this case just in case there's a bug
// somewhere

View file

@ -7,7 +7,7 @@ import org.logstash.config.ir.InvalidIRException;
/**
* Created by andrewvc on 9/15/16.
*/
public class BooleanEdge extends Edge {
public final class BooleanEdge extends Edge {
public static class BooleanEdgeFactory extends EdgeFactory {
public Boolean getEdgeType() {
return edgeType;

View file

@ -42,4 +42,8 @@ public class PluginStatement extends Statement {
g.addVertex(pluginVertex);
return g;
}
public PluginDefinition getPluginDefinition() {
return pluginDefinition;
}
}

View file

@ -209,7 +209,7 @@ public class DeadLetterQueueReaderTest {
@Test
public void testBlockBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32513;
final int PAD_FOR_BLOCK_SIZE_EVENT = 32516;
Event event = new Event();
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
Arrays.fill(field, 'e');
@ -234,7 +234,7 @@ public class DeadLetterQueueReaderTest {
@Test
public void testBlockBoundaryMultiple() throws Exception {
Event event = new Event(Collections.emptyMap());
char[] field = new char[7952];
char[] field = new char[7934];
Arrays.fill(field, 'x');
event.setField("message", new String(field));
long startTime = System.currentTimeMillis();
@ -256,11 +256,10 @@ public class DeadLetterQueueReaderTest {
}
}
// This test tests for a single event that ends on a block and segment boundary
@Test
public void testBlockAndSegmentBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32513;
final int PAD_FOR_BLOCK_SIZE_EVENT = 32516;
Event event = new Event();
event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT));
Timestamp timestamp = new Timestamp();
@ -279,7 +278,6 @@ public class DeadLetterQueueReaderTest {
}
}
@Test
public void testWriteReadRandomEventSize() throws Exception {
Event event = new Event(Collections.emptyMap());