ECS Compatibility 7.x Backport (#12308)

Implements a plugin `ecs_compatibility` option, whose default value is powered
by the pipeline-level setting `pipeline.ecs_compatibility`, in line with the
proposal in elastic/logstash#11623:

In order to increase the confidence a user has when upgrading Logstash, this
implementation uses the deprecation logger to warn when `ecs_compatibility` is
used without an explicit directive.

For now, as we continue to add ECS Compatibility Modes, an opting into a
specific ECS Compatibility mode at a pipeline level is considered a BETA
feature. All plugins using the [ECS Compatibility Support][] adapter will
use the setting correctly, but pipelines configured in this way do not
guarantee consistent behaviour across minor versions of Logstash or the
plugins it bundles (e.g., upgraded plugins that have newly-implemented an ECS
Compatibility mode will use the pipeline-level setting as a default, causing
them to potentially behave differently after the upgrade).

This change-set also includes a significant amount of work within the
`PluginFactory`, which allows us to ensure that pipeline-level settings are
available to a Logstash plugin _before_ its `initialize` is executed,
including the maintaining of context for codecs that are routinely cloned.

* JEE: instantiate codecs only once
* PluginFactory: use passed FilterDelegator class
* PluginFactory: require engine name in init
* NOOP: remove useless secondary plugin factory interface
* PluginFactory: simplify, compute java args only when necessary
* PluginFactory: accept explicit id when vertex unavailable
* PluginFactory: make source optional, args required
* PluginFactory: threadsafe refactor of id duplicate tracking
* PluginFactory: make id extraction/geration more abstract/understandable
* PluginFactory: extract or generate ID when source not available
* PluginFactory: inject ExecutionContext before initializing plugins
* Codec: propagate execution_context and metric to clones
* Plugin: intercept string-specified codecs and propagate execution_context
* Plugin: implement `ecs_compatibility` for all plugins
* Plugin: deprecate use of `Config::Mixin::DSL::validate_value(String, :codec)`
This commit is contained in:
Ry Biesemeyer 2020-10-06 08:33:11 -07:00 committed by GitHub
parent ccbc5691cb
commit c606e9f5b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 751 additions and 369 deletions

View file

@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) {
"pipeline.batch.delay",
"pipeline.unsafe_shutdown",
"pipeline.java_execution",
"pipeline.ecs_compatibility"
"pipeline.plugin_classloaders",
"path.config",
"config.string",

View file

@ -75,6 +75,14 @@ class LogStash::Agent
logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default")
end
if @settings.set?('pipeline.ecs_compatibility')
ecs_compatibility_value = settings.get('pipeline.ecs_compatibility')
if ecs_compatibility_value != 'disabled'
logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " +
"values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.")
end
end
# This is for backward compatibility in the tests
if source_loader.nil?
@source_loader = LogStash::Config::SourceLoader.new

View file

@ -92,6 +92,8 @@ module LogStash::Codecs; class Base < LogStash::Plugin
public
def clone
return self.class.new(params)
LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone|
klone.metric = @metric if klone.instance_variable_get(:@metric).nil?
end
end
end; end # class LogStash::Codecs::Base

View file

@ -249,12 +249,12 @@ module LogStash; module Config; module AST
# If any parent is a Plugin, this must be a codec.
if attributes.elements.nil?
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
end
end
@ -271,7 +271,7 @@ module LogStash; module Config; module AST
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))"
end
end

View file

@ -49,6 +49,7 @@ LogStash::Environment.load_locale!
module LogStash::Config::Mixin
include LogStash::Util::SubstitutionVariables
include LogStash::Util::Loggable
attr_accessor :config
attr_accessor :original_params
@ -99,6 +100,17 @@ module LogStash::Config::Mixin
params[name.to_s] = deep_replace(value)
end
# Intercept codecs that have not been instantiated
params.each do |name, value|
validator = self.class.validator_find(name)
next unless validator && validator[:validate] == :codec && value.kind_of?(String)
codec_klass = LogStash::Plugin.lookup("codec", value)
codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)
params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance)
end
if !self.class.validate(params)
raise LogStash::ConfigurationError,
I18n.t("logstash.runner.configuration.invalid_plugin_settings")
@ -190,7 +202,7 @@ module LogStash::Config::Mixin
name = name.to_s if name.is_a?(Symbol)
@config[name] = opts # ok if this is empty
if name.is_a?(String)
if name.is_a?(String) && opts.fetch(:attr_accessor, true)
define_method(name) { instance_variable_get("@#{name}") }
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) }
end
@ -429,6 +441,11 @@ module LogStash::Config::Mixin
case validator
when :codec
if value.first.is_a?(String)
# A plugin's codecs should be instantiated by `PluginFactory` or in `Config::Mixin#config_init(Hash)`,
# which ensure the inner plugin has access to the outer's execution context and metric store.
# This deprecation exists to warn plugins that call `Config::Mixin::validate_value` directly.
self.deprecation_logger.deprecated("Codec instantiated by `Config::Mixin::DSL::validate_value(String, :codec)` which cannot propagate parent plugin's execution context or metrics. ",
self.logger.debug? ? {:backtrace => caller} : {})
value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new
return true, value
else

View file

@ -62,6 +62,7 @@ module LogStash
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),

View file

@ -124,10 +124,10 @@ class LogStash::Inputs::Base < LogStash::Plugin
def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an input's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end

View file

@ -127,10 +127,10 @@ class LogStash::Outputs::Base < LogStash::Plugin
def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an output's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end

View file

@ -89,8 +89,8 @@ module LogStash; class BasePipeline < AbstractPipeline
private
def plugin(plugin_type, name, source, *args)
@plugin_factory.plugin(plugin_type, name, source, *args)
def plugin(plugin_type, name, args, source)
@plugin_factory.plugin(plugin_type, name, args, source)
end
def default_logging_keys(other_keys = {})

View file

@ -16,6 +16,7 @@
# under the License.
require "logstash/config/mixin"
require "logstash/plugins/ecs_compatibility_support"
require "concurrent"
require "securerandom"
@ -24,11 +25,12 @@ require_relative 'plugin_metadata'
class LogStash::Plugin
include LogStash::Util::Loggable
attr_accessor :params, :execution_context
attr_accessor :params
NL = "\n"
include LogStash::Config::Mixin
include LogStash::Plugins::ECSCompatibilitySupport
# Disable or enable metric logging for this specific plugin instance
# by default we record all the metrics we can, but you can disable metrics collection
@ -60,7 +62,7 @@ class LogStash::Plugin
self.class.name == other.class.name && @params == other.params
end
def initialize(params=nil)
def initialize(params={})
@logger = self.logger
@deprecation_logger = self.deprecation_logger
# need to access settings statically because plugins are initialized in config_ast with no context.
@ -177,4 +179,14 @@ class LogStash::Plugin
def plugin_metadata
LogStash::PluginMetadata.for_plugin(self.id)
end
# Deprecated attr_writer for execution_context
def execution_context=(new_context)
@deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first)
@execution_context = new_context
end
def execution_context
@execution_context || LogStash::ExecutionContext::Empty
end
end # class LogStash::Plugin

View file

@ -0,0 +1,53 @@
module LogStash
module Plugins
module ECSCompatibilitySupport
def self.included(base)
base.extend(ArgumentValidator)
base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument,
:attr_accessor => false)
end
MUTEX = Mutex.new
private_constant :MUTEX
def ecs_compatibility
@_ecs_compatibility || MUTEX.synchronize do
@_ecs_compatibility ||= begin
# use config_init-set value if present
break @ecs_compatibility unless @ecs_compatibility.nil?
pipeline = execution_context.pipeline
pipeline_settings = pipeline && pipeline.settings
pipeline_settings ||= LogStash::SETTINGS
if !pipeline_settings.set?('pipeline.ecs_compatibility')
deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " +
"To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.")
end
pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym
end
end
end
module ArgumentValidator
V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze
private_constant :V_PREFIXED_INTEGER_PATTERN
def validate_value(value, validator)
return super unless validator == :ecs_compatibility_argument
value = deep_replace(value)
value = hash_or_array(value)
if value.size == 1
return true, :disabled if value.first.to_s == 'disabled'
return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN
end
return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}"
end
end
end
end
end

View file

@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.unsafe_shutdown",
:default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown")
option ["--pipeline.ecs_compatibility"], "STRING",
I18n.t("logstash.runner.flag.ecs_compatibility"),
:attribute_name => "pipeline.ecs_compatibility",
:default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility')
# Data Path Setting
option ["--path.data"] , "PATH",
I18n.t("logstash.runner.flag.datapath"),

View file

@ -48,6 +48,7 @@ module LogStash
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
"pipeline.ecs_compatibility",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",

View file

@ -362,6 +362,21 @@ en:
if there are still inflight events in memory.
By default, logstash will refuse to quit until all
received events have been pushed to the outputs.
ecs_compatibility: |+
Sets the pipeline's default value for `ecs_compatibility`,
a setting that is available to plugins that implement
an ECS Compatibility mode for use with the Elastic Common
Schema.
Possible values are:
- disabled (default)
- v1
- v2
This option allows the early opt-in (or preemptive opt-out)
of ECS Compatibility modes in plugins, which is scheduled to
be on-by-default in a future major release of Logstash.
Values other than `disabled` are currently considered BETA,
and may produce unintended consequences when upgrading Logstash.
rubyshell: |+
Drop to shell instead of running as normal.
Valid shells are "irb" and "pry"

View file

@ -47,6 +47,31 @@ describe LogStash::Config::Mixin do
end
end
context 'DSL::validate_value(String, :codec)' do
subject(:plugin_class) { Class.new(LogStash::Filters::Base) { config_name "test_deprecated_two" } }
let(:codec_class) { Class.new(LogStash::Codecs::Base) { config_name 'dummy' } }
let(:deprecation_logger) { double("DeprecationLogger").as_null_object }
before(:each) do
allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger)
allow(LogStash::Plugin).to receive(:lookup).with("codec", codec_class.config_name).and_return(codec_class)
end
it 'instantiates the codec' do
success, codec = plugin_class.validate_value(codec_class.config_name, :codec)
expect(success).to be true
expect(codec.class).to eq(codec_class)
end
it 'logs a deprecation' do
plugin_class.validate_value(codec_class.config_name, :codec)
expect(deprecation_logger).to have_received(:deprecated) do |message|
expect(message).to include("validate_value(String, :codec)")
end
end
end
context "when validating :bytes successfully" do
subject do
local_num_bytes = num_bytes # needs to be locally scoped :(

View file

@ -0,0 +1,59 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require "spec_helper"
describe LogStash::Plugins::ExecutionContextFactory do
let(:pipeline) { double('Pipeline') }
let(:agent) { double('Agent') }
let(:inner_dlq_writer) { nil }
subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) }
context '#create' do
let(:plugin_id) { SecureRandom.uuid }
let(:plugin_type) { 'input' }
context 'the resulting instance' do
subject(:instance) { factory.create(plugin_id, plugin_type) }
it 'retains the pipeline from the factory' do
expect(instance.pipeline).to be(pipeline)
end
it 'retains the agent from the factory' do
expect(instance.agent).to be(agent)
end
it 'has a dlq_writer' do
expect(instance.dlq_writer).to_not be_nil
end
context 'dlq_writer' do
subject(:instance_dlq_writer) { instance.dlq_writer }
it 'retains the plugin id' do
expect(instance_dlq_writer.plugin_id).to eq(plugin_id)
end
it 'retains the plugin type' do
expect(instance_dlq_writer.plugin_type).to eq(plugin_type)
end
end
end
end
end

View file

@ -30,7 +30,7 @@ describe LogStash::ExecutionContext do
allow(pipeline).to receive(:pipeline_id).and_return(pipeline_id)
end
subject { described_class.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) }
subject { described_class.new(pipeline, agent, dlq_writer) }
it "returns the `pipeline_id`" do
expect(subject.pipeline_id).to eq(pipeline_id)
@ -44,9 +44,7 @@ describe LogStash::ExecutionContext do
expect(subject.agent).to eq(agent)
end
it "returns the plugin-specific dlq writer" do
expect(subject.dlq_writer.plugin_type).to eq(plugin_type)
expect(subject.dlq_writer.plugin_id).to eq(plugin_id)
expect(subject.dlq_writer.inner_writer).to eq(dlq_writer)
it "returns the dlq writer" do
expect(subject.dlq_writer).to be(dlq_writer)
end
end

View file

@ -86,18 +86,34 @@ describe "LogStash::Inputs::Base#decorate" do
subject(:instance) { klass.new({}) }
it "allow to set the context" do
expect(instance.execution_context).to be_nil
instance.execution_context = execution_context
context 'execution_context=' do
let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object }
before(:each) do
allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub)
end
expect(instance.execution_context).to eq(execution_context)
it "allow to set the context" do
new_ctx = execution_context.dup
subject.execution_context = new_ctx
expect(subject.execution_context).to be(new_ctx)
end
it "propagate the context to the codec" do
expect(instance.codec.execution_context).to be_nil
instance.execution_context = execution_context
new_ctx = execution_context.dup
expect(instance.codec.execution_context).to_not be(new_ctx)
instance.execution_context = new_ctx
expect(instance.codec.execution_context).to eq(execution_context)
expect(instance.execution_context).to be(new_ctx)
expect(instance.codec.execution_context).to be(new_ctx)
end
it 'emits a deprecation warning' do
expect(deprecation_logger_stub).to receive(:deprecated) do |message|
expect(message).to match(/execution_context=/)
end
instance.execution_context = execution_context
end
end
end

View file

@ -102,18 +102,34 @@ describe "LogStash::Outputs::Base#new" do
subject(:instance) { klass.new(params.dup) }
it "allow to set the context" do
expect(instance.execution_context).to be_nil
instance.execution_context = execution_context
context 'execution_context=' do
let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object }
before(:each) do
allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub)
end
expect(instance.execution_context).to eq(execution_context)
it "allow to set the context" do
new_ctx = execution_context.dup
subject.execution_context = new_ctx
expect(subject.execution_context).to be(new_ctx)
end
it "propagate the context to the codec" do
expect(instance.codec.execution_context).to be_nil
instance.execution_context = execution_context
new_ctx = execution_context.dup
expect(instance.codec.execution_context).to_not be(new_ctx)
instance.execution_context = new_ctx
expect(instance.codec.execution_context).to eq(execution_context)
expect(instance.execution_context).to be(new_ctx)
expect(instance.codec.execution_context).to be(new_ctx)
end
it 'emits a deprecation warning' do
expect(deprecation_logger_stub).to receive(:deprecated) do |message|
expect(message).to match(/execution_context=/)
end
instance.execution_context = execution_context
end
end
end

View file

@ -69,13 +69,28 @@ describe LogStash::Plugin do
end
context "#execution_context" do
subject { Class.new(LogStash::Plugin).new({}) }
let(:klass) { Class.new(LogStash::Plugin) }
subject(:instance) { klass.new({}) }
include_context "execution_context"
context 'execution_context=' do
let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object }
before(:each) do
allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub)
end
it "can be set and get" do
expect(subject.execution_context).to be_nil
subject.execution_context = execution_context
expect(subject.execution_context).to eq(execution_context)
new_ctx = execution_context.dup
subject.execution_context = new_ctx
expect(subject.execution_context).to eq(new_ctx)
end
it 'emits a deprecation warning' do
expect(deprecation_logger_stub).to receive(:deprecated) do |message|
expect(message).to match(/execution_context=/)
end
instance.execution_context = execution_context
end
end
end
@ -402,6 +417,66 @@ describe LogStash::Plugin do
end
end
describe "#ecs_compatibility" do
let(:plugin_class) do
Class.new(LogStash::Filters::Base) do
config_name "ecs_validator_sample"
def register; end
end
end
let(:config) { Hash.new }
let(:instance) { plugin_class.new(config) }
let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object }
before(:each) do
allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger_stub)
end
context 'when plugin initialized with explicit value' do
let(:config) { super().merge("ecs_compatibility" => "v17") }
it 'returns the explicitly-given value' do
expect(instance.ecs_compatibility).to eq(:v17)
end
end
context 'when plugin is not initialized with an explicit value' do
let(:settings_stub) { LogStash::SETTINGS.clone }
before(:each) do
allow(settings_stub).to receive(:get_value).with(anything).and_call_original # allow spies
stub_const('LogStash::SETTINGS', settings_stub)
end
context 'and pipeline-level setting is explicitly `v1`' do
let(:settings_stub) do
super().tap do |settings|
settings.set_value('pipeline.ecs_compatibility', 'v1')
end
end
it 'reads the setting' do
expect(instance.ecs_compatibility).to eq(:v1)
expect(settings_stub).to have_received(:get_value)
end
end
context 'and pipeline-level setting is not specified' do
it 'emits a deprecation warning about using the default which may change' do
instance.ecs_compatibility
expect(deprecation_logger_stub).to have_received(:deprecated) do |message|
expect(message).to include("Relying on default value of `pipeline.ecs_compatibility`")
end
end
it 'returns `disabled`' do
# Default value of `pipeline.ecs_compatibility`
expect(instance.ecs_compatibility).to eq(:disabled)
end
end
end
end
describe "deprecation logger" do
let(:config) do
{

View file

@ -22,8 +22,9 @@ shared_context "execution_context" do
let(:plugin_id) { :plugin_id }
let(:plugin_type) { :plugin_type }
let(:dlq_writer) { double("dlq_writer") }
let(:execution_context_factory) { ::LogStash::Plugins::ExecutionContextFactory.new(agent, pipeline, dlq_writer) }
let(:execution_context) do
::LogStash::ExecutionContext.new(pipeline, agent, plugin_id, plugin_type, dlq_writer)
execution_context_factory.create(plugin_id, plugin_type)
end
before do

View file

@ -26,6 +26,7 @@ import org.jruby.RubyModule;
import org.jruby.anno.JRubyClass;
import org.jruby.exceptions.RaiseException;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ackedqueue.QueueFactoryExt;
@ -74,6 +75,7 @@ import org.logstash.log.LoggerExt;
import org.logstash.log.SlowLoggerExt;
import org.logstash.plugins.HooksRegistryExt;
import org.logstash.plugins.UniversalPluginExt;
import org.logstash.plugins.factory.ContextualizerExt;
import org.logstash.util.UtilExt;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginMetricsFactoryExt;
@ -199,6 +201,8 @@ public final class RubyUtil {
public static final RubyClass PLUGIN_FACTORY_CLASS;
public static final RubyModule PLUGIN_CONTEXTUALIZER_MODULE;
public static final RubyClass LOGGER;
public static final RubyModule LOGGABLE_MODULE;
@ -333,7 +337,7 @@ public final class RubyUtil {
UTIL_MODULE = LOGSTASH_MODULE.defineModuleUnder("Util");
UTIL_MODULE.defineAnnotatedMethods(UtilExt.class);
ABSTRACT_DLQ_WRITER_CLASS = UTIL_MODULE.defineClassUnder(
"AbstractDeadLetterQueueWriterExt", RUBY.getObject(),
"AbstractDeadLetterQueueWriter", RUBY.getObject(),
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
ABSTRACT_DLQ_WRITER_CLASS.defineAnnotatedMethods(AbstractDeadLetterQueueWriterExt.class);
@ -401,6 +405,7 @@ public final class RubyUtil {
EXECUTION_CONTEXT_CLASS = setupLogstashClass(
ExecutionContextExt::new, ExecutionContextExt.class
);
EXECUTION_CONTEXT_CLASS.defineConstant("Empty", EXECUTION_CONTEXT_CLASS.newInstance(RUBY.getCurrentContext(), RUBY.getNil(), RUBY.getNil(), RUBY.getNil(), Block.NULL_BLOCK));
RUBY_TIMESTAMP_CLASS = setupLogstashClass(
JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class
);
@ -551,6 +556,8 @@ public final class RubyUtil {
"PluginFactory", RUBY.getObject(), PluginFactoryExt::new
);
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.class);
PLUGIN_CONTEXTUALIZER_MODULE = PLUGINS_MODULE.defineOrGetModuleUnder("Contextualizer");
PLUGIN_CONTEXTUALIZER_MODULE.defineAnnotatedMethods(ContextualizerExt.class);
UNIVERSAL_PLUGIN_CLASS =
setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class);
EVENT_DISPATCHER_CLASS =
@ -636,4 +643,18 @@ public final class RubyUtil {
return JavaUtil.convertJavaToRuby(RUBY, javaObject);
}
/**
* Cast an IRubyObject that may be nil to a specific class
* @param objectOrNil an object of either type {@code <T>} or nil.
* @param <T> the type to cast non-nil values to
* @return The given value, cast to {@code <T>}, or null.
*/
public static <T extends IRubyObject> T nilSafeCast(final IRubyObject objectOrNil) {
if (objectOrNil == null || objectOrNil.isNil()) { return null; }
@SuppressWarnings("unchecked")
final T objectAsCasted = (T) objectOrNil;
return objectAsCasted;
}
}

View file

@ -164,8 +164,9 @@ public final class CompiledPipeline {
outs.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
final Map<String, Object> args = expandArguments(def, cve);
res.put(v.getId(), pluginFactory.buildOutput(
RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source
));
});
return res;
@ -181,8 +182,9 @@ public final class CompiledPipeline {
for (final PluginVertex vertex : filterPlugins) {
final PluginDefinition def = vertex.getPluginDefinition();
final SourceWithMetadata source = vertex.getSourceWithMetadata();
final Map<String, Object> args = expandArguments(def, cve);
res.put(vertex.getId(), pluginFactory.buildFilter(
RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)
RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source
));
}
return res;
@ -197,71 +199,47 @@ public final class CompiledPipeline {
vertices.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
final Map<String, Object> args = expandArguments(def, cve);
IRubyObject o = pluginFactory.buildInput(
RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve));
RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source);
nodes.add(o);
});
return nodes;
}
/**
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
* Ruby types.
* @param def PluginDefinition as provided by {@link PipelineIR}
* @return RubyHash of plugin arguments as understood by {@link RubyIntegration.PluginFactory}
* methods
*/
private RubyHash convertArgs(final PluginDefinition def) {
final RubyHash convertArgs(final Map<String, Object> input) {
final RubyHash converted = RubyHash.newHash(RubyUtil.RUBY);
for (final Map.Entry<String, Object> entry : def.getArguments().entrySet()) {
for (final Map.Entry<String, Object> entry : input.entrySet()) {
final Object value = entry.getValue();
final String key = entry.getKey();
final Object toput;
if (value instanceof PluginStatement) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
source,
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
codec.getArguments()
);
} else {
toput = value;
}
converted.put(key, toput);
converted.put(key, value);
}
return converted;
}
/**
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
* Java types for consumption by Java plugins.
* @param def PluginDefinition as provided by {@link PipelineIR}
* @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory}
* methods that create Java plugins
*/
private Map<String, Object> convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) {
Map<String, Object> args = expandConfigVariables(cve, def.getArguments());
for (final Map.Entry<String, Object> entry : args.entrySet()) {
final Object value = entry.getValue();
private Map<String, Object> expandArguments(final PluginDefinition pluginDefinition, final ConfigVariableExpander cve) {
Map<String, Object> arguments = expandConfigVariables(cve, pluginDefinition.getArguments());
// Intercept codec definitions from LIR
for (final Map.Entry<String, Object> entry : arguments.entrySet()) {
final String key = entry.getKey();
final IRubyObject toput;
final Object value = entry.getValue();
if (value instanceof PluginStatement) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata();
Map<String, Object> codecArgs = expandConfigVariables(cve, codec.getArguments());
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
source,
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
codecArgs
);
Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
args.put(key, javaCodec);
final PluginStatement codecPluginStatement = (PluginStatement) value;
final PluginDefinition codecDefinition = codecPluginStatement.getPluginDefinition();
final SourceWithMetadata codecSource = codecPluginStatement.getSourceWithMetadata();
final Map<String, Object> codecArguments = expandArguments(codecDefinition, cve);
IRubyObject codecInstance = pluginFactory.buildCodec(RubyUtil.RUBY.newString(codecDefinition.getName()),
Rubyfier.deep(RubyUtil.RUBY, codecArguments),
codecSource);
arguments.put(key, codecInstance);
}
}
return args;
return arguments;
}
@SuppressWarnings({"rawtypes", "unchecked"})

View file

@ -32,9 +32,14 @@ import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.plugins.factory.ContextualizerExt;
import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE;
public final class OutputStrategyExt {
@ -176,6 +181,9 @@ public final class OutputStrategyExt {
@JRubyMethod(required = 4)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
final RubyClass outputClass = (RubyClass) args[0];
final IRubyObject metric = args[1];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
final RubyHash pluginArgs = (RubyHash) args[3];
workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
if (workerCount.isNil()) {
@ -185,12 +193,9 @@ public final class OutputStrategyExt {
workerQueue = new ArrayBlockingQueue<>(count);
workers = context.runtime.newArray(count);
for (int i = 0; i < count; ++i) {
final RubyClass outputClass = (RubyClass) args[0];
// Calling "new" here manually to allow mocking the ctor in RSpec Tests
final IRubyObject output = outputClass.callMethod(context, "new", pluginArgs);
final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
initOutputCallsite(outputClass);
output.callMethod(context, "metric=", args[1]);
output.callMethod(context, "execution_context=", args[2]);
output.callMethod(context, "metric=", metric);
workers.append(output);
workerQueue.add(output);
}
@ -248,11 +253,15 @@ public final class OutputStrategyExt {
@JRubyMethod(required = 4)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
final RubyClass outputClass = (RubyClass) args[0];
final IRubyObject metric = args[1];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[2];
final RubyHash pluginArgs = (RubyHash) args[3];
// TODO: fixup mocks
// Calling "new" here manually to allow mocking the ctor in RSpec Tests
output = args[0].callMethod(context, "new", args[3]);
output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs);
initOutputCallsite(outputClass);
output.callMethod(context, "metric=", args[1]);
output.callMethod(context, "execution_context=", args[2]);
output.callMethod(context, "metric=", metric);
return this;
}

View file

@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Codec;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import org.logstash.common.SourceWithMetadata;
import java.util.Map;
/**
* Factory that can instantiate Java plugins as well as Ruby plugins.
*/
public interface PluginFactory extends RubyIntegration.PluginFactory {
Input buildInput(String name, String id, Configuration configuration, Context context);
Filter buildFilter(String name, String id, Configuration configuration, Context context);
final class Default implements PluginFactory {
private final RubyIntegration.PluginFactory rubyFactory;
public Default(final RubyIntegration.PluginFactory rubyFactory) {
this.rubyFactory = rubyFactory;
}
@Override
public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) {
return null;
}
@Override
public Filter buildFilter(final String name, final String id, final Configuration configuration, final Context context) {
return null;
}
@Override
public IRubyObject buildInput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return rubyFactory.buildInput(name, source, args, pluginArgs);
}
@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, final Map<String, Object> pluginArgs) {
return rubyFactory.buildOutput(name, source, args, pluginArgs);
}
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, final Map<String, Object> pluginArgs) {
return rubyFactory.buildFilter(name, source, args, pluginArgs);
}
@Override
public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args,
Map<String, Object> pluginArgs) {
return rubyFactory.buildCodec(name, source, args, pluginArgs);
}
@Override
public Codec buildDefaultCodec(final String codecName) {
return null;
}
}
}

View file

@ -41,17 +41,13 @@ public final class RubyIntegration {
*/
public interface PluginFactory {
IRubyObject buildInput(RubyString name, SourceWithMetadata source,
IRubyObject args, Map<String, Object> pluginArgs);
IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source);
AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source,
IRubyObject args, Map<String, Object> pluginArgs);
AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source);
AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args,
Map<String, Object> pluginArgs);
AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source);
IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args,
Map<String, Object> pluginArgs);
IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source);
Codec buildDefaultCodec(String codecName);

View file

@ -0,0 +1,7 @@
package org.logstash.execution;
public enum Engine {
RUBY,
JAVA,
;
}

View file

@ -25,6 +25,7 @@ import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@ -45,14 +46,16 @@ public final class ExecutionContextExt extends RubyObject {
super(runtime, metaClass);
}
@JRubyMethod(required = 5)
@JRubyMethod(required = 2, optional = 1)
public ExecutionContextExt initialize(final ThreadContext context,
final IRubyObject[] args) {
pipeline = args[0];
agent = args[1];
dlqWriter = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt(
context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS
).initialize(context, args[4], args[2], args[3]);
if (args.length > 2 && !args[2].isNil()) {
dlqWriter = (AbstractDeadLetterQueueWriterExt) args[2];
} else {
dlqWriter = (AbstractDeadLetterQueueWriterExt) RubyUtil.DUMMY_DLQ_WRITER_CLASS.newInstance(context, Block.NULL_BLOCK);
}
return this;
}
@ -73,6 +76,9 @@ public final class ExecutionContextExt extends RubyObject {
@JRubyMethod(name = "pipeline_id")
public IRubyObject pipelineId(final ThreadContext context) {
if (pipeline.isNil()) {
return context.nil;
}
return pipeline.callMethod(context, "pipeline_id");
}
}

View file

@ -77,7 +77,8 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt {
new ExecutionContextFactoryExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
).initialize(context, args[3], this, dlqWriter(context)),
RubyUtil.FILTER_DELEGATOR_CLASS
RubyUtil.FILTER_DELEGATOR_CLASS,
Engine.JAVA
),
getSecretStore(context)
);

View file

@ -0,0 +1,99 @@
package org.logstash.plugins.factory;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyModule;
import org.jruby.anno.JRubyMethod;
import org.jruby.anno.JRubyModule;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.ExecutionContextExt;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.jruby.runtime.Helpers.invokeSuper;
import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE;
/**
* The {@link ContextualizerExt} is used to inject {@link org.logstash.execution.ExecutionContextExt } to plugins
* before they are initialized.
*
* @see ContextualizerExt#initializePlugin(ThreadContext, IRubyObject, IRubyObject[], Block)
*/
@JRubyModule(name = "Contextualizer")
public class ContextualizerExt {
private static final String EXECUTION_CONTEXT_IVAR_NAME = "@execution_context";
/*
* @overload PluginContextualizer::initialize_plugin(execution_context, plugin_class, *plugin_args, &pluginBlock)
*/
@JRubyMethod(name = "initialize_plugin", meta = true, required = 2, rest = true)
public static IRubyObject initializePlugin(final ThreadContext context,
final IRubyObject recv,
final IRubyObject[] args,
final Block block) {
final List<IRubyObject> argsList = new ArrayList<>(Arrays.asList(args));
final ExecutionContextExt executionContext = RubyUtil.nilSafeCast(argsList.remove(0));
final RubyClass pluginClass = (RubyClass) argsList.remove(0);
final IRubyObject[] pluginArgs = argsList.toArray(new IRubyObject[]{});
return initializePlugin(context, (RubyModule) recv, executionContext, pluginClass, pluginArgs, block);
}
public static IRubyObject initializePlugin(final ThreadContext context,
@Nullable final ExecutionContextExt executionContextExt,
final RubyClass pluginClass,
final RubyHash pluginArgs) {
return initializePlugin(context, PLUGIN_CONTEXTUALIZER_MODULE, executionContextExt, pluginClass, new IRubyObject[]{pluginArgs}, Block.NULL_BLOCK);
}
private static IRubyObject initializePlugin(final ThreadContext context,
final RubyModule recv,
@Nullable final ExecutionContextExt executionContext,
final RubyClass pluginClass,
final IRubyObject[] pluginArgs,
final Block block) {
synchronized (ContextualizerExt.class) {
if (!pluginClass.hasModuleInPrepends(recv)) {
pluginClass.prepend(context, new IRubyObject[]{recv});
}
}
final IRubyObject[] pluginInitArgs;
if (executionContext == null) {
pluginInitArgs = pluginArgs;
} else {
List<IRubyObject> pluginInitArgList = new ArrayList<>(1 + pluginArgs.length);
pluginInitArgList.add(executionContext);
pluginInitArgList.addAll(Arrays.asList(pluginArgs));
pluginInitArgs = pluginInitArgList.toArray(new IRubyObject[]{});
}
// We must use IRubyObject#callMethod(...,"new",...) here to continue supporting
// mocking/validating from rspec.
return pluginClass.callMethod(context, "new", pluginInitArgs, block);
}
@JRubyMethod(name = "initialize", rest = true, frame = true) // framed for invokeSuper
public static IRubyObject initialize(final ThreadContext context,
final IRubyObject recv,
final IRubyObject[] args,
final Block block) {
final List<IRubyObject> argsList = new ArrayList<>(Arrays.asList(args));
if (args.length > 0 && args[0] instanceof ExecutionContextExt) {
final ExecutionContextExt executionContext = (ExecutionContextExt) argsList.remove(0);
recv.getInstanceVariables().setInstanceVariable(EXECUTION_CONTEXT_IVAR_NAME, executionContext);
}
final IRubyObject[] restArgs = argsList.toArray(new IRubyObject[]{});
return invokeSuper(context, recv, restArgs, block);
}
}

View file

@ -46,10 +46,14 @@ public final class ExecutionContextFactoryExt extends RubyBasicObject {
@JRubyMethod
public ExecutionContextExt create(final ThreadContext context, final IRubyObject id,
final IRubyObject classConfigName) {
final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt(
context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS
).initialize(context, dlqWriter, id, classConfigName);
return new ExecutionContextExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS
).initialize(
context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter}
context, new IRubyObject[]{pipeline, agent, dlqWriterForInstance}
);
}

View file

@ -5,6 +5,7 @@ import org.jruby.*;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@ -13,6 +14,7 @@ import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.*;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.Engine;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
@ -21,6 +23,7 @@ import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.PluginLookup;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@JRubyClass(name = "PluginFactory")
public final class PluginFactoryExt extends RubyBasicObject
@ -35,7 +38,9 @@ public final class PluginFactoryExt extends RubyBasicObject
private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");
private final Collection<String> pluginsById = new HashSet<>();
private final Collection<String> pluginsById = ConcurrentHashMap.newKeySet();
private Engine engine;
private PipelineIR lir;
@ -43,7 +48,7 @@ public final class PluginFactoryExt extends RubyBasicObject
private PluginMetricsFactoryExt metrics;
private RubyClass filterClass;
private RubyClass filterDelegatorClass;
private ConfigVariableExpander configVariables;
@ -54,16 +59,22 @@ public final class PluginFactoryExt extends RubyBasicObject
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
public static IRubyObject filterDelegator(final ThreadContext context,
final IRubyObject recv, final IRubyObject... args) {
// filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx
final RubyClass filterDelegatorClass = (RubyClass) args[0];
final RubyClass klass = (RubyClass) args[1];
final RubyHash arguments = (RubyHash) args[2];
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
final AbstractMetricExt typeScopedMetric = (AbstractMetricExt) args[3];
final ExecutionContextExt executionContext = (ExecutionContextExt) args[4];
final IRubyObject filterInstance = ContextualizerExt.initializePlugin(context, executionContext, klass, arguments);
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
filterInstance.callMethod(
context, "metric=",
((AbstractMetricExt) args[3]).namespace(context, id.intern())
typeScopedMetric.namespace(context, id.intern())
);
filterInstance.callMethod(context, "execution_context=", args[4]);
return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS)
.initialize(context, filterInstance, id);
return filterDelegatorClass.newInstance(context, filterInstance, id, Block.NULL_BLOCK);
}
public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) {
@ -80,25 +91,33 @@ public final class PluginFactoryExt extends RubyBasicObject
final IRubyObject[] args) {
return init(
args[0].toJava(PipelineIR.class),
(PluginMetricsFactoryExt) args[1], (ExecutionContextFactoryExt) args[2],
(RubyClass) args[3]
(PluginMetricsFactoryExt) args[1],
(ExecutionContextFactoryExt) args[2],
(RubyClass) args[3],
EnvironmentVariableProvider.defaultProvider(),
Engine.RUBY
);
}
public PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics,
final ExecutionContextFactoryExt executionContextFactoryExt,
final RubyClass filterClass) {
return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider());
}
PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics,
public PluginFactoryExt init(final PipelineIR lir,
final PluginMetricsFactoryExt metrics,
final ExecutionContextFactoryExt executionContextFactoryExt,
final RubyClass filterClass,
final EnvironmentVariableProvider envVars) {
final Engine engine) {
return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider(), engine);
}
PluginFactoryExt init(final PipelineIR lir,
final PluginMetricsFactoryExt metrics,
final ExecutionContextFactoryExt executionContextFactoryExt,
final RubyClass filterClass,
final EnvironmentVariableProvider envVars,
final Engine engine) {
this.lir = lir;
this.metrics = metrics;
this.executionContextFactory = executionContextFactoryExt;
this.filterClass = filterClass;
this.filterDelegatorClass = filterClass;
this.engine = engine;
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.INPUT, new InputPluginCreator(this));
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.CODEC, new CodecPluginCreator());
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.FILTER, new FilterPluginCreator());
@ -109,71 +128,102 @@ public final class PluginFactoryExt extends RubyBasicObject
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildInput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public IRubyObject buildInput(final RubyString name,
final IRubyObject args,
final SourceWithMetadata source) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
RubyUtil.RUBY.getCurrentContext(),
PluginLookup.PluginType.INPUT,
name.asJavaString(),
(RubyHash) args,
source
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractOutputDelegatorExt buildOutput(final RubyString name,
final IRubyObject args,
final SourceWithMetadata source) {
return (AbstractOutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
(RubyHash) args, source
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractFilterDelegatorExt buildFilter(final RubyString name,
final IRubyObject args,
final SourceWithMetadata source) {
return (AbstractFilterDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
(RubyHash) args, source
);
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args,
Map<String, Object> pluginArgs) {
public IRubyObject buildCodec(final RubyString name,
final IRubyObject args,
final SourceWithMetadata source) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
name.asJavaString(), source, (Map<String, IRubyObject>) args, pluginArgs
RubyUtil.RUBY.getCurrentContext(),
PluginLookup.PluginType.CODEC,
name.asJavaString(),
(RubyHash) args,
source
);
}
@Override
public Codec buildDefaultCodec(String codecName) {
return (Codec) JavaUtil.unwrapJavaValue(plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
codecName, null, Collections.emptyMap(), Collections.emptyMap()
RubyUtil.RUBY.getCurrentContext(),
PluginLookup.PluginType.CODEC,
codecName,
RubyHash.newHash(RubyUtil.RUBY),
null
));
}
@SuppressWarnings("unchecked")
@JRubyMethod(required = 3, optional = 1)
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
final SourceWithMetadata source = args.length > 3 ? (SourceWithMetadata) JavaUtil.unwrapIfJavaObject(args[3]) : null;
return plugin(
context,
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
args[1].asJavaString(),
JavaUtil.unwrapIfJavaObject(args[2]),
args.length > 3 ? (Map<String, IRubyObject>) args[3] : new HashMap<>(),
null
(RubyHash) args[2],
source
);
}
@SuppressWarnings("unchecked")
private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name,
SourceWithMetadata source, final Map<String, IRubyObject> args,
Map<String, Object> pluginArgs) {
final String id = generateOrRetrievePluginId(context, type, name, source);
pluginsById.add(id);
private IRubyObject plugin(final ThreadContext context,
final PluginLookup.PluginType type,
final String name,
final RubyHash args,
final SourceWithMetadata source) {
final String id = generateOrRetrievePluginId(type, source, args);
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
)
);
}
if (!pluginsById.add(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel());
final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name);
@ -199,19 +249,19 @@ public final class PluginFactoryExt extends RubyBasicObject
} else if (type == PluginLookup.PluginType.FILTER) {
return filterDelegator(
context, null,
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx);
filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx);
} else {
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
final IRubyObject pluginInstance = ContextualizerExt.initializePlugin(context, executionCntx, klass, rubyArgs);
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name"));
pluginInstance.callMethod(context, "metric=", scopedMetric);
pluginInstance.callMethod(context, "execution_context=", executionCntx);
return pluginInstance;
}
} else {
if (pluginArgs == null) {
String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." +
" The Java execution engine is required to run Java plugins.", name);
if (engine != Engine.JAVA) {
String err = String.format("Cannot start the Java plugin '%s' in the %s execution engine." +
" The Java execution engine is required to run Java plugins.", name, engine);
throw new IllegalStateException(err);
}
@ -221,38 +271,97 @@ public final class PluginFactoryExt extends RubyBasicObject
}
Context contextWithMetrics = executionContextFactory.toContext(type, metrics.getRoot(context));
return pluginCreator.createDelegator(name, pluginArgs, id, typeScopedMetric, pluginClass, contextWithMetrics);
return pluginCreator.createDelegator(name, convertToJavaCoercible(args), id, typeScopedMetric, pluginClass, contextWithMetrics);
}
}
private String generateOrRetrievePluginId(ThreadContext context, PluginLookup.PluginType type, String name,
SourceWithMetadata source) {
final String id;
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
private Map<String, Object> convertToJavaCoercible(Map<String, Object> input) {
final Map<String, Object> output = new HashMap<>(input);
// Intercept Codecs
for (final Map.Entry<String, Object> entry : input.entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
if (value instanceof IRubyObject) {
final Object unwrapped = JavaUtil.unwrapJavaValue((IRubyObject) value);
if (unwrapped instanceof Codec) {
output.put(key, unwrapped);
}
}
}
return output;
}
// TODO: caller seems to think that the args is `Map<String, IRubyObject>`, but
// at least any `id` present is actually a `String`.
private String generateOrRetrievePluginId(final PluginLookup.PluginType type,
final SourceWithMetadata source,
final Map<String, ?> args) {
final Optional<String> unprocessedId;
if (source == null) {
unprocessedId = extractId(() -> extractIdFromArgs(args),
this::generateUUID);
} else {
String unresolvedId = lir.getGraph().vertices()
unprocessedId = extractId(() -> extractIdFromLIR(source),
() -> extractIdFromArgs(args),
() -> generateUUIDForCodecs(type));
}
return unprocessedId
.map(configVariables::expand)
.filter(String.class::isInstance)
.map(String.class::cast)
.orElse(null);
}
private Optional<String> extractId(final IdExtractor... extractors) {
for (IdExtractor extractor : extractors) {
final Optional<String> extracted = extractor.extract();
if (extracted.isPresent()) {
return extracted;
}
}
return Optional.empty();
}
@FunctionalInterface
interface IdExtractor {
Optional<String> extract();
}
private Optional<String> extractIdFromArgs(final Map<String, ?> args) {
if (!args.containsKey("id")) {
return Optional.empty();
}
final Object explicitId = args.get("id");
if (explicitId instanceof String) {
return Optional.of((String) explicitId);
} else if (explicitId instanceof RubyString) {
return Optional.of(((RubyString) explicitId).asJavaString());
} else {
return Optional.empty();
}
}
private Optional<String> generateUUID() {
return Optional.of(UUID.randomUUID().toString());
}
private Optional<String> generateUUIDForCodecs(final PluginLookup.PluginType pluginType) {
if (pluginType == PluginLookup.PluginType.CODEC) {
return generateUUID();
}
return Optional.empty();
}
private Optional<String> extractIdFromLIR(final SourceWithMetadata source) {
return lir.getGraph().vertices()
.filter(v -> v.getSourceWithMetadata() != null
&& v.getSourceWithMetadata().equalsWithoutText(source))
.findFirst()
.map(Vertex::getId).orElse(null);
id = (String) configVariables.expand(unresolvedId);
}
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
)
);
}
if (pluginsById.contains(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
return id;
.map(Vertex::getId);
}
ExecutionContextFactoryExt getExecutionContextFactory() {

View file

@ -54,12 +54,8 @@ import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.ComputeStepSyntaxElement;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.PluginFactory;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.ext.JrubyEventExtLibrary;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Context;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -494,9 +490,9 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
/**
* Configurable Mock {@link PluginFactory}
* Configurable Mock {@link RubyIntegration.PluginFactory}
*/
static final class MockPluginFactory implements PluginFactory {
static final class MockPluginFactory implements RubyIntegration.PluginFactory {
private final Map<String, Supplier<IRubyObject>> inputs;
@ -514,20 +510,20 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Override
public IRubyObject buildInput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public IRubyObject buildInput(final RubyString name, final IRubyObject args,
SourceWithMetadata source) {
return setupPlugin(name, inputs);
}
@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractOutputDelegatorExt buildOutput(final RubyString name, final IRubyObject args,
SourceWithMetadata source) {
return PipelineTestUtil.buildOutput(setupPlugin(name, outputs));
}
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractFilterDelegatorExt buildFilter(final RubyString name, final IRubyObject args,
SourceWithMetadata source) {
final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name);
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
@ -535,8 +531,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Override
public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args,
Map<String, Object> pluginArgs) {
public IRubyObject buildCodec(final RubyString name, final IRubyObject args, SourceWithMetadata source) {
throw new IllegalStateException("No codec setup expected in this test.");
}
@ -555,17 +550,6 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
return suppliers.get(name.asJavaString()).get();
}
@Override
public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) {
return null;
}
@Override
public Filter buildFilter(final String name, final String id,
final Configuration configuration, final Context context) {
return null;
}
}
@Test
@ -698,9 +682,9 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
/**
* Fixed Mock {@link PluginFactory}
* Fixed Mock {@link RubyIntegration.PluginFactory}
* */
static final class FixedPluginFactory implements PluginFactory {
static final class FixedPluginFactory implements RubyIntegration.PluginFactory {
private Supplier<IRubyObject> input;
private Supplier<IRubyObject> filter;
@ -714,27 +698,17 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Override
public Input buildInput(String name, String id, Configuration configuration, Context context) {
return null;
}
@Override
public Filter buildFilter(String name, String id, Configuration configuration, Context context) {
return null;
}
@Override
public IRubyObject buildInput(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
public IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source) {
return this.input.get();
}
@Override
public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source) {
return PipelineTestUtil.buildOutput(this.output.get());
}
@Override
public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source) {
final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name);
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
@ -742,7 +716,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Override
public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, Map<String, Object> pluginArgs) {
public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) {
return null;
}

View file

@ -1,35 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.config.ir.compiler;
import org.junit.Test;
/**
* Tests for {@link PluginFactory.Default}.
*/
public final class PluginFactoryTest {
@Test
public void testBuildJavaFilter() throws Exception {
}
}

View file

@ -30,31 +30,29 @@ import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.plugins.codecs.Line;
import java.util.Collections;
import java.util.Map;
public class TestPluginFactory implements RubyIntegration.PluginFactory {
@Override
public IRubyObject buildInput(RubyString name, SourceWithMetadata source,
IRubyObject args, Map<String, Object> pluginArgs) {
public IRubyObject buildInput(RubyString name, IRubyObject args,
SourceWithMetadata source) {
return null;
}
@Override
public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source,
IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args,
SourceWithMetadata source) {
return null;
}
@Override
public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source,
IRubyObject args, Map<String, Object> pluginArgs) {
public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args,
SourceWithMetadata source) {
return null;
}
@Override
public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args,
Map<String, Object> pluginArgs) {
public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) {
return null;
}

View file

@ -30,6 +30,7 @@ import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.RubyEnvTestCase;
import org.logstash.execution.Engine;
import org.logstash.instrument.metrics.NamespacedMetricExt;
import org.logstash.plugins.MetricTestCase;
import org.logstash.plugins.PluginLookup;
@ -93,13 +94,12 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
envVars.put("CUSTOM", "test");
PluginFactoryExt sut = new PluginFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS,
mockPluginResolver);
sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get);
sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get, Engine.JAVA);
RubyString pluginName = RubyUtil.RUBY.newString("mockinput");
// Exercise
IRubyObject pluginInstance = sut.buildInput(pluginName, sourceWithMetadata, RubyHash.newHash(RubyUtil.RUBY),
Collections.emptyMap());
IRubyObject pluginInstance = sut.buildInput(pluginName, RubyHash.newHash(RubyUtil.RUBY), sourceWithMetadata);
//Verify
IRubyObject id = pluginInstance.callMethod(RUBY.getCurrentContext(), "id");