Use LIR to generate vertex plugin IDs

In this patch we unify the IDs reported by LIR with those generated using config_ast.rb

This is a temporary fix until LIR execution is built. It relies on the fact that currently both LIR and config_ast.rb only operate on a single concatenated string of configurations. In the future LIR will compile different files separately, then merge their IRs. For now, however, this solution will hold.

This change also exposes the :id attribute of FilterDelegator to bring it to parity with OutputDelegator and InputDelegator which is required for testing purposes.

Fixes #7930
This commit is contained in:
Andrew Cholakian 2017-08-07 15:28:35 -05:00
parent f17b2eb6c2
commit 2844a179cf
6 changed files with 101 additions and 61 deletions

View file

@ -2,6 +2,7 @@
require 'logstash/errors'
require "treetop"
require "logstash/compiler/treetop_monkeypatches"
require "logstash/compiler/lscl/helpers"
require "logstash/config/string_escape"
java_import org.logstash.config.ir.DSL
@ -10,59 +11,7 @@ java_import org.logstash.common.SourceWithMetadata
module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSCL; module AST
PROCESS_ESCAPE_SEQUENCES = :process_escape_sequences
# Helpers for parsing LSCL files
module Helpers
def source_meta
line, column = line_and_column
org.logstash.common.SourceWithMetadata.new(base_protocol, base_id, line, column, self.text_value)
end
def base_source_with_metadata=(value)
set_meta(:base_source_with_metadata, value)
end
def base_source_with_metadata
get_meta(:base_source_with_metadata)
end
def base_protocol
self.base_source_with_metadata.protocol
end
def base_id
self.base_source_with_metadata.id
end
def compose(*statements)
compose_for(section_type.to_sym).call(source_meta, *statements)
end
def compose_for(section_sym)
if section_sym == :filter
jdsl.method(:iComposeSequence)
else
jdsl.method(:iComposeParallel)
end
end
def line_and_column
start = self.interval.first
[self.input.line_of(start), self.input.column_of(start)]
end
def jdsl
org.logstash.config.ir.DSL
end
def self.jdsl
org.logstash.config.ir.DSL
end
AND_METHOD = jdsl.method(:eAnd)
OR_METHOD = jdsl.method(:eOr)
end
class Node < Treetop::Runtime::SyntaxNode
class Node < Treetop::Runtime::SyntaxNode
include Helpers
def section_type

View file

@ -0,0 +1,55 @@
# encoding: utf-8
module LogStashCompilerLSCLGrammar; module LogStash; module Compiler; module LSCL; module AST
# Helpers for parsing LSCL files
module Helpers
def source_meta
line, column = line_and_column
org.logstash.common.SourceWithMetadata.new(base_protocol, base_id, line, column, self.text_value)
end
def base_source_with_metadata=(value)
set_meta(:base_source_with_metadata, value)
end
def base_source_with_metadata
get_meta(:base_source_with_metadata)
end
def base_protocol
self.base_source_with_metadata ? self.base_source_with_metadata.protocol : 'config_ast'
end
def base_id
self.base_source_with_metadata ? self.base_source_with_metadata.id : 'config_ast'
end
def compose(*statements)
compose_for(section_type.to_sym).call(source_meta, *statements)
end
def compose_for(section_sym)
if section_sym == :filter
jdsl.method(:iComposeSequence)
else
jdsl.method(:iComposeParallel)
end
end
def line_and_column
start = self.interval.first
[self.input.line_of(start), self.input.column_of(start)]
end
def jdsl
org.logstash.config.ir.DSL
end
def self.jdsl
org.logstash.config.ir.DSL
end
AND_METHOD = jdsl.method(:eAnd)
OR_METHOD = jdsl.method(:eOr)
end
end; end; end; end; end

View file

@ -1,5 +1,6 @@
# encoding: utf-8
require 'logstash/errors'
require "logstash/compiler/lscl/helpers"
require "treetop"
require "logstash/compiler/treetop_monkeypatches"
@ -32,6 +33,8 @@ module LogStash; module Config; module AST
end
class Node < Treetop::Runtime::SyntaxNode
include LogStashCompilerLSCLGrammar::LogStash::Compiler::LSCL::AST::Helpers
def text_value_for_comments
text_value.gsub(/[\r\n]/, " ")
end
@ -189,12 +192,12 @@ module LogStash; module Config; module AST
# If any parent is a Plugin, this must be a codec.
if attributes.elements.nil?
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect})" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column})" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
end
end
@ -211,7 +214,7 @@ module LogStash; module Config; module AST
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{source_meta.line}, #{source_meta.column}, #{attributes_code})"
end
end

View file

@ -14,6 +14,8 @@ module LogStash
]
def_delegators :@filter, *DELEGATED_METHODS
attr_reader :id
def initialize(logger, klass, metric, execution_context, plugin_args)
@logger = logger
@klass = klass

View file

@ -107,16 +107,23 @@ module LogStash; class BasePipeline
LogStash::Compiler.compile_sources(sources_with_metadata, @settings)
end
def plugin(plugin_type, name, *args)
def plugin(plugin_type, name, line, column, *args)
@plugin_counter += 1
# Collapse the array of arguments into a single merged hash
args = args.reduce({}, &:merge)
id = if args["id"].nil? || args["id"].empty?
args["id"] = "#{@config_hash}-#{@plugin_counter}"
else
args["id"]
# Pull the ID from LIR to keep IDs consistent between the two representations
id = lir.graph.vertices.filter do |v|
v.source_with_metadata &&
v.source_with_metadata.line == line &&
v.source_with_metadata.column == column
end.findFirst.get.id
args["id"] = id # some code pulls the id out of the args
if !id
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}"
end
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]

View file

@ -365,6 +365,30 @@ describe LogStash::Pipeline do
end
end
context "with no explicit ids declared" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end
let(:config) { "input { dummyinput {} } filter { dummyfilter {} } output { dummyoutput {} }"}
let(:pipeline) { mock_pipeline_from_string(config) }
after do
# If you don't start/stop the pipeline it won't release the queue lock and will
# cause the suite to fail :(
pipeline.close
end
it "should use LIR provided IDs" do
expect(pipeline.inputs.first.id).to eq(pipeline.lir.input_plugin_vertices.first.id)
expect(pipeline.filters.first.id).to eq(pipeline.lir.filter_plugin_vertices.first.id)
expect(pipeline.outputs.first.id).to eq(pipeline.lir.output_plugin_vertices.first.id)
end
end
context "compiled flush function" do
describe "flusher thread" do
before(:each) do