Backport of PR #11773 to branch 7.x

Backports a PR that moved code out of LogStash::Compiler to org.logstash.config.ir.ConfigCompiler
This commit is contained in:
andsel 2020-04-10 17:43:24 +02:00 committed by J.A.R.V.I.S. - an Elastic git bot
parent d767848394
commit 72f4e2f8e2
8 changed files with 194 additions and 146 deletions

View file

@ -17,35 +17,9 @@
require 'logstash/compiler/lscl/lscl_grammar'
java_import org.logstash.config.ir.PipelineIR
java_import org.logstash.config.ir.graph.Graph
module LogStash; class Compiler
include ::LogStash::Util::Loggable
def self.compile_sources(sources_with_metadata, support_escapes)
graph_sections = sources_with_metadata.map do |swm|
self.compile_graph(swm, support_escapes)
end
input_graph = Graph.combine(*graph_sections.map {|s| s[:input] }).graph
output_graph = Graph.combine(*graph_sections.map {|s| s[:output] }).graph
filter_graph = graph_sections.reduce(nil) do |acc, s|
filter_section = s[:filter]
if acc.nil?
filter_section
else
acc.chain(filter_section)
end
end
original_source = sources_with_metadata.map(&:text).join("\n")
PipelineIR.new(input_graph, filter_graph, output_graph, original_source)
end
def self.compile_imperative(source_with_metadata, support_escapes)
if !source_with_metadata.is_a?(org.logstash.common.SourceWithMetadata)
raise ArgumentError, "Expected 'org.logstash.common.SourceWithMetadata', got #{source_with_metadata.class}"
@ -61,8 +35,4 @@ module LogStash; class Compiler
config.process_escape_sequences = support_escapes
config.compile(source_with_metadata)
end
def self.compile_graph(source_with_metadata, support_escapes)
Hash[compile_imperative(source_with_metadata, support_escapes).map {|section,icompiled| [section, icompiled.toGraph]}]
end
end; end

View file

@ -49,103 +49,6 @@ describe LogStash::Compiler do
end
end
describe "compile with empty source" do
let(:sources_with_metadata) do
[
org.logstash.common.SourceWithMetadata.new("str", "in_plugin", 0, 0, "input { input_0 {} } "),
org.logstash.common.SourceWithMetadata.new("str", "out_plugin", 0, 0, "output { output_0 {} } "),
org.logstash.common.SourceWithMetadata.new("str", "<empty>", 0, 0, " ")
]
end
it "should compile only the text parts" do
described_class.compile_sources(sources_with_metadata, false)
end
end
describe "compile with fully commented source" do
let(:sources_with_metadata) do
[
org.logstash.common.SourceWithMetadata.new("str", "in_plugin", 0, 0, "input { input_0 {} } "),
org.logstash.common.SourceWithMetadata.new("str", "commented_filter", 0, 0, "#filter{...}\n"),
org.logstash.common.SourceWithMetadata.new("str", "out_plugin", 0, 0, "output { output_0 {} } "),
]
end
it "should compile only non commented text parts" do
described_class.compile_sources(sources_with_metadata, false)
end
end
describe "compiling to Pipeline" do
subject(:source_id) { "fake_sourcefile" }
let(:source_with_metadata) { org.logstash.common.SourceWithMetadata.new(source_protocol, source_id, 0, 0, source) }
subject(:compiled) { puts "PCOMP"; described_class.compile_pipeline(source_with_metadata, settings) }
describe "compiling multiple sources" do
let(:sources) do
[
"input { input_0 {} } filter { filter_0 {} } output { output_0 {} }",
"input { input_1 {} } filter { filter_1 {} } output { output_1 {} }"
]
end
let(:sources_with_metadata) do
sources.map.with_index do |source, idx|
org.logstash.common.SourceWithMetadata.new("#{source_protocol}_#{idx}", "#{source_id}_#{idx}", 0, 0, source)
end
end
subject(:pipeline) { described_class.compile_sources(sources_with_metadata, false) }
it "should generate a hash" do
expect(pipeline.unique_hash).to be_a(String)
end
it "should compile cleanly" do
expect(pipeline).to be_a(org.logstash.config.ir.PipelineIR)
end
it "should provide the original source" do
expect(pipeline.original_source).to eq(sources.join("\n"))
end
describe "applying protocol and id metadata" do
it "should apply the correct source metadata to all components" do
# TODO: seems to be a jruby regression we cannot currently call each on a stream
pipeline.get_plugin_vertices.each do |pv|
name_idx = pv.plugin_definition.name.split("_").last
source_protocol_idx = pv.source_with_metadata.protocol.split("_").last
source_id_idx = pv.source_with_metadata.id.split("_").last
expect(name_idx).to eq(source_protocol_idx)
expect(name_idx).to eq(source_id_idx)
end
end
end
end
describe "complex configs" do
shared_examples_for "compilable LSCL files" do |path|
describe "parsing #{path}" do
let(:source) { File.read(path) }
it "should compile" do
expect(compiled).to be_java_kind_of(Java::OrgLogstashConfigIr::Pipeline)
end
it "should have a hash" do
expect(compiled.uniqueHash)
end
end
end
Dir.glob(File.join(SUPPORT_DIR, "lscl_configs", "*.conf")).each do |path|
it_should_behave_like "compilable LSCL files", path
end
end
end
describe "compiling imperative" do
let(:source_id) { "fake_sourcefile" }
let(:source_with_metadata) { org.logstash.common.SourceWithMetadata.new(source_protocol, source_id, 0, 0, source) }

View file

@ -21,12 +21,20 @@
package org.logstash.config.ir;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.graph.Graph;
import org.logstash.config.ir.imperative.Statement;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
/**
* Java Implementation of the config compiler that is implemented by wrapping the Ruby
@ -42,17 +50,75 @@ public final class ConfigCompiler {
* @param sourcesWithMetadata Logstash Config partitioned
* @param supportEscapes The value of the setting {@code config.support_escapes}
* @return Compiled {@link PipelineIR}
* @throws InvalidIRException if the the configuration contains errors
*/
@SuppressWarnings("unchecked")
public static PipelineIR configToPipelineIR(final @SuppressWarnings("rawtypes") RubyArray sourcesWithMetadata,
final boolean supportEscapes) {
final boolean supportEscapes) throws InvalidIRException {
return compileSources((List<SourceWithMetadata>) sourcesWithMetadata, supportEscapes);
}
public static PipelineIR compileSources(List<SourceWithMetadata> sourcesWithMetadata, boolean supportEscapes) throws InvalidIRException {
Map<PluginDefinition.Type, List<Graph>> groupedPipelineSections = sourcesWithMetadata.stream()
.map(swm -> compileGraph(swm, supportEscapes))
.flatMap(m -> m.entrySet().stream())
.filter(e -> e.getValue() != null)
.collect(groupingBy(Map.Entry::getKey,
mapping(Map.Entry::getValue, toList())));
Graph inputGraph = Graph.combine(groupedPipelineSections.get(PluginDefinition.Type.INPUT).toArray(new Graph[0])).graph;
Graph outputGraph = Graph.combine(groupedPipelineSections.get(PluginDefinition.Type.OUTPUT).toArray(new Graph[0])).graph;
Graph filterGraph = groupedPipelineSections.get(PluginDefinition.Type.FILTER).stream()
.reduce(ConfigCompiler::chainWithUntypedException).orElse(null);
String originalSource = sourcesWithMetadata.stream().map(SourceWithMetadata::getText).collect(Collectors.joining("\n"));
return new PipelineIR(inputGraph, filterGraph, outputGraph, originalSource);
}
private static Graph chainWithUntypedException(Graph g1, Graph g2) {
try {
return g1.chain(g2);
} catch (InvalidIRException iirex) {
throw new IllegalArgumentException(iirex);
}
}
private static Map<PluginDefinition.Type, Statement> compileImperative(SourceWithMetadata sourceWithMetadata,
boolean supportEscapes) {
final IRubyObject compiler = RubyUtil.RUBY.executeScript(
"require 'logstash/compiler'\nLogStash::Compiler",
""
);
final IRubyObject code =
compiler.callMethod(RubyUtil.RUBY.getCurrentContext(), "compile_sources",
new IRubyObject[]{sourcesWithMetadata, RubyUtil.RUBY.newBoolean(supportEscapes)}
);
return code.toJava(PipelineIR.class);
// invoke Ruby interpreter to execute LSCL treetop
final IRubyObject code = compiler.callMethod(RubyUtil.RUBY.getCurrentContext(), "compile_imperative",
new IRubyObject[]{
JavaUtil.convertJavaToRuby(RubyUtil.RUBY, sourceWithMetadata),
RubyUtil.RUBY.newBoolean(supportEscapes)
});
RubyHash hash = (RubyHash) code;
Map<PluginDefinition.Type, Statement> result = new HashMap<>();
result.put(PluginDefinition.Type.INPUT, readStatementFromRubyHash(hash, "input"));
result.put(PluginDefinition.Type.FILTER, readStatementFromRubyHash(hash, "filter"));
result.put(PluginDefinition.Type.OUTPUT, readStatementFromRubyHash(hash, "output"));
return result;
}
private static Statement readStatementFromRubyHash(RubyHash hash, String key) {
IRubyObject inputValue = hash.fastARef(RubyUtil.RUBY.newSymbol(key));
return inputValue.toJava(Statement.class);
}
private static Map<PluginDefinition.Type, Graph> compileGraph(SourceWithMetadata swm, boolean supportEscapes) {
Map<PluginDefinition.Type, Statement> pluginStatements = compileImperative(swm, supportEscapes);
return pluginStatements.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> toGraphWithUntypedException(e.getValue())));
}
private static Graph toGraphWithUntypedException(Statement s) {
try {
return s.toGraph();
} catch (InvalidIRException iirex) {
throw new IllegalArgumentException(iirex);
}
}
}

View file

@ -53,6 +53,7 @@ import org.logstash.common.DeadLetterQueueFactory;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.PipelineIR;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.ext.JRubyWrappedWriteClientExt;
@ -178,7 +179,11 @@ public class AbstractPipelineExt extends RubyBasicObject {
}
}
boolean supportEscapes = getSetting(context, "config.support_escapes").isTrue();
lir = ConfigCompiler.configToPipelineIR(configParts, supportEscapes);
try {
lir = ConfigCompiler.configToPipelineIR(configParts, supportEscapes);
} catch (InvalidIRException iirex) {
throw new IllegalArgumentException(iirex);
}
return this;
}

View file

@ -282,18 +282,18 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Test
public void correctlyCompilesRegexMatchesWithConstant() throws IncompleteSourceWithMetadataException {
public void correctlyCompilesRegexMatchesWithConstant() throws InvalidIRException {
verifyRegex("=~", 1);
}
@Test
public void correctlyCompilesRegexNoMatchesWithConstant() throws IncompleteSourceWithMetadataException {
public void correctlyCompilesRegexNoMatchesWithConstant() throws InvalidIRException {
verifyRegex("!~", 0);
}
@SuppressWarnings({"unchecked"})
private void verifyRegex(String operator, int expectedEvents)
throws IncompleteSourceWithMetadataException {
throws InvalidIRException {
final Event event = new Event();
final JrubyEventExtLibrary.RubyEvent testEvent =
@ -457,8 +457,8 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@SuppressWarnings({"unchecked"})
private void verifyComparison(final boolean expected, final String conditional,
final Event event) throws IncompleteSourceWithMetadataException {
private void verifyComparison(final boolean expected, final String conditional, final Event event)
throws InvalidIRException {
final JrubyEventExtLibrary.RubyEvent testEvent =
JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, event);

View file

@ -21,7 +21,14 @@
package org.logstash.config.ir;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
@ -30,9 +37,11 @@ import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.graph.Graph;
import org.logstash.config.ir.graph.PluginVertex;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
public class ConfigCompilerTest extends RubyEnvTestCase {
@ -41,7 +50,7 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
IRubyObject swm = JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, "input {stdin{}} output{stdout{}}"));
final PipelineIR pipelineIR =
ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false);
ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false);
assertThat(pipelineIR.getOutputPluginVertices().size(), is(1));
assertThat(pipelineIR.getFilterPluginVertices().size(), is(0));
}
@ -50,6 +59,7 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
* Tests that repeatedly parsing the same config (containing a large number of duplicated sections)
* into a {@link Graph} repeatedly results in a graph with a constant (i.e. deterministic)
* hash code as returned by {@link Graph#uniqueHash()}.
*
* @throws Exception On Failure
*/
@Test
@ -70,6 +80,7 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
* Tests that repeatedly parsing the same complex config String into a {@link Graph} repeatedly
* results in a graph with a constant (i.e. deterministic) hash code as returned by
* {@link Graph#uniqueHash()}.
*
* @throws Exception On Failure
*/
@Test
@ -87,9 +98,99 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
assertThat(graphHash(config), is(first));
}
private static String graphHash(final String config) throws IncompleteSourceWithMetadataException {
private static String graphHash(final String config) throws InvalidIRException {
IRubyObject swm = JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, config));
return ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false).uniqueHash();
}
@Test
public void testCompileWithAnEmptySource() throws InvalidIRException {
List<SourceWithMetadata> sourcesWithMetadata = Arrays.asList(
new SourceWithMetadata("str", "in_plugin", 0, 0, "input { input_0 {} } "),
new SourceWithMetadata("str", "out_plugin", 0, 0, "output { output_0 {} } "),
new SourceWithMetadata("str", "<empty>", 0, 0, " ")
);
PipelineIR pipeline = ConfigCompiler.compileSources(sourcesWithMetadata, false);
assertEquals("should compile only the text parts", 2L, pipeline.pluginVertices().count());
}
@Test
public void testCompileWithFullyCommentedSource() throws InvalidIRException {
List<SourceWithMetadata> sourcesWithMetadata = Arrays.asList(
new SourceWithMetadata("str", "in_plugin", 0, 0, "input { input_0 {} } "),
new SourceWithMetadata("str","commented_filter",0,0,"#filter{...}\n"),
new SourceWithMetadata("str","out_plugin",0,0,"output { output_0 {} } ")
);
PipelineIR pipeline = ConfigCompiler.compileSources(sourcesWithMetadata, false);
assertEquals("should compile only non commented text parts", 2L, pipeline.pluginVertices().count());
}
@Test
public void testCompilingPipelineWithMultipleSources() throws InvalidIRException {
String sourceId = "fake_sourcefile";
String sourceProtocol = "test_proto";
SourceWithMetadata sourceWithMetadata = new SourceWithMetadata(sourceProtocol, sourceId, 0, 0, "booo");
String[] sources = new String[] {
"input { input_0 {} } filter { filter_0 {} } output { output_0 {} }",
"input { input_1 {} } filter { filter_1 {} } output { output_1 {} }"};
List<SourceWithMetadata> sourcesWithMetadata = Arrays.asList(
new SourceWithMetadata(sourceProtocol + "_" + 0, sourceId + "_" + 0, 0, 0, sources[0]),
new SourceWithMetadata(sourceProtocol + "_" + 1, sourceId + "_" + 1, 0, 0, sources[1]));
PipelineIR pipeline = ConfigCompiler.compileSources(sourcesWithMetadata, false);
assertFalse("should generate a hash", pipeline.uniqueHash().isEmpty());
assertEquals("should provide the original source", String.join("\n", sources),
pipeline.getOriginalSource());
verifyApplyingProtocolAndIdMetadata(pipeline);
}
private void verifyApplyingProtocolAndIdMetadata(PipelineIR pipeline) {
for (PluginVertex pv : pipeline.getPluginVertices()) {
String nameIdx = last(pv.getPluginDefinition().getName().split("_"));
String sourceProtocolIdx = last(pv.getSourceWithMetadata().getProtocol().split("_"));
String sourceIdIdx = last(pv.getSourceWithMetadata().getId().split("_"));
assertEquals("should apply the correct source metadata to protocol", nameIdx, sourceProtocolIdx);
assertEquals("should apply the correct source metadata to id", nameIdx, sourceIdIdx);
}
}
private static String last(String[] s) {
return s[s.length - 1];
}
@Test
public void testComplexConfigs() throws IOException {
Path path = Paths.get(".").toAbsolutePath().resolve("../spec/support/lscl_configs").normalize();
Files.list(path).forEach(this::verifyComplexConfig);
}
private void verifyComplexConfig(Path path) {
String configName = path.getFileName().toString();
String source = null;
try {
source = new String(Files.readAllBytes(path));
} catch (IOException e) {
fail(configName + " not readable");
}
PipelineIR pipelineIR = null;
try {
SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("test_proto", "fake_sourcefile", 0, 0, source);
pipelineIR = ConfigCompiler.compileSources(Collections.singletonList(sourceWithMetadata), false);
} catch (InvalidIRException iirex) {
fail("error compiling " + configName + ": " + iirex.getMessage());
}
assertNotNull(configName + " should compile", pipelineIR);
assertFalse(configName + " should have a hash", pipelineIR.uniqueHash().isEmpty());
}
}

View file

@ -31,6 +31,7 @@ import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.RubyEnvTestCase;
import org.logstash.instrument.metrics.NamespacedMetricExt;
@ -83,7 +84,7 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
}
@Test
public void testPluginIdResolvedWithEnvironmentVariables() throws IncompleteSourceWithMetadataException {
public void testPluginIdResolvedWithEnvironmentVariables() throws InvalidIRException {
PluginFactoryExt.PluginResolver mockPluginResolver = wrapWithSearchable(MockInputPlugin.class);
SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("proto", "path", 1, 8, "input {mockinput{ id => \"${CUSTOM}\"}} output{mockoutput{}}");
@ -109,7 +110,7 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
}
@SuppressWarnings("rawtypes")
private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) {
private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) throws InvalidIRException {
RubyArray sourcesWithMetadata = RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, sourceWithMetadata));

View file

@ -22,7 +22,9 @@ describe ::LogStash::Config::LIRSerializer do
end
let(:lir_pipeline) do
::LogStash::Compiler.compile_sources(config_source_with_metadata, LogStash::SETTINGS)
# ::LogStash::Compiler.compile_sources(config_source_with_metadata, true)
java_import org.logstash.config.ir.ConfigCompiler
ConfigCompiler.compileSources(config_source_with_metadata, true)
end
describe "#serialize" do