diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 8c722624b..5960d5cf2 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -32,7 +32,6 @@ java_import org.logstash.config.ir.CompiledPipeline java_import org.logstash.config.ir.ConfigCompiler module LogStash; class JavaBasePipeline - include org.logstash.config.ir.compiler.RubyIntegration::Pipeline include LogStash::Util::Loggable attr_reader :settings, :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir, :execution_context, :ephemeral_id @@ -52,19 +51,16 @@ module LogStash; class JavaBasePipeline @config_str, @settings.get_value("config.support_escapes") ) - # Every time #plugin is invoked this is incremented to give each plugin - # a unique id when auto-generating plugin ids - @plugin_counter ||= 0 - @pipeline_id = @settings.get_value("pipeline.id") || self.object_id - - # A list of plugins indexed by id - @plugins_by_id = {} @agent = agent - @dlq_writer = dlq_writer - - @lir_execution = CompiledPipeline.new(@lir, self) + @plugin_factory = LogStash::Plugins::PluginFactory.new( + # use NullMetric if called in the BasePipeline context otherwise use the @metric value + @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new), + @logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer), + JavaFilterDelegator + ) + @lir_execution = CompiledPipeline.new(@lir, @plugin_factory) if settings.get_value("config.debug") && @logger.debug? @logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string)) end @@ -105,54 +101,7 @@ module LogStash; class JavaBasePipeline end def plugin(plugin_type, name, line, column, *args) - @plugin_counter += 1 - - # Collapse the array of arguments into a single merged hash - args = args.reduce({}, &:merge) - - if plugin_type == "codec" - id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here - else - # Pull the ID from LIR to keep IDs consistent between the two representations - id = lir.graph.vertices.filter do |v| - v.source_with_metadata && - v.source_with_metadata.line == line && - v.source_with_metadata.column == column - end.findFirst.get.id - end - - args["id"] = id # some code pulls the id out of the args - - if !id - raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" - end - - raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id] - @plugins_by_id[id] = true - - # use NullMetric if called in the BasePipeline context otherwise use the @metric value - metric = @metric || Instrument::NullMetric.new - - pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins]) - # Scope plugins of type 'input' to 'inputs' - type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym) - - klass = Plugin.lookup(plugin_type, name) - - execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer) - - if plugin_type == "output" - OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args) - elsif plugin_type == "filter" - JavaFilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args) - else # input - input_plugin = klass.new(args) - scoped_metric = type_scoped_metric.namespace(id.to_sym) - scoped_metric.gauge(:name, input_plugin.config_name) - input_plugin.metric = scoped_metric - input_plugin.execution_context = execution_context - input_plugin - end + @plugin_factory.plugin(plugin_type, name, line, column, *args) end def reloadable? diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 9d61065e2..5d64b2030 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -22,6 +22,7 @@ require "logstash/util/dead_letter_queue_manager" require "logstash/output_delegator" require "logstash/filter_delegator" require "logstash/queue_factory" +require "logstash/plugins/plugin_factory" require "logstash/compiler" require "logstash/execution_context" require "securerandom" @@ -51,14 +52,8 @@ module LogStash; class BasePipeline @config_str, @settings.get_value("config.support_escapes") ) - # Every time #plugin is invoked this is incremented to give each plugin - # a unique id when auto-generating plugin ids - @plugin_counter ||= 0 - @pipeline_id = @settings.get_value("pipeline.id") || self.object_id - # A list of plugins indexed by id - @plugins_by_id = {} @inputs = nil @filters = nil @outputs = nil @@ -66,6 +61,12 @@ module LogStash; class BasePipeline @dlq_writer = dlq_writer + @plugin_factory = LogStash::Plugins::PluginFactory.new( + # use NullMetric if called in the BasePipeline context otherwise use the @metric value + @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new), + @logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer), + FilterDelegator + ) grammar = LogStashConfigParser.new parsed_config = grammar.parse(config_str) raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil? @@ -110,54 +111,7 @@ module LogStash; class BasePipeline end def plugin(plugin_type, name, line, column, *args) - @plugin_counter += 1 - - # Collapse the array of arguments into a single merged hash - args = args.reduce({}, &:merge) - - if plugin_type == "codec" - id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here - else - # Pull the ID from LIR to keep IDs consistent between the two representations - id = lir.graph.vertices.filter do |v| - v.source_with_metadata && - v.source_with_metadata.line == line && - v.source_with_metadata.column == column - end.findFirst.get.id - end - - args["id"] = id # some code pulls the id out of the args - - if !id - raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" - end - - raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id] - @plugins_by_id[id] = true - - # use NullMetric if called in the BasePipeline context otherwise use the @metric value - metric = @metric || Instrument::NullMetric.new - - pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins]) - # Scope plugins of type 'input' to 'inputs' - type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym) - - klass = Plugin.lookup(plugin_type, name) - - execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer) - - if plugin_type == "output" - OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args) - elsif plugin_type == "filter" - FilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args) - else # input - input_plugin = klass.new(args) - scoped_metric = type_scoped_metric.namespace(id.to_sym) - scoped_metric.gauge(:name, input_plugin.config_name) - input_plugin.metric = scoped_metric - input_plugin.execution_context = execution_context - input_plugin - end + @plugin_factory.plugin(plugin_type, name, line, column, *args) end def reloadable? diff --git a/logstash-core/lib/logstash/plugins/plugin_factory.rb b/logstash-core/lib/logstash/plugins/plugin_factory.rb new file mode 100644 index 000000000..a9432b9d1 --- /dev/null +++ b/logstash-core/lib/logstash/plugins/plugin_factory.rb @@ -0,0 +1,100 @@ +# encoding: utf-8 + +module LogStash + module Plugins + + class ExecutionContextFactory + + def initialize(agent, pipeline, dlq_writer) + @agent = agent + @pipeline = pipeline + @dlq_writer = dlq_writer + end + + def create(id, klass_cfg_name) + ExecutionContext.new(@pipeline, @agent, id, klass_cfg_name, @dlq_writer) + end + end + + class PluginMetricFactory + + def initialize(pipeline_id, metric) + @pipeline_id = pipeline_id.to_s.to_sym + @metric = metric + end + + def create(plugin_type) + @metric.namespace([:stats, :pipelines, @pipeline_id, :plugins]) + .namespace("#{plugin_type}s".to_sym) + end + end + + class PluginFactory + include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory + + def initialize(lir, metric_factory, logger, exec_factory, filter_class) + @lir = lir + @plugins_by_id = {} + @metric_factory = metric_factory + @logger = logger + @exec_factory = exec_factory + @filter_class = filter_class + end + + def buildOutput(name, line, column, *args) + plugin("output", name, line, column, *args) + end + + def buildFilter(name, line, column, *args) + plugin("filter", name, line, column, *args) + end + + def buildInput(name, line, column, *args) + plugin("input", name, line, column, *args) + end + + def buildCodec(name, *args) + plugin("codec", name, 0, 0, *args) + end + + def plugin(plugin_type, name, line, column, *args) + # Collapse the array of arguments into a single merged hash + args = args.reduce({}, &:merge) + + if plugin_type == "codec" + id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here + else + # Pull the ID from LIR to keep IDs consistent between the two representations + id = @lir.graph.vertices.filter do |v| + v.source_with_metadata && + v.source_with_metadata.line == line && + v.source_with_metadata.column == column + end.findFirst.get.id + end + args["id"] = id # some code pulls the id out of the args + + raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" unless id + raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id] + + @plugins_by_id[id] = true + # Scope plugins of type 'input' to 'inputs' + type_scoped_metric = @metric_factory.create(plugin_type) + klass = Plugin.lookup(plugin_type, name) + execution_context = @exec_factory.create(id, klass.config_name) + + if plugin_type == "output" + OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args) + elsif plugin_type == "filter" + @filter_class.new(@logger, klass, type_scoped_metric, execution_context, args) + else # input or codec plugin + plugin_instance = klass.new(args) + scoped_metric = type_scoped_metric.namespace(id.to_sym) + scoped_metric.gauge(:name, plugin_instance.config_name) + plugin_instance.metric = scoped_metric + plugin_instance.execution_context = execution_context + plugin_instance + end + end + end + end +end diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 141ccc095..74cf1003c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -63,13 +63,14 @@ public final class CompiledPipeline { private final PipelineIR pipelineIR; /** - * Ruby pipeline object. + * Ruby plugin factory instance. */ - private final RubyIntegration.Pipeline pipeline; + private final RubyIntegration.PluginFactory pluginFactory; - public CompiledPipeline(final PipelineIR pipelineIR, final RubyIntegration.Pipeline pipeline) { + public CompiledPipeline(final PipelineIR pipelineIR, + final RubyIntegration.PluginFactory pluginFactory) { this.pipelineIR = pipelineIR; - this.pipeline = pipeline; + this.pluginFactory = pluginFactory; inputs = setupInputs(); filters = setupFilters(); outputs = setupOutputs(); @@ -126,7 +127,7 @@ public final class CompiledPipeline { outs.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); - res.put(v.getId(), pipeline.buildOutput( + res.put(v.getId(), pluginFactory.buildOutput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) )); @@ -156,7 +157,7 @@ public final class CompiledPipeline { vertices.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); - nodes.add(pipeline.buildInput( + nodes.add(pluginFactory.buildInput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) )); @@ -168,7 +169,7 @@ public final class CompiledPipeline { * Converts plugin arguments from the format provided by {@link PipelineIR} into coercible * Ruby types. * @param def PluginDefinition as provided by {@link PipelineIR} - * @return RubyHash of plugin arguments as understood by {@link RubyIntegration.Pipeline} + * @return RubyHash of plugin arguments as understood by {@link RubyIntegration.PluginFactory} * methods */ private RubyHash convertArgs(final PluginDefinition def) { @@ -179,7 +180,7 @@ public final class CompiledPipeline { final Object toput; if (value instanceof PluginStatement) { final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); - toput = pipeline.buildCodec( + toput = pluginFactory.buildCodec( RubyUtil.RUBY.newString(codec.getName()), Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()) ); @@ -199,7 +200,7 @@ public final class CompiledPipeline { private RubyIntegration.Filter buildFilter(final PluginVertex vertex) { final PluginDefinition def = vertex.getPluginDefinition(); final SourceWithMetadata source = vertex.getSourceWithMetadata(); - return pipeline.buildFilter( + return pluginFactory.buildFilter( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def) ); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index b9af282f3..519a88f9a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -54,10 +54,9 @@ public final class RubyIntegration { } /** - * The Main Ruby Pipeline Class. Currently, this interface is implemented only by the Ruby class - * {@code BasePipeline}. + * Plugin Factory that instantiates Ruby plugins and is implemented in Ruby. */ - public interface Pipeline { + public interface PluginFactory { IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args);