mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
487afbc2d1
commit
86e3cd26d2
5 changed files with 128 additions and 125 deletions
|
@ -32,7 +32,6 @@ java_import org.logstash.config.ir.CompiledPipeline
|
||||||
java_import org.logstash.config.ir.ConfigCompiler
|
java_import org.logstash.config.ir.ConfigCompiler
|
||||||
|
|
||||||
module LogStash; class JavaBasePipeline
|
module LogStash; class JavaBasePipeline
|
||||||
include org.logstash.config.ir.compiler.RubyIntegration::Pipeline
|
|
||||||
include LogStash::Util::Loggable
|
include LogStash::Util::Loggable
|
||||||
|
|
||||||
attr_reader :settings, :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir, :execution_context, :ephemeral_id
|
attr_reader :settings, :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir, :execution_context, :ephemeral_id
|
||||||
|
@ -52,19 +51,16 @@ module LogStash; class JavaBasePipeline
|
||||||
@config_str, @settings.get_value("config.support_escapes")
|
@config_str, @settings.get_value("config.support_escapes")
|
||||||
)
|
)
|
||||||
|
|
||||||
# Every time #plugin is invoked this is incremented to give each plugin
|
|
||||||
# a unique id when auto-generating plugin ids
|
|
||||||
@plugin_counter ||= 0
|
|
||||||
|
|
||||||
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
||||||
|
|
||||||
# A list of plugins indexed by id
|
|
||||||
@plugins_by_id = {}
|
|
||||||
@agent = agent
|
@agent = agent
|
||||||
|
|
||||||
@dlq_writer = dlq_writer
|
@dlq_writer = dlq_writer
|
||||||
|
@plugin_factory = LogStash::Plugins::PluginFactory.new(
|
||||||
@lir_execution = CompiledPipeline.new(@lir, self)
|
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
|
||||||
|
@lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
|
||||||
|
@logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
|
||||||
|
JavaFilterDelegator
|
||||||
|
)
|
||||||
|
@lir_execution = CompiledPipeline.new(@lir, @plugin_factory)
|
||||||
if settings.get_value("config.debug") && @logger.debug?
|
if settings.get_value("config.debug") && @logger.debug?
|
||||||
@logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string))
|
@logger.debug("Compiled pipeline code", default_logging_keys(:code => @lir.get_graph.to_string))
|
||||||
end
|
end
|
||||||
|
@ -105,54 +101,7 @@ module LogStash; class JavaBasePipeline
|
||||||
end
|
end
|
||||||
|
|
||||||
def plugin(plugin_type, name, line, column, *args)
|
def plugin(plugin_type, name, line, column, *args)
|
||||||
@plugin_counter += 1
|
@plugin_factory.plugin(plugin_type, name, line, column, *args)
|
||||||
|
|
||||||
# Collapse the array of arguments into a single merged hash
|
|
||||||
args = args.reduce({}, &:merge)
|
|
||||||
|
|
||||||
if plugin_type == "codec"
|
|
||||||
id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here
|
|
||||||
else
|
|
||||||
# Pull the ID from LIR to keep IDs consistent between the two representations
|
|
||||||
id = lir.graph.vertices.filter do |v|
|
|
||||||
v.source_with_metadata &&
|
|
||||||
v.source_with_metadata.line == line &&
|
|
||||||
v.source_with_metadata.column == column
|
|
||||||
end.findFirst.get.id
|
|
||||||
end
|
|
||||||
|
|
||||||
args["id"] = id # some code pulls the id out of the args
|
|
||||||
|
|
||||||
if !id
|
|
||||||
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}"
|
|
||||||
end
|
|
||||||
|
|
||||||
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
|
||||||
@plugins_by_id[id] = true
|
|
||||||
|
|
||||||
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
|
|
||||||
metric = @metric || Instrument::NullMetric.new
|
|
||||||
|
|
||||||
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
|
|
||||||
# Scope plugins of type 'input' to 'inputs'
|
|
||||||
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
|
|
||||||
|
|
||||||
klass = Plugin.lookup(plugin_type, name)
|
|
||||||
|
|
||||||
execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer)
|
|
||||||
|
|
||||||
if plugin_type == "output"
|
|
||||||
OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
|
|
||||||
elsif plugin_type == "filter"
|
|
||||||
JavaFilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args)
|
|
||||||
else # input
|
|
||||||
input_plugin = klass.new(args)
|
|
||||||
scoped_metric = type_scoped_metric.namespace(id.to_sym)
|
|
||||||
scoped_metric.gauge(:name, input_plugin.config_name)
|
|
||||||
input_plugin.metric = scoped_metric
|
|
||||||
input_plugin.execution_context = execution_context
|
|
||||||
input_plugin
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def reloadable?
|
def reloadable?
|
||||||
|
|
|
@ -22,6 +22,7 @@ require "logstash/util/dead_letter_queue_manager"
|
||||||
require "logstash/output_delegator"
|
require "logstash/output_delegator"
|
||||||
require "logstash/filter_delegator"
|
require "logstash/filter_delegator"
|
||||||
require "logstash/queue_factory"
|
require "logstash/queue_factory"
|
||||||
|
require "logstash/plugins/plugin_factory"
|
||||||
require "logstash/compiler"
|
require "logstash/compiler"
|
||||||
require "logstash/execution_context"
|
require "logstash/execution_context"
|
||||||
require "securerandom"
|
require "securerandom"
|
||||||
|
@ -51,14 +52,8 @@ module LogStash; class BasePipeline
|
||||||
@config_str, @settings.get_value("config.support_escapes")
|
@config_str, @settings.get_value("config.support_escapes")
|
||||||
)
|
)
|
||||||
|
|
||||||
# Every time #plugin is invoked this is incremented to give each plugin
|
|
||||||
# a unique id when auto-generating plugin ids
|
|
||||||
@plugin_counter ||= 0
|
|
||||||
|
|
||||||
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
||||||
|
|
||||||
# A list of plugins indexed by id
|
|
||||||
@plugins_by_id = {}
|
|
||||||
@inputs = nil
|
@inputs = nil
|
||||||
@filters = nil
|
@filters = nil
|
||||||
@outputs = nil
|
@outputs = nil
|
||||||
|
@ -66,6 +61,12 @@ module LogStash; class BasePipeline
|
||||||
|
|
||||||
@dlq_writer = dlq_writer
|
@dlq_writer = dlq_writer
|
||||||
|
|
||||||
|
@plugin_factory = LogStash::Plugins::PluginFactory.new(
|
||||||
|
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
|
||||||
|
@lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
|
||||||
|
@logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
|
||||||
|
FilterDelegator
|
||||||
|
)
|
||||||
grammar = LogStashConfigParser.new
|
grammar = LogStashConfigParser.new
|
||||||
parsed_config = grammar.parse(config_str)
|
parsed_config = grammar.parse(config_str)
|
||||||
raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?
|
raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?
|
||||||
|
@ -110,54 +111,7 @@ module LogStash; class BasePipeline
|
||||||
end
|
end
|
||||||
|
|
||||||
def plugin(plugin_type, name, line, column, *args)
|
def plugin(plugin_type, name, line, column, *args)
|
||||||
@plugin_counter += 1
|
@plugin_factory.plugin(plugin_type, name, line, column, *args)
|
||||||
|
|
||||||
# Collapse the array of arguments into a single merged hash
|
|
||||||
args = args.reduce({}, &:merge)
|
|
||||||
|
|
||||||
if plugin_type == "codec"
|
|
||||||
id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here
|
|
||||||
else
|
|
||||||
# Pull the ID from LIR to keep IDs consistent between the two representations
|
|
||||||
id = lir.graph.vertices.filter do |v|
|
|
||||||
v.source_with_metadata &&
|
|
||||||
v.source_with_metadata.line == line &&
|
|
||||||
v.source_with_metadata.column == column
|
|
||||||
end.findFirst.get.id
|
|
||||||
end
|
|
||||||
|
|
||||||
args["id"] = id # some code pulls the id out of the args
|
|
||||||
|
|
||||||
if !id
|
|
||||||
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}"
|
|
||||||
end
|
|
||||||
|
|
||||||
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
|
||||||
@plugins_by_id[id] = true
|
|
||||||
|
|
||||||
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
|
|
||||||
metric = @metric || Instrument::NullMetric.new
|
|
||||||
|
|
||||||
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
|
|
||||||
# Scope plugins of type 'input' to 'inputs'
|
|
||||||
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
|
|
||||||
|
|
||||||
klass = Plugin.lookup(plugin_type, name)
|
|
||||||
|
|
||||||
execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer)
|
|
||||||
|
|
||||||
if plugin_type == "output"
|
|
||||||
OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
|
|
||||||
elsif plugin_type == "filter"
|
|
||||||
FilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args)
|
|
||||||
else # input
|
|
||||||
input_plugin = klass.new(args)
|
|
||||||
scoped_metric = type_scoped_metric.namespace(id.to_sym)
|
|
||||||
scoped_metric.gauge(:name, input_plugin.config_name)
|
|
||||||
input_plugin.metric = scoped_metric
|
|
||||||
input_plugin.execution_context = execution_context
|
|
||||||
input_plugin
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def reloadable?
|
def reloadable?
|
||||||
|
|
100
logstash-core/lib/logstash/plugins/plugin_factory.rb
Normal file
100
logstash-core/lib/logstash/plugins/plugin_factory.rb
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
|
||||||
|
module LogStash
|
||||||
|
module Plugins
|
||||||
|
|
||||||
|
class ExecutionContextFactory
|
||||||
|
|
||||||
|
def initialize(agent, pipeline, dlq_writer)
|
||||||
|
@agent = agent
|
||||||
|
@pipeline = pipeline
|
||||||
|
@dlq_writer = dlq_writer
|
||||||
|
end
|
||||||
|
|
||||||
|
def create(id, klass_cfg_name)
|
||||||
|
ExecutionContext.new(@pipeline, @agent, id, klass_cfg_name, @dlq_writer)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class PluginMetricFactory
|
||||||
|
|
||||||
|
def initialize(pipeline_id, metric)
|
||||||
|
@pipeline_id = pipeline_id.to_s.to_sym
|
||||||
|
@metric = metric
|
||||||
|
end
|
||||||
|
|
||||||
|
def create(plugin_type)
|
||||||
|
@metric.namespace([:stats, :pipelines, @pipeline_id, :plugins])
|
||||||
|
.namespace("#{plugin_type}s".to_sym)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class PluginFactory
|
||||||
|
include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory
|
||||||
|
|
||||||
|
def initialize(lir, metric_factory, logger, exec_factory, filter_class)
|
||||||
|
@lir = lir
|
||||||
|
@plugins_by_id = {}
|
||||||
|
@metric_factory = metric_factory
|
||||||
|
@logger = logger
|
||||||
|
@exec_factory = exec_factory
|
||||||
|
@filter_class = filter_class
|
||||||
|
end
|
||||||
|
|
||||||
|
def buildOutput(name, line, column, *args)
|
||||||
|
plugin("output", name, line, column, *args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def buildFilter(name, line, column, *args)
|
||||||
|
plugin("filter", name, line, column, *args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def buildInput(name, line, column, *args)
|
||||||
|
plugin("input", name, line, column, *args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def buildCodec(name, *args)
|
||||||
|
plugin("codec", name, 0, 0, *args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def plugin(plugin_type, name, line, column, *args)
|
||||||
|
# Collapse the array of arguments into a single merged hash
|
||||||
|
args = args.reduce({}, &:merge)
|
||||||
|
|
||||||
|
if plugin_type == "codec"
|
||||||
|
id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here
|
||||||
|
else
|
||||||
|
# Pull the ID from LIR to keep IDs consistent between the two representations
|
||||||
|
id = @lir.graph.vertices.filter do |v|
|
||||||
|
v.source_with_metadata &&
|
||||||
|
v.source_with_metadata.line == line &&
|
||||||
|
v.source_with_metadata.column == column
|
||||||
|
end.findFirst.get.id
|
||||||
|
end
|
||||||
|
args["id"] = id # some code pulls the id out of the args
|
||||||
|
|
||||||
|
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" unless id
|
||||||
|
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
||||||
|
|
||||||
|
@plugins_by_id[id] = true
|
||||||
|
# Scope plugins of type 'input' to 'inputs'
|
||||||
|
type_scoped_metric = @metric_factory.create(plugin_type)
|
||||||
|
klass = Plugin.lookup(plugin_type, name)
|
||||||
|
execution_context = @exec_factory.create(id, klass.config_name)
|
||||||
|
|
||||||
|
if plugin_type == "output"
|
||||||
|
OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
|
||||||
|
elsif plugin_type == "filter"
|
||||||
|
@filter_class.new(@logger, klass, type_scoped_metric, execution_context, args)
|
||||||
|
else # input or codec plugin
|
||||||
|
plugin_instance = klass.new(args)
|
||||||
|
scoped_metric = type_scoped_metric.namespace(id.to_sym)
|
||||||
|
scoped_metric.gauge(:name, plugin_instance.config_name)
|
||||||
|
plugin_instance.metric = scoped_metric
|
||||||
|
plugin_instance.execution_context = execution_context
|
||||||
|
plugin_instance
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -63,13 +63,14 @@ public final class CompiledPipeline {
|
||||||
private final PipelineIR pipelineIR;
|
private final PipelineIR pipelineIR;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ruby pipeline object.
|
* Ruby plugin factory instance.
|
||||||
*/
|
*/
|
||||||
private final RubyIntegration.Pipeline pipeline;
|
private final RubyIntegration.PluginFactory pluginFactory;
|
||||||
|
|
||||||
public CompiledPipeline(final PipelineIR pipelineIR, final RubyIntegration.Pipeline pipeline) {
|
public CompiledPipeline(final PipelineIR pipelineIR,
|
||||||
|
final RubyIntegration.PluginFactory pluginFactory) {
|
||||||
this.pipelineIR = pipelineIR;
|
this.pipelineIR = pipelineIR;
|
||||||
this.pipeline = pipeline;
|
this.pluginFactory = pluginFactory;
|
||||||
inputs = setupInputs();
|
inputs = setupInputs();
|
||||||
filters = setupFilters();
|
filters = setupFilters();
|
||||||
outputs = setupOutputs();
|
outputs = setupOutputs();
|
||||||
|
@ -126,7 +127,7 @@ public final class CompiledPipeline {
|
||||||
outs.forEach(v -> {
|
outs.forEach(v -> {
|
||||||
final PluginDefinition def = v.getPluginDefinition();
|
final PluginDefinition def = v.getPluginDefinition();
|
||||||
final SourceWithMetadata source = v.getSourceWithMetadata();
|
final SourceWithMetadata source = v.getSourceWithMetadata();
|
||||||
res.put(v.getId(), pipeline.buildOutput(
|
res.put(v.getId(), pluginFactory.buildOutput(
|
||||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
||||||
));
|
));
|
||||||
|
@ -156,7 +157,7 @@ public final class CompiledPipeline {
|
||||||
vertices.forEach(v -> {
|
vertices.forEach(v -> {
|
||||||
final PluginDefinition def = v.getPluginDefinition();
|
final PluginDefinition def = v.getPluginDefinition();
|
||||||
final SourceWithMetadata source = v.getSourceWithMetadata();
|
final SourceWithMetadata source = v.getSourceWithMetadata();
|
||||||
nodes.add(pipeline.buildInput(
|
nodes.add(pluginFactory.buildInput(
|
||||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
||||||
));
|
));
|
||||||
|
@ -168,7 +169,7 @@ public final class CompiledPipeline {
|
||||||
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
|
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
|
||||||
* Ruby types.
|
* Ruby types.
|
||||||
* @param def PluginDefinition as provided by {@link PipelineIR}
|
* @param def PluginDefinition as provided by {@link PipelineIR}
|
||||||
* @return RubyHash of plugin arguments as understood by {@link RubyIntegration.Pipeline}
|
* @return RubyHash of plugin arguments as understood by {@link RubyIntegration.PluginFactory}
|
||||||
* methods
|
* methods
|
||||||
*/
|
*/
|
||||||
private RubyHash convertArgs(final PluginDefinition def) {
|
private RubyHash convertArgs(final PluginDefinition def) {
|
||||||
|
@ -179,7 +180,7 @@ public final class CompiledPipeline {
|
||||||
final Object toput;
|
final Object toput;
|
||||||
if (value instanceof PluginStatement) {
|
if (value instanceof PluginStatement) {
|
||||||
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
|
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
|
||||||
toput = pipeline.buildCodec(
|
toput = pluginFactory.buildCodec(
|
||||||
RubyUtil.RUBY.newString(codec.getName()),
|
RubyUtil.RUBY.newString(codec.getName()),
|
||||||
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments())
|
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments())
|
||||||
);
|
);
|
||||||
|
@ -199,7 +200,7 @@ public final class CompiledPipeline {
|
||||||
private RubyIntegration.Filter buildFilter(final PluginVertex vertex) {
|
private RubyIntegration.Filter buildFilter(final PluginVertex vertex) {
|
||||||
final PluginDefinition def = vertex.getPluginDefinition();
|
final PluginDefinition def = vertex.getPluginDefinition();
|
||||||
final SourceWithMetadata source = vertex.getSourceWithMetadata();
|
final SourceWithMetadata source = vertex.getSourceWithMetadata();
|
||||||
return pipeline.buildFilter(
|
return pluginFactory.buildFilter(
|
||||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def)
|
||||||
);
|
);
|
||||||
|
|
|
@ -54,10 +54,9 @@ public final class RubyIntegration {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Main Ruby Pipeline Class. Currently, this interface is implemented only by the Ruby class
|
* Plugin Factory that instantiates Ruby plugins and is implemented in Ruby.
|
||||||
* {@code BasePipeline}.
|
|
||||||
*/
|
*/
|
||||||
public interface Pipeline {
|
public interface PluginFactory {
|
||||||
|
|
||||||
IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
|
IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
|
||||||
IRubyObject args);
|
IRubyObject args);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue