CONFIG COMPILER: Logging to local source files

This reverts commit e5f359367f.

Fixes #8913
Armin 2017-12-29 17:41:08 +03:00 committed by Armin Braun
parent da70fc6da6
commit 284366d01c
14 changed files with 461 additions and 366 deletions
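The substance of the change: rather than collecting and logging formatted generated code, each generated Dataset class is now written to a real .java file on disk and compiled from there with Janino, so a debugger can step into pipeline code and per-step debug logging happens at compile time. A minimal sketch (the class name and directory path are illustrative; the property constant is the same one debugDir() reads in ComputeStepSyntaxElement below) of pinning the output directory instead of letting it default to a temp directory:

import org.codehaus.commons.compiler.ICookable;

public final class PinGeneratedSources {
    public static void main(final String... args) {
        // If unset, debugDir() falls back to Files.createTempDirectory("logstash").
        System.setProperty(
            ICookable.SYSTEM_PROPERTY_SOURCE_DEBUGGING_DIR, "/tmp/logstash-generated"
        );
        // ... start the pipeline; generated sources then appear under
        // /tmp/logstash-generated/org/logstash/generated/CompiledDataset<N>.java
    }
}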

logstash-core/build.gradle

@@ -64,6 +64,7 @@ configurations.archives {
task javaTests(type: Test) {
exclude '/org/logstash/RSpecTests.class'
exclude '/org/logstash/config/ir/ConfigCompilerTest.class'
exclude '/org/logstash/config/ir/CompiledPipelineTest.class'
}
task rubyTests(type: Test) {
@@ -72,6 +73,7 @@ task rubyTests(type: Test) {
systemProperty 'logstash.core.root.dir', projectDir.absolutePath
include '/org/logstash/RSpecTests.class'
include '/org/logstash/config/ir/ConfigCompilerTest.class'
include '/org/logstash/config/ir/CompiledPipelineTest.class'
}
test {
@@ -115,9 +117,10 @@ dependencies {
compile "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
compile "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
compile "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
compile 'org.codehaus.janino:janino:3.0.7'
compile 'org.codehaus.janino:janino:3.0.8'
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}"
compile "org.jruby:jruby-complete:${jrubyVersion}"
compile 'com.google.googlejavaformat:google-java-format:1.5'
testCompile 'org.apache.logging.log4j:log4j-core:2.6.2:tests'
testCompile 'org.apache.logging.log4j:log4j-api:2.6.2:tests'
testCompile 'junit:junit:4.12'

logstash-core/lib/logstash/java_pipeline.rb

@@ -374,9 +374,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
pipeline_workers.times do |t|
batched_execution = @lir_execution.buildExecution
if t.eql? 0
@logger.debug ("Generated Java pipeline entry class: " + batched_execution.class.to_s)
end
thread = Thread.new(self, batched_execution) do |_pipeline, _batched_execution|
_pipeline.worker_loop(_batched_execution)
end
@@ -384,12 +381,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@worker_threads << thread
end
if @logger.debug? || @logger.trace?
@lir_execution.getGeneratedSource.each do |line|
@logger.debug line
end
end
# inputs should be started last, after all workers
begin
start_inputs

logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java

@@ -4,15 +4,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyHash;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.compiler.ComputeStepSyntaxElement;
import org.logstash.config.ir.compiler.Dataset;
import org.logstash.config.ir.compiler.DatasetCompiler;
import org.logstash.config.ir.compiler.EventCondition;
@@ -33,6 +35,8 @@ import org.logstash.ext.JrubyEventExtLibrary;
*/
public final class CompiledPipeline {
private static final Logger LOGGER = LogManager.getLogger(CompiledPipeline.class);
/**
* Configured inputs.
*/
@@ -88,14 +92,6 @@ public final class CompiledPipeline {
return new CompiledPipeline.CompiledExecution().toDataset();
}
/**
* Get the generated source
* @return sorted and formatted lines of generated code
*/
public List<String> getGeneratedSource(){
return DatasetCompiler.getGeneratedSource();
}
/**
* Sets up all Ruby outputs learnt from {@link PipelineIR}.
*/
@@ -235,15 +231,14 @@
* @return Compiled {@link Dataset} representing the pipeline.
*/
private Dataset compile() {
final Collection<Vertex> outputs = pipelineIR.getGraph()
final Collection<Vertex> outputNodes = pipelineIR.getGraph()
.allLeaves().filter(CompiledPipeline.this::isOutput)
.collect(Collectors.toList());
if (outputs.isEmpty()) {
if (outputNodes.isEmpty()) {
return DatasetCompiler.ROOT_DATASETS.iterator().next();
} else {
return DatasetCompiler.terminalDataset(outputs.stream().map(
leaf ->
outputDataset(leaf.getId(), getConfigSource(leaf), flatten(DatasetCompiler.ROOT_DATASETS, leaf))
return DatasetCompiler.terminalDataset(outputNodes.stream().map(
leaf -> outputDataset(leaf, flatten(DatasetCompiler.ROOT_DATASETS, leaf))
).collect(Collectors.toList()));
}
}
@@ -251,31 +246,38 @@
/**
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given filter.
* @param vertexId Vertex Id of the filter to create this {@link Dataset}
* @param configSource The Logstash configuration that maps to the returned Dataset
* @param vertex Vertex of the filter to create this {@link Dataset} for
* @param datasets All the datasets that pass through this filter
* @return Filter {@link Dataset}
*/
private Dataset filterDataset(final String vertexId, String configSource, final Collection<Dataset> datasets) {
private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> datasets) {
return plugins.computeIfAbsent(
vertexId, v -> DatasetCompiler.filterDataset(datasets, filters.get(v), configSource)
vertex.getId(), v -> {
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.filterDataset(datasets, filters.get(v));
LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);
return prepared.instantiate();
}
);
}
/**
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given output.
* @param vertexId Vertex Id of the filter to create this {@link Dataset} for
* filter node in the topology once
* @param configSource The Logstash configuration that maps to the returned Dataset
* @param vertex Vertex of the output to create this {@link Dataset} for
* @param datasets All the datasets that are passed into this output
* @return Output {@link Dataset}
*/
private Dataset outputDataset(final String vertexId, String configSource, final Collection<Dataset> datasets) {
private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets) {
return plugins.computeIfAbsent(
vertexId, v -> DatasetCompiler.outputDataset(
datasets, outputs.get(v), configSource, outputs.size() == 1
)
vertex.getId(), v -> {
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.outputDataset(
datasets, outputs.get(v), outputs.size() == 1
);
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
return prepared.instantiate();
}
);
}
@@ -284,14 +286,20 @@
* the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
* @param datasets Datasets to split
* @param condition Condition that must be fulfilled
* @param index Vertex id to cache the resulting {@link Dataset} under
* @param configSource The Logstash configuration that maps to the returned Dataset
* @param vertex Vertex id to cache the resulting {@link Dataset} under
* @return The half of the datasets contents that fulfils the condition
*/
private SplitDataset split(final Collection<Dataset> datasets,
final EventCondition condition, final String index, String configSource) {
final EventCondition condition, final Vertex vertex) {
return iffs.computeIfAbsent(
index, ind -> DatasetCompiler.splitDataset(datasets, condition, configSource)
vertex.getId(), v -> {
final ComputeStepSyntaxElement<SplitDataset> prepared =
DatasetCompiler.splitDataset(datasets, condition);
LOGGER.debug(
"Compiled conditional\n {} \n into \n {}", vertex, prepared
);
return prepared.instantiate();
}
);
}
@@ -325,11 +333,10 @@
return dependencies.stream().map(
dependency -> {
final Collection<Dataset> transientDependencies = flatten(datasets, dependency);
final String id = dependency.getId();
if (isFilter(dependency)) {
return filterDataset(id, getConfigSource(dependency), transientDependencies);
return filterDataset(dependency, transientDependencies);
} else if (isOutput(dependency)) {
return outputDataset(id, getConfigSource(dependency), transientDependencies);
return outputDataset(dependency, transientDependencies);
} else {
// We know that it's an if vertex since the input children are either
// output, filter or if in type.
@@ -337,7 +344,7 @@
final SplitDataset ifDataset = split(
transientDependencies,
EventCondition.Compiler.buildCondition(ifvert.getBooleanExpression()),
id, getConfigSource(dependency)
dependency
);
// It is important that we double check that we are actually dealing with the
// positive/left branch of the if condition
@@ -351,21 +358,4 @@
}).collect(Collectors.toList());
}
}
/**
* Gets the configuration source for debugging purposes. Uses the metadata text if it exists, else the vertex toString method
* @param vertex The {@link Vertex} to read the Logstash configuration source
* @return A String that can be useful for debugging the Logstash configuration to generated Dataset/code
*/
private String getConfigSource(Vertex vertex){
if( vertex == null){
return "(vertex is null, this is likely a bug)";
}
//conditionals will use this
if(vertex.getSourceWithMetadata() == null){
return vertex.toString();
}
String text = vertex.getSourceWithMetadata().getText();
return text == null ? "(vertex.getSourceWithMetadata().getText() is null, this is likely a bug)" : text;
}
}
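Worth noting for readers of the three methods above (filterDataset, outputDataset, split): they all share one memoization shape, keyed by vertex id, so a node reachable through several branches of the topology still compiles exactly once, and the debug log pairs each vertex with the class it compiled into. A self-contained sketch of that shape with stand-in types (the real code uses ComputeStepSyntaxElement and Dataset):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class CompileOncePerVertex<V> {
    private final Map<String, V> cache = new HashMap<>();

    // computeIfAbsent guarantees a single compilation per vertex id even when
    // flatten(...) visits the same filter or output via multiple parents.
    V compile(final String vertexId, final Function<String, V> compiler) {
        return cache.computeIfAbsent(vertexId, compiler);
    }
}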

logstash-core/src/main/java/org/logstash/config/ir/compiler/Closure.java

@@ -72,13 +72,17 @@ final class Closure implements MethodLevelSyntaxElement {
return this;
}
public boolean empty() {
return statements.isEmpty();
}
@Override
public String generateCode() {
final Collection<MethodLevelSyntaxElement> optimized =
this.optimizeRubyThreadContexts().statements;
return optimized.isEmpty() ? "" : SyntaxFactory.join(
optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect(
Collectors.joining(";")
Collectors.joining(";\n")
), ";"
);
}
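The switch from joining statements with ";" to ";\n" looks cosmetic, but it is what gives the written-out source one statement per line, and therefore usable line numbers when stepping through a generated class. An illustrative, runnable comparison:

import java.util.stream.Collectors;
import java.util.stream.Stream;

final class JoinDemo {
    public static void main(final String... args) {
        // Old: "int a=1;int b=2;" -- a single line, useless for breakpoints.
        System.out.println(Stream.of("int a=1", "int b=2").collect(Collectors.joining(";")) + ";");
        // New: "int a=1;\nint b=2;" -- one statement per line.
        System.out.println(Stream.of("int a=1", "int b=2").collect(Collectors.joining(";\n")) + ";");
    }
}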

logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java

@@ -1,49 +1,52 @@
package org.logstash.config.ir.compiler;
import com.google.googlejavaformat.java.Formatter;
import com.google.googlejavaformat.java.FormatterException;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ClassBodyEvaluator;
import org.codehaus.commons.compiler.ICookable;
import org.codehaus.commons.compiler.ISimpleCompiler;
import org.codehaus.janino.SimpleCompiler;
/**
* One step a compiled pipeline. In the current implementation each step compiles to a
* {@link Dataset}.
* One step of a compiled pipeline that compiles to a {@link Dataset}.
*/
final class ComputeStepSyntaxElement implements SyntaxElement {
public final class ComputeStepSyntaxElement<T extends Dataset> implements SyntaxElement {
private static final Logger LOGGER = LogManager.getLogger(ComputeStepSyntaxElement.class);
private static final DynamicClassLoader CLASS_LOADER = new DynamicClassLoader();
private static final Path SOURCE_DIR = debugDir();
private static final ISimpleCompiler COMPILER = new SimpleCompiler();
/**
* Cache of runtime compiled classes to prevent duplicate classes being compiled.
*/
private static final Map<ComputeStepSyntaxElement, Class<? extends Dataset>> CLASS_CACHE
private static final Map<ComputeStepSyntaxElement<?>, Class<? extends Dataset>> CLASS_CACHE
= new HashMap<>();
private static final Map<String, String> SOURCE_CACHE = new HashMap<>();
private static final Map<String, String> CONFIG_SOURCE_CACHE = new HashMap<>();
/**
* Sequence number to ensure unique naming for runtime compiled classes.
*/
private static final AtomicInteger SEQUENCE = new AtomicInteger(0);
private static final int INDENT_WIDTH = 4;
/**
* Pattern to remove redundant {@code ;} from formatted code since {@link Formatter} does not
* remove those.
*/
private static final Pattern REDUNDANT_SEMICOLON = Pattern.compile("\n[ ]*;\n");
private final String name;
@@ -51,110 +54,109 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
private final ClassFields fields;
private final String configSource;
/**
* Get the generated source
* @return sorted and formatted lines of generated code
*/
public static List<String> getGeneratedSource() {
List<String> output = new ArrayList<>();
output.add("/******************************************************************************************");
CONFIG_SOURCE_CACHE.forEach((k, v) -> {
output.add("* " + v + " <==> " + k.replaceAll("[\\t\\n\\r\\s]+",""));
});
output.add("******************************************************************************************/");
SOURCE_CACHE.forEach((k, v) -> {
output.add(String.format("class %s {", k));
LOGGER.trace("{}:{}", k, v);
getFormattedLines(v, output, INDENT_WIDTH);
output.add("}");
});
return output;
}
private static List<String> getFormattedLines(String input, List<String> output, int indent) {
int curlyOpen = input.indexOf("{");
int curlyClose = input.indexOf("}");
int semiColon = input.indexOf(";");
List<Integer> positions = Arrays.asList(curlyOpen, curlyClose, semiColon);
positions.sort(Comparator.naturalOrder());
Optional<Integer> firstMatch = positions.stream().filter(i -> i >= 0).findFirst();
if (firstMatch.isPresent()) {
int pos = firstMatch.get();
int preIndent = indent;
int postIndent = indent;
if (pos == curlyOpen) {
postIndent += INDENT_WIDTH;
} else if (pos == curlyClose) {
preIndent -= INDENT_WIDTH;
postIndent = preIndent;
}
if (input.trim().length() > 0) {
String sub = input.substring(0, pos + 1);
if (!sub.equals(";")) {
output.add(String.format("%" + preIndent + "s%s", " ", sub));
}
}
return getFormattedLines(input.substring(pos + 1), output, postIndent);
}
if (input.trim().equals("}")) {
indent -= INDENT_WIDTH;
}
if (input.trim().length() > 0 && !input.trim().equals(";")) {
output.add(String.format("%" + indent + "s%s", " ", input));
}
return output;
}
private final Class<T> type;
ComputeStepSyntaxElement(final Iterable<MethodSyntaxElement> methods,
final ClassFields fields, DatasetCompiler.DatasetFlavor datasetFlavor, String configSource) {
this(String.format("Generated%d_" + datasetFlavor.getDisplay() + "Dataset", SEQUENCE.incrementAndGet()), methods, fields, configSource);
final ClassFields fields, final Class<T> interfce) {
this(
String.format("CompiledDataset%d", SEQUENCE.incrementAndGet()), methods, fields,
interfce
);
}
private ComputeStepSyntaxElement(final String name, final Iterable<MethodSyntaxElement> methods,
final ClassFields fields, String configSource) {
final ClassFields fields, final Class<T> interfce) {
this.name = name;
this.methods = methods;
this.fields = fields;
this.configSource = configSource;
type = interfce;
}
public <T extends Dataset> T instantiate(final Class<T> interfce) {
try {
final Class<? extends Dataset> clazz;
if (CLASS_CACHE.containsKey(this)) {
clazz = CLASS_CACHE.get(this);
} else {
final ClassBodyEvaluator se = new ClassBodyEvaluator();
se.setParentClassLoader(CLASS_LOADER);
se.setImplementedInterfaces(new Class[]{interfce});
se.setClassName(name);
String code = generateCode();
SOURCE_CACHE.put(name, generateCode());
se.cook(new StringReader(code));
se.toString();
clazz = (Class<T>) se.getClazz();
CLASS_LOADER.addClass(clazz);
CLASS_CACHE.put(this, clazz);
@SuppressWarnings("unchecked")
public T instantiate() {
// We need to globally synchronize to avoid concurrency issues with the internal class
// loader and the CLASS_CACHE
synchronized (COMPILER) {
try {
final Class<? extends Dataset> clazz;
if (CLASS_CACHE.containsKey(this)) {
clazz = CLASS_CACHE.get(this);
} else {
final String code = generateCode();
final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name));
Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8));
COMPILER.cookFile(sourceFile.toFile());
COMPILER.setParentClassLoader(COMPILER.getClassLoader());
clazz = (Class<T>) COMPILER.getClassLoader().loadClass(
String.format("org.logstash.generated.%s", name)
);
CLASS_CACHE.put(this, clazz);
}
return (T) clazz.<T>getConstructor(ctorTypes()).newInstance(ctorArguments());
} catch (final CompileException | ClassNotFoundException | IOException
| NoSuchMethodException | InvocationTargetException | InstantiationException
| IllegalAccessException ex) {
throw new IllegalStateException(ex);
}
CONFIG_SOURCE_CACHE.putIfAbsent(configSource, clazz.getName());
return (T) clazz.<T>getConstructor(ctorTypes()).newInstance(ctorArguments());
} catch (final CompileException | IOException | NoSuchMethodException
| InvocationTargetException | InstantiationException | IllegalAccessException ex) {
}
}
@Override
public String generateCode() {
try {
return REDUNDANT_SEMICOLON.matcher(new Formatter().formatSource(
String.format(
"package org.logstash.generated;\npublic final class %s implements %s { %s }",
name,
type.getName(),
SyntaxFactory.join(
fields.inlineAssigned().generateCode(), fieldsAndCtor(),
combine(
StreamSupport.stream(methods.spliterator(), false)
.toArray(SyntaxElement[]::new)
)
)
)
)).replaceAll("\n");
} catch (final FormatterException ex) {
throw new IllegalStateException(ex);
}
}
@Override
public int hashCode() {
return normalizedSource().hashCode();
}
@Override
public boolean equals(final Object other) {
return other instanceof ComputeStepSyntaxElement &&
normalizedSource().equals(((ComputeStepSyntaxElement<?>) other).normalizedSource());
}
@Override
public String toString() {
return generateCode();
}
private static Path debugDir() {
final Path sourceDir;
try {
final Path parentDir;
final String dir = System.getProperty(ICookable.SYSTEM_PROPERTY_SOURCE_DEBUGGING_DIR);
if (dir == null) {
parentDir = Files.createTempDirectory("logstash");
} else {
parentDir = Paths.get(dir);
}
sourceDir = parentDir.resolve("org").resolve("logstash").resolve("generated");
Files.createDirectories(sourceDir);
} catch (final IOException ex) {
throw new IllegalStateException(ex);
}
return sourceDir;
}
/**
* @return Array of constructor argument types with the same ordering that is used by
* {@link #ctorArguments()}.
@@ -173,34 +175,13 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
.map(FieldDefinition::getCtorArgument).toArray();
}
@Override
public String generateCode() {
return SyntaxFactory.join(
combine(
StreamSupport.stream(methods.spliterator(), false)
.toArray(SyntaxElement[]::new)
), fields.inlineAssigned().generateCode(), fieldsAndCtor()
);
}
@Override
public int hashCode() {
return normalizedSource().hashCode();
}
@Override
public boolean equals(final Object other) {
return other instanceof ComputeStepSyntaxElement &&
normalizedSource().equals(((ComputeStepSyntaxElement) other).normalizedSource());
}
/**
* Normalizes away the name of the class so that two classes of different name but otherwise
* equivalent syntax get correctly compared by {@link #equals(Object)}.
* @return Source of this class, with its name set to {@code CONSTANT}.
*/
private String normalizedSource() {
return new ComputeStepSyntaxElement("CONSTANT", methods, fields, "")
return new ComputeStepSyntaxElement<>("CONSTANT", methods, fields, type)
.generateCode();
}
@@ -227,13 +208,13 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
}
/**
* Renders the string concatenation of the given {@link SyntaxElement}
* Renders the string concatenation of the given {@link SyntaxElement}, delimited by
* line breaks.
* @param parts Elements to concatenate
* @return Java source
*/
private static String combine(final SyntaxElement... parts) {
return Arrays.stream(parts).map(SyntaxElement::generateCode)
.collect(Collectors.joining(""));
.collect(Collectors.joining("\n"));
}
}
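The core mechanism instantiate() now relies on, reduced to a free-standing sketch (assumes janino 3.0.8 on the classpath; the class name and path are invented): cooking the source from an actual file is what lets the compiled class carry debug info that resolves back to that file.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.codehaus.commons.compiler.ISimpleCompiler;
import org.codehaus.janino.SimpleCompiler;

public final class CookFromFileDemo {
    public static void main(final String... args) throws Exception {
        final Path source = Paths.get("/tmp/Hello.java");
        Files.write(
            source,
            "public class Hello { public static String greet() { return \"hi\"; } }"
                .getBytes(StandardCharsets.UTF_8)
        );
        final ISimpleCompiler compiler = new SimpleCompiler();
        compiler.cookFile(source.toFile()); // compile from disk, keeping line info
        final Class<?> clazz = compiler.getClassLoader().loadClass("Hello");
        System.out.println(clazz.getMethod("greet").invoke(null)); // prints "hi"
    }
}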

logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java

@@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
@@ -19,16 +18,6 @@ import org.logstash.ext.JrubyEventExtLibrary;
*/
public final class DatasetCompiler {
/**
* Argument passed to Ruby Filter flush method in generated code.
*/
public static final IRubyObject[] FLUSH_FINAL = {flushOpts(true)};
/**
* Argument passed to Ruby Filter flush method in generated code.
*/
public static final IRubyObject[] FLUSH_NOT_FINAL = {flushOpts(false)};
private static final String MULTI_RECEIVE = "multi_receive";
private static final String FLUSH = "flush";
@@ -42,28 +31,13 @@ public final class DatasetCompiler {
public static final SyntaxFactory.IdentifierStatement BATCH_ARG =
SyntaxFactory.identifier("batchArg");
/**
* Hint to the intended purpose of a Dataset.
*/
enum DatasetFlavor {
ROOT("Root"), FILTER("Filter"), OUTPUT("Output"), CONDITIONAL("Conditional");
private final String display;
DatasetFlavor(final String display) {
this.display = display;
}
String getDisplay() {
return display;
}
}
/**
* Root {@link Dataset}s at the beginning of the execution tree that simply pass through
* the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state.
*/
public static final Collection<Dataset> ROOT_DATASETS = Collections.singleton(
compile(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields(), DatasetFlavor.ROOT, "(root)")
prepare(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields())
.instantiate()
);
private DatasetCompiler() {
@@ -77,21 +51,18 @@
* @param compute Method body of {@link Dataset#compute(RubyArray, boolean, boolean)}
* @param clear Method body of {@link Dataset#clear()}
* @param fieldValues Constructor Arguments
* @param datasetFlavor The flavor of {@link Dataset} to compile.
* @param configSource The Logstash configuration that maps to the returned Dataset
* This is only helpful for human debugging to differentiate between the intended usage of the {@link Dataset}
* @return Dataset Instance
*/
public static synchronized Dataset compile(final Closure compute, final Closure clear,
final ClassFields fieldValues, final DatasetFlavor datasetFlavor, String configSource) {
return new ComputeStepSyntaxElement(
public static synchronized ComputeStepSyntaxElement<Dataset> prepare(final Closure compute, final Closure clear,
final ClassFields fieldValues) {
return new ComputeStepSyntaxElement<>(
Arrays.asList(MethodSyntaxElement.compute(compute), MethodSyntaxElement.clear(clear)),
fieldValues, datasetFlavor, configSource
).instantiate(Dataset.class);
fieldValues, Dataset.class
);
}
public static SplitDataset splitDataset(final Collection<Dataset> parents,
final EventCondition condition, String configSource) {
public static ComputeStepSyntaxElement<SplitDataset> splitDataset(final Collection<Dataset> parents,
final EventCondition condition) {
final ClassFields fields = new ClassFields();
final Collection<ValueSyntaxElement> parentFields =
parents.stream().map(fields::add).collect(Collectors.toList());
@@ -104,13 +75,14 @@
final VariableDefinition event =
new VariableDefinition(JrubyEventExtLibrary.RubyEvent.class, "event");
final ValueSyntaxElement eventVal = event.access();
return new ComputeStepSyntaxElement(
return new ComputeStepSyntaxElement<>(
Arrays.asList(
MethodSyntaxElement.compute(
returnBuffer(ifData, done)
returnIffBuffered(ifData, done)
.add(bufferParents(parentFields, buffer))
.add(
SyntaxFactory.forLoop(event, buffer,
SyntaxFactory.forLoop(
event, buffer,
Closure.wrap(
SyntaxFactory.ifCondition(
fields.add(condition).call("fulfilled", eventVal),
@@ -128,19 +100,18 @@
.add(SyntaxFactory.assignment(done, SyntaxFactory.FALSE))
),
MethodSyntaxElement.right(elseData)
), fields, DatasetFlavor.CONDITIONAL, configSource
).instantiate(SplitDataset.class);
), fields, SplitDataset.class
);
}
/**
* Compiles a {@link Dataset} representing a filter plugin without flush behaviour.
* @param parents Parent {@link Dataset} to aggregate for this filter
* @param plugin Filter Plugin
* @param configSource The Logstash configuration that maps to the returned Dataset
* @return Dataset representing the filter plugin
*/
public static Dataset filterDataset(final Collection<Dataset> parents,
final RubyIntegration.Filter plugin, String configSource) {
public static ComputeStepSyntaxElement<Dataset> filterDataset(final Collection<Dataset> parents,
final RubyIntegration.Filter plugin) {
final ClassFields fields = new ClassFields();
final Collection<ValueSyntaxElement> parentFields =
parents.stream().map(fields::add).collect(Collectors.toList());
@@ -151,7 +122,7 @@
final ValueSyntaxElement filterField = fields.add(filter);
final ValueSyntaxElement done = fields.add(boolean.class);
final String multiFilter = "multi_filter";
final Closure body = returnBuffer(outputBuffer, done).add(
final Closure body = returnIffBuffered(outputBuffer, done).add(
bufferParents(parentFields, inputBufferField)
.add(
buffer(
@@ -170,18 +141,18 @@
if (plugin.hasFlush()) {
body.add(
callFilterFlush(
outputBuffer, fields.add(rubyCallsite(filter, FLUSH)), filterField,
fields, outputBuffer, fields.add(rubyCallsite(filter, FLUSH)), filterField,
!plugin.periodicFlush()
)
);
}
return compile(
return prepare(
body.add(SyntaxFactory.assignment(done, SyntaxFactory.TRUE))
.add(SyntaxFactory.ret(outputBuffer)),
Closure.wrap(
clearSyntax(parentFields), clear(outputBuffer),
SyntaxFactory.assignment(done, SyntaxFactory.FALSE)
), fields, DatasetFlavor.FILTER, configSource
), fields
);
}
@@ -205,8 +176,8 @@
Closure.wrap(
parentFields.stream().map(DatasetCompiler::computeDataset)
.toArray(MethodLevelSyntaxElement[]::new)
).add(clearSyntax(parentFields)), Closure.EMPTY, fields, "(terminal)"
);
).add(clearSyntax(parentFields)), Closure.EMPTY, fields
).instantiate();
} else if (count == 1) {
// No need for a terminal dataset here, if there is only a single parent node we can
// call it directly.
@@ -232,16 +203,16 @@
* every call to {@code compute}.
* @param parents Parent Datasets
* @param output Output Plugin (of Ruby type OutputDelegator)
* @param configSource The Logstash configuration that maps to the output Dataset
* @param terminal Set to true if this output is the only output in the pipeline
* @return Output Dataset
*/
public static Dataset outputDataset(final Collection<Dataset> parents, final IRubyObject output, String configSource,
public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<Dataset> parents,
final IRubyObject output,
final boolean terminal) {
final DynamicMethod method = rubyCallsite(output, MULTI_RECEIVE);
// Short-circuit trivial case of only output(s) in the pipeline
if (parents == ROOT_DATASETS) {
return outputDatasetFromRoot(output, method, configSource);
return outputDatasetFromRoot(output, method);
}
final ClassFields fields = new ClassFields();
final Collection<ValueSyntaxElement> parentFields =
@@ -267,32 +238,30 @@
clear(inputBuffer),
inlineClear
),
clearSyntax, fields, configSource
clearSyntax, fields
);
}
private static Closure returnBuffer(final MethodLevelSyntaxElement ifData,
private static Closure returnIffBuffered(final MethodLevelSyntaxElement ifData,
final MethodLevelSyntaxElement done) {
return Closure.wrap(
SyntaxFactory.ifCondition(done, Closure.wrap(SyntaxFactory.ret(ifData)))
);
}
private static MethodLevelSyntaxElement callFilterFlush(final ValueSyntaxElement resultBuffer,
final ValueSyntaxElement flushMethod, final ValueSyntaxElement filterPlugin,
final boolean shutdownOnly) {
private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields,
final ValueSyntaxElement resultBuffer, final ValueSyntaxElement flushMethod,
final ValueSyntaxElement filterPlugin, final boolean shutdownOnly) {
final MethodLevelSyntaxElement condition;
final ValueSyntaxElement flushArgs;
final ValueSyntaxElement flushFinal =
SyntaxFactory.constant(DatasetCompiler.class, "FLUSH_FINAL");
final ValueSyntaxElement flushFinal = fields.add(flushOpts(true));
if (shutdownOnly) {
condition = SyntaxFactory.and(FLUSH_ARG, SHUTDOWN_ARG);
flushArgs = flushFinal;
} else {
condition = FLUSH_ARG;
flushArgs = SyntaxFactory.ternary(
SHUTDOWN_ARG, flushFinal,
SyntaxFactory.constant(DatasetCompiler.class, "FLUSH_NOT_FINAL")
SHUTDOWN_ARG, flushFinal, fields.add(flushOpts(false))
);
}
return SyntaxFactory.ifCondition(
@@ -317,10 +286,10 @@
return parent.call("compute", BATCH_ARG, FLUSH_ARG, SHUTDOWN_ARG);
}
private static IRubyObject flushOpts(final boolean fin) {
private static IRubyObject[] flushOpts(final boolean fin) {
final RubyHash res = RubyHash.newHash(RubyUtil.RUBY);
res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin));
return res;
return new IRubyObject[]{res};
}
private static Closure bufferParents(final Collection<ValueSyntaxElement> parents,
@@ -347,15 +316,14 @@
/**
* Special case optimization for when the output plugin is directly connected to the Queue
* without any filters or conditionals in between. This special case does not arise naturally
* from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, String, boolean)} since it saves
* from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, boolean)} since it saves
* the internal buffering of events and instead forwards events directly from the batch to the
* Output plugin.
* @param output Output Plugin
* @param configSource The Logstash configuration that maps to the returned Dataset
* @return Dataset representing the Output
*/
private static Dataset outputDatasetFromRoot(final IRubyObject output,
final DynamicMethod method, String configSource) {
private static ComputeStepSyntaxElement<Dataset> outputDatasetFromRoot(final IRubyObject output,
final DynamicMethod method) {
final ClassFields fields = new ClassFields();
final ValueSyntaxElement args = fields.add(new IRubyObject[1]);
return compileOutput(
@ -363,22 +331,14 @@ public final class DatasetCompiler {
SyntaxFactory.assignment(SyntaxFactory.arrayField(args, 0), BATCH_ARG),
callRubyCallsite(fields.add(method), args, fields.add(output), MULTI_RECEIVE)
),
Closure.EMPTY, fields, configSource
Closure.EMPTY, fields
);
}
/**
* Get the generated source
* @return sorted and formatted lines of generated code
*/
public static List<String> getGeneratedSource() {
return ComputeStepSyntaxElement.getGeneratedSource();
}
private static Dataset compileOutput(final Closure syntax, final Closure clearSyntax,
final ClassFields fields, String configSource) {
return compile(
syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields, DatasetFlavor.OUTPUT, configSource
private static ComputeStepSyntaxElement<Dataset> compileOutput(final Closure syntax,
final Closure clearSyntax, final ClassFields fields) {
return prepare(
syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields
);
}
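The signature churn in this file all follows from one API split: compile(...) used to build and instantiate a Dataset in one go, while prepare(...) now returns the ComputeStepSyntaxElement and leaves instantiation to the caller, which is what lets CompiledPipeline log each step before paying the compile cost. A generic, self-contained illustration of the split (every name here is invented):

import java.util.function.Supplier;

final class TwoPhaseDemo {
    static final class Prepared<T> {
        private final String source;
        private final Supplier<T> factory;

        Prepared(final String source, final Supplier<T> factory) {
            this.source = source;
            this.factory = factory;
        }

        @Override
        public String toString() {
            return source; // what a debug log of the prepared step prints
        }

        T instantiate() {
            return factory.get(); // compilation cost deferred to the caller
        }
    }

    public static void main(final String... args) {
        final Prepared<Runnable> step =
            new Prepared<>("class Step { /* generated */ }", () -> () -> { });
        System.out.println("Compiled step into\n" + step); // log first...
        step.instantiate().run();                          // ...then instantiate
    }
}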

logstash-core/src/main/java/org/logstash/config/ir/compiler/DynamicClassLoader.java (deleted)

@@ -1,31 +0,0 @@
package org.logstash.config.ir.compiler;
import java.util.HashMap;
import java.util.Map;
/**
* Classloader capable of loading runtime compiled classes that were registered with it.
*/
final class DynamicClassLoader extends ClassLoader {
/**
* Map of classname to class for runtime compiled classes.
*/
private final Map<String, Class<?>> cache = new HashMap<>();
/**
* Register a runtime compiled class with this classloader.
* @param clazz Class to register
*/
public void addClass(final Class<?> clazz) {
cache.put(clazz.getName(), clazz);
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
if (cache.containsKey(name)) {
return cache.get(name);
}
return Thread.currentThread().getContextClassLoader().loadClass(name);
}
}

logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDeclarationGroup.java

@@ -21,7 +21,7 @@ final class FieldDeclarationGroup implements SyntaxElement {
@Override
public String generateCode() {
return fields.isEmpty() ? "" : SyntaxFactory.join(fields.stream().map(
SyntaxElement::generateCode).collect(Collectors.joining(";")), ";"
SyntaxElement::generateCode).collect(Collectors.joining(";\n")), ";"
);
}
}

logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDefinition.java

@@ -22,8 +22,7 @@ final class FieldDefinition implements SyntaxElement {
*/
public static FieldDefinition fromValue(final int index, final Object value) {
return new FieldDefinition(
new VariableDefinition(value.getClass(), field(index)), false,
null, value
variableDefinition(value.getClass(), index), false, null, value
);
}
@@ -35,7 +34,7 @@
*/
public static FieldDefinition mutableUnassigned(final int index, final Class<?> type) {
return new FieldDefinition(
new VariableDefinition(type, field(index)), true, null, null
variableDefinition(type, index), true, null, null
);
}
@@ -50,7 +49,7 @@
public static FieldDefinition withInitializer(final int index, final Class<?> type,
final SyntaxElement initializer) {
return new FieldDefinition(
new VariableDefinition(type, field(index)), false, initializer, null
variableDefinition(type, index), false, initializer, null
);
}
@@ -91,12 +90,7 @@
);
}
/**
* Field Naming Schema.
* @param id Index for naming
* @return Field name
*/
private static String field(final int id) {
return String.format("field%d", id);
private static VariableDefinition variableDefinition(final Class<?> type, final int index) {
return new VariableDefinition(type, String.format("field%d", index));
}
}

logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java

@@ -148,7 +148,7 @@ final class SyntaxFactory {
public String generateCode() {
return join(
"for (", element.generateCode(), " : ",
iterable.generateCode(), ") {", body.generateCode(), "}"
iterable.generateCode(), ") {\n", body.generateCode(), "\n}"
);
}
};
@@ -165,8 +165,9 @@
@Override
public String generateCode() {
return join(
"if(", condition.generateCode(), ") {", left.generateCode(),
"} else {", right.generateCode(), "}"
"if(", condition.generateCode(), ") {\n", left.generateCode(),
"\n}",
right.empty() ? "" : join(" else {\n", right.generateCode(), "\n}")
);
}
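Two behavioural details hide in this hunk: branch bodies now get their own lines (again for line-accurate debugging), and an empty else-closure renders no else block at all instead of an empty one. A stand-alone rendition of the new rendering rule (all names invented):

final class IfRenderDemo {
    static String renderIf(final String cond, final String left, final String right) {
        final String base = "if(" + cond + ") {\n" + left + "\n}";
        // Empty right branch: omit the else block entirely.
        return right.isEmpty() ? base : base + " else {\n" + right + "\n}";
    }

    public static void main(final String... args) {
        System.out.println(renderIf("done", "return ifData;", ""));
    }
}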

logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java (new file)

@@ -0,0 +1,196 @@
package org.logstash.config.ir;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.ext.JrubyEventExtLibrary;
/**
* Tests for {@link CompiledPipeline}.
*/
public final class CompiledPipelineTest extends RubyEnvTestCase {
/**
* Globally accessible map of test run id to a queue of {@link JrubyEventExtLibrary.RubyEvent}
* that can be used by Ruby outputs.
*/
public static final Map<Long, Collection<JrubyEventExtLibrary.RubyEvent>> EVENT_SINKS =
new ConcurrentHashMap<>();
private static final AtomicLong TEST_RUN = new AtomicLong();
/**
* Unique identifier for this test run so that mock test outputs can correctly identify
* their event sink in {@link #EVENT_SINKS}.
*/
private long runId;
@Before
public void beforeEach() {
runId = TEST_RUN.incrementAndGet();
EVENT_SINKS.put(runId, new LinkedTransferQueue<>());
}
@After
public void afterEach() {
EVENT_SINKS.remove(runId);
}
@Test
public void buildsTrivialPipeline() throws Exception {
final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
"input {mockinput{}} output{mockoutput{}}", false
);
final JrubyEventExtLibrary.RubyEvent testEvent =
JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
new CompiledPipeline(pipelineIR,
new CompiledPipelineTest.MockPluginFactory(
Collections.singletonMap("mockinput", () -> null),
Collections.emptyMap(),
Collections.singletonMap("mockoutput", mockOutputSupplier())
)
).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
}
@Test
public void buildsStraightPipeline() throws Exception {
final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
"input {mockinput{}} filter { mockfilter {} mockfilter {} mockfilter {}} output{mockoutput{}}",
false
);
final JrubyEventExtLibrary.RubyEvent testEvent =
JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
new CompiledPipeline(
pipelineIR,
new CompiledPipelineTest.MockPluginFactory(
Collections.singletonMap("mockinput", () -> null),
Collections.singletonMap("mockfilter", CompiledPipelineTest.IdentityFilter::new),
Collections.singletonMap("mockoutput", mockOutputSupplier())
)
).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
}
private Supplier<IRubyObject> mockOutputSupplier() {
return () -> RubyUtil.RUBY.evalScriptlet(
String.join(
"\n",
"output = Object.new",
"output.define_singleton_method(:multi_receive) do |batch|",
String.format(
"batch.to_a.each {|e| org.logstash.config.ir.CompiledPipelineTest::EVENT_SINKS.get(%d).put(e)}",
runId
),
"end",
"output"
)
);
}
/**
* Configurable Mock {@link RubyIntegration.PluginFactory}
*/
private static final class MockPluginFactory implements RubyIntegration.PluginFactory {
private final Map<String, Supplier<IRubyObject>> inputs;
private final Map<String, Supplier<RubyIntegration.Filter>> filters;
private final Map<String, Supplier<IRubyObject>> outputs;
MockPluginFactory(final Map<String, Supplier<IRubyObject>> inputs,
final Map<String, Supplier<RubyIntegration.Filter>> filters,
final Map<String, Supplier<IRubyObject>> outputs) {
this.inputs = inputs;
this.filters = filters;
this.outputs = outputs;
}
@Override
public IRubyObject buildInput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return setupPlugin(name, inputs);
}
@Override
public IRubyObject buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return setupPlugin(name, outputs);
}
@Override
public RubyIntegration.Filter buildFilter(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return setupPlugin(name, filters);
}
@Override
public RubyIntegration.Filter buildCodec(final RubyString name, final IRubyObject args) {
throw new IllegalStateException("No codec setup expected in this test.");
}
private static <T> T setupPlugin(final RubyString name,
final Map<String, Supplier<T>> suppliers) {
final String key = name.asJavaString();
if (!suppliers.containsKey(key)) {
throw new IllegalStateException(
String.format("Tried to set up unexpected plugin %s.", key)
);
}
return suppliers.get(name.asJavaString()).get();
}
}
/**
* Mock filter that does not modify the batch.
*/
private static final class IdentityFilter implements RubyIntegration.Filter {
@Override
public IRubyObject toRuby() {
return RubyUtil.RUBY.evalScriptlet(
String.join(
"\n",
"output = Object.new",
"output.define_singleton_method(:multi_filter) do |batch|",
"batch",
"end",
"output"
)
);
}
@Override
public boolean hasFlush() {
return false;
}
@Override
public boolean periodicFlush() {
return false;
}
@Override
public void register() {
}
}
}

logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java

@@ -2,25 +2,14 @@ package org.logstash.config.ir;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.jruby.RubyHash;
import org.jruby.runtime.load.LoadService;
import org.junit.BeforeClass;
import org.junit.Test;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.config.ir.graph.Graph;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class ConfigCompilerTest {
@BeforeClass
public static void before() {
ensureLoadpath();
}
public class ConfigCompilerTest extends RubyEnvTestCase {
@Test
public void testConfigToPipelineIR() throws Exception {
@@ -75,24 +64,4 @@
throws IncompleteSourceWithMetadataException {
return ConfigCompiler.configToPipelineIR(config, false).uniqueHash();
}
/**
* Loads the logstash-core/lib path if the load service can't find {@code logstash/compiler}
* because {@code environment.rb} hasn't been loaded yet.
*/
private static void ensureLoadpath() {
final LoadService loader = RubyUtil.RUBY.getLoadService();
if (loader.findFileForLoad("logstash/compiler").library == null) {
final RubyHash environment = RubyUtil.RUBY.getENV();
final Path root = Paths.get(
System.getProperty("logstash.core.root.dir", "")
).toAbsolutePath();
final String gems = root.getParent().resolve("vendor").resolve("bundle")
.resolve("jruby").resolve("2.3.0").toFile().getAbsolutePath();
environment.put("GEM_HOME", gems);
environment.put("GEM_PATH", gems);
loader.addPaths(root.resolve("lib").toFile().getAbsolutePath()
);
}
}
}

logstash-core/src/test/java/org/logstash/config/ir/RubyEnvTestCase.java (new file)

@@ -0,0 +1,36 @@
package org.logstash.config.ir;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.jruby.RubyHash;
import org.jruby.runtime.load.LoadService;
import org.junit.BeforeClass;
import org.logstash.RubyUtil;
public abstract class RubyEnvTestCase {
@BeforeClass
public static void before() {
ensureLoadpath();
}
/**
* Loads the logstash-core/lib path if the load service can't find {@code logstash/compiler}
* because {@code environment.rb} hasn't been loaded yet.
*/
private static void ensureLoadpath() {
final LoadService loader = RubyUtil.RUBY.getLoadService();
if (loader.findFileForLoad("logstash/compiler").library == null) {
final RubyHash environment = RubyUtil.RUBY.getENV();
final Path root = Paths.get(
System.getProperty("logstash.core.root.dir", "")
).toAbsolutePath();
final String gems = root.getParent().resolve("vendor").resolve("bundle")
.resolve("jruby").resolve("2.3.0").toFile().getAbsolutePath();
environment.put("GEM_HOME", gems);
environment.put("GEM_PATH", gems);
loader.addPaths(root.resolve("lib").toFile().getAbsolutePath()
);
}
}
}

logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java

@@ -17,10 +17,10 @@ public final class DatasetCompilerTest {
@Test
public void compilesEmptyMethod() {
final Dataset func = DatasetCompiler.compile(
final Dataset func = DatasetCompiler.prepare(
Closure.wrap(SyntaxFactory.ret(DatasetCompiler.BATCH_ARG.call("to_a"))),
Closure.EMPTY, new ClassFields(), DatasetCompiler.DatasetFlavor.ROOT, "foo"
);
Closure.EMPTY, new ClassFields()
).instantiate();
final RubyArray batch = RubyUtil.RUBY.newArray();
assertThat(func.compute(batch, false, false), is(batch));
}
@@ -33,7 +33,7 @@
final VariableDefinition eventsDef = new VariableDefinition(Collection.class, "events");
final ValueSyntaxElement events = eventsDef.access();
final ClassFields fields = new ClassFields();
final Dataset func = DatasetCompiler.compile(
final Dataset func = DatasetCompiler.prepare(
Closure.wrap(
SyntaxFactory.definition(eventsDef, DatasetCompiler.BATCH_ARG.call("to_a")),
events.call(
@@ -44,8 +44,8 @@
),
SyntaxFactory.ret(events)
),
Closure.EMPTY, fields, DatasetCompiler.DatasetFlavor.ROOT, "foo"
);
Closure.EMPTY, fields
).instantiate();
assertThat(func.compute(batch, false, false).size(), is(2));
}
@@ -60,8 +60,8 @@
RubyUtil.RUBY.evalScriptlet(
"output = Object.new\noutput.define_singleton_method(:multi_receive) do |batch|\nend\noutput"
),
"foo", true
).compute(RubyUtil.RUBY.newArray(), false, false),
true
).instantiate().compute(RubyUtil.RUBY.newArray(), false, false),
nullValue()
);
}
@@ -69,9 +69,9 @@
@Test
public void compilesSplitDataset() {
final FieldReference key = FieldReference.from("foo");
final EventCondition condition = event -> event.getEvent().includes(key);
final SplitDataset left =
DatasetCompiler.splitDataset(DatasetCompiler.ROOT_DATASETS, condition, "foo");
final SplitDataset left = DatasetCompiler.splitDataset(
DatasetCompiler.ROOT_DATASETS, event -> event.getEvent().includes(key)
).instantiate();
final Event trueEvent = new Event();
trueEvent.setField(key, "val");
final JrubyEventExtLibrary.RubyEvent falseEvent =
@@ -98,7 +98,8 @@
)
).generateCode(),
is(
String.join("",
String.join(
"\n",
"org.jruby.runtime.ThreadContext context=org.logstash.RubyUtil.RUBY.getCurrentContext();",
"org.jruby.runtime.ThreadContext context1=context;",
"org.jruby.runtime.ThreadContext context2=context;"