mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
fc6d8a43e9
commit
e9ba86e3b6
8 changed files with 131 additions and 21 deletions
|
@ -371,6 +371,9 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
|||
|
||||
pipeline_workers.times do |t|
|
||||
batched_execution = @lir_execution.buildExecution
|
||||
if t.eql? 0
|
||||
@logger.debug ("Generated Java pipeline entry class: " + batched_execution.class.to_s)
|
||||
end
|
||||
thread = Thread.new(self, batched_execution) do |_pipeline, _batched_execution|
|
||||
_pipeline.worker_loop(_batched_execution)
|
||||
end
|
||||
|
@ -378,6 +381,12 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
|||
@worker_threads << thread
|
||||
end
|
||||
|
||||
if @logger.debug? || @logger.trace?
|
||||
@lir_execution.getGeneratedSource.each do |line|
|
||||
@logger.debug line
|
||||
end
|
||||
end
|
||||
|
||||
# inputs should be started last, after all workers
|
||||
begin
|
||||
start_inputs
|
||||
|
|
|
@ -4,6 +4,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -92,6 +93,14 @@ public final class CompiledPipeline {
|
|||
return new CompiledPipeline.CompiledExecution().toDataset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the generated source
|
||||
* @return sorted and formatted lines of generated code
|
||||
*/
|
||||
public List<String> getGeneratedSource(){
|
||||
return DatasetCompiler.getGeneratedSource();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up all Ruby outputs learnt from {@link PipelineIR}.
|
||||
*/
|
||||
|
|
|
@ -78,7 +78,7 @@ final class Closure implements MethodLevelSyntaxElement {
|
|||
this.optimizeRubyThreadContexts().statements;
|
||||
return optimized.isEmpty() ? "" : SyntaxFactory.join(
|
||||
optimized.stream().map(MethodLevelSyntaxElement::generateCode).collect(
|
||||
Collectors.joining(";\n")
|
||||
Collectors.joining(";")
|
||||
), ";"
|
||||
);
|
||||
}
|
||||
|
|
|
@ -6,11 +6,16 @@ import java.lang.reflect.InvocationTargetException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.codehaus.commons.compiler.CompileException;
|
||||
import org.codehaus.janino.ClassBodyEvaluator;
|
||||
|
||||
|
@ -20,6 +25,7 @@ import org.codehaus.janino.ClassBodyEvaluator;
|
|||
*/
|
||||
final class ComputeStepSyntaxElement implements SyntaxElement {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ComputeStepSyntaxElement.class);
|
||||
private static final DynamicClassLoader CLASS_LOADER = new DynamicClassLoader();
|
||||
|
||||
/**
|
||||
|
@ -28,20 +34,78 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
|
|||
private static final Map<ComputeStepSyntaxElement, Class<? extends Dataset>> CLASS_CACHE
|
||||
= new HashMap<>();
|
||||
|
||||
private static final Map<String, String> SOURCE_CACHE = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Sequence number to ensure unique naming for runtime compiled classes.
|
||||
*/
|
||||
private static final AtomicInteger SEQUENCE = new AtomicInteger(0);
|
||||
|
||||
private static final int INDENT_WIDTH = 4;
|
||||
|
||||
private final String name;
|
||||
|
||||
private final Iterable<MethodSyntaxElement> methods;
|
||||
|
||||
private final ClassFields fields;
|
||||
|
||||
/**
|
||||
* Get the generated source
|
||||
* @return sorted and formatted lines of generated code
|
||||
*/
|
||||
public static List<String> getGeneratedSource() {
|
||||
List<String> output = new ArrayList<>();
|
||||
SOURCE_CACHE.forEach((k, v) -> {
|
||||
output.add(String.format("class %s {", k));
|
||||
LOGGER.trace("{}:{}", k, v);
|
||||
getFormattedLines(v, output, INDENT_WIDTH);
|
||||
output.add("}");
|
||||
});
|
||||
return output;
|
||||
}
|
||||
|
||||
private static List<String> getFormattedLines(String input, List<String> output, int indent) {
|
||||
int curlyOpen = input.indexOf("{");
|
||||
int curlyClose = input.indexOf("}");
|
||||
int semiColon = input.indexOf(";");
|
||||
|
||||
List<Integer> positions = Arrays.asList(curlyOpen, curlyClose, semiColon);
|
||||
positions.sort(Comparator.naturalOrder());
|
||||
Optional<Integer> firstMatch = positions.stream().filter(i -> i >= 0).findFirst();
|
||||
|
||||
if (firstMatch.isPresent()) {
|
||||
int pos = firstMatch.get();
|
||||
int preIndent = indent;
|
||||
int postIndent = indent;
|
||||
if (pos == curlyOpen) {
|
||||
postIndent += INDENT_WIDTH;
|
||||
} else if (pos == curlyClose) {
|
||||
preIndent -= INDENT_WIDTH;
|
||||
postIndent = preIndent;
|
||||
}
|
||||
|
||||
if (input.trim().length() > 0) {
|
||||
String sub = input.substring(0, pos + 1);
|
||||
if (!sub.equals(";")) {
|
||||
output.add(String.format("%" + preIndent + "s%s", " ", sub));
|
||||
}
|
||||
}
|
||||
return getFormattedLines(input.substring(pos + 1), output, postIndent);
|
||||
}
|
||||
|
||||
if (input.trim().equals("}")) {
|
||||
indent -= INDENT_WIDTH;
|
||||
}
|
||||
|
||||
if (input.trim().length() > 0 && !input.trim().equals(";")) {
|
||||
output.add(String.format("%" + indent + "s%s", " ", input));
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
ComputeStepSyntaxElement(final Iterable<MethodSyntaxElement> methods,
|
||||
final ClassFields fields) {
|
||||
this(String.format("CompiledDataset%d", SEQUENCE.incrementAndGet()), methods, fields);
|
||||
final ClassFields fields, DatasetCompiler.DatasetFlavor datasetFlavor) {
|
||||
this(String.format("Generated%d_" + datasetFlavor.getDisplay() + "Dataset", SEQUENCE.incrementAndGet()), methods, fields);
|
||||
}
|
||||
|
||||
private ComputeStepSyntaxElement(final String name, final Iterable<MethodSyntaxElement> methods,
|
||||
|
@ -61,7 +125,10 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
|
|||
se.setParentClassLoader(CLASS_LOADER);
|
||||
se.setImplementedInterfaces(new Class[]{interfce});
|
||||
se.setClassName(name);
|
||||
se.cook(new StringReader(generateCode()));
|
||||
String code = generateCode();
|
||||
SOURCE_CACHE.put(name, generateCode());
|
||||
se.cook(new StringReader(code));
|
||||
se.toString();
|
||||
clazz = (Class<T>) se.getClazz();
|
||||
CLASS_LOADER.addClass(clazz);
|
||||
CLASS_CACHE.put(this, clazz);
|
||||
|
@ -145,14 +212,13 @@ final class ComputeStepSyntaxElement implements SyntaxElement {
|
|||
}
|
||||
|
||||
/**
|
||||
* Renders the string concatenation of the given {@link SyntaxElement}, delimited by
|
||||
* line breaks.
|
||||
* Renders the string concatenation of the given {@link SyntaxElement}
|
||||
* @param parts Elements to concatenate
|
||||
* @return Java source
|
||||
*/
|
||||
private static String combine(final SyntaxElement... parts) {
|
||||
return Arrays.stream(parts).map(SyntaxElement::generateCode)
|
||||
.collect(Collectors.joining("\n"));
|
||||
.collect(Collectors.joining(""));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyHash;
|
||||
|
@ -41,12 +42,28 @@ public final class DatasetCompiler {
|
|||
public static final SyntaxFactory.IdentifierStatement BATCH_ARG =
|
||||
SyntaxFactory.identifier("batchArg");
|
||||
|
||||
/**
|
||||
* Hint to the intended purpose of a Dataset.
|
||||
*/
|
||||
enum DatasetFlavor {
|
||||
ROOT("Root"), FILTER("Filter"), OUTPUT("Output"), CONDITIONAL("Conditional");
|
||||
private final String display;
|
||||
|
||||
DatasetFlavor(final String display) {
|
||||
this.display = display;
|
||||
}
|
||||
|
||||
String getDisplay() {
|
||||
return display;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Root {@link Dataset}s at the beginning of the execution tree that simply pass through
|
||||
* the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state.
|
||||
*/
|
||||
public static final Collection<Dataset> ROOT_DATASETS = Collections.singleton(
|
||||
compile(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields())
|
||||
compile(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields(), DatasetFlavor.ROOT)
|
||||
);
|
||||
|
||||
private DatasetCompiler() {
|
||||
|
@ -60,13 +77,15 @@ public final class DatasetCompiler {
|
|||
* @param compute Method body of {@link Dataset#compute(RubyArray, boolean, boolean)}
|
||||
* @param clear Method body of {@link Dataset#clear()}
|
||||
* @param fieldValues Constructor Arguments
|
||||
* @param datasetFlavor The flavor of {@link Dataset} to compile.
|
||||
* This is only helpful for human debugging to differentiate between the intended usage of the {@link Dataset}
|
||||
* @return Dataset Instance
|
||||
*/
|
||||
public static synchronized Dataset compile(final Closure compute, final Closure clear,
|
||||
final ClassFields fieldValues) {
|
||||
final ClassFields fieldValues, final DatasetFlavor datasetFlavor) {
|
||||
return new ComputeStepSyntaxElement(
|
||||
Arrays.asList(MethodSyntaxElement.compute(compute), MethodSyntaxElement.clear(clear)),
|
||||
fieldValues
|
||||
fieldValues, datasetFlavor
|
||||
).instantiate(Dataset.class);
|
||||
}
|
||||
|
||||
|
@ -108,7 +127,7 @@ public final class DatasetCompiler {
|
|||
.add(SyntaxFactory.assignment(done, SyntaxFactory.FALSE))
|
||||
),
|
||||
MethodSyntaxElement.right(elseData)
|
||||
), fields
|
||||
), fields, DatasetFlavor.CONDITIONAL
|
||||
).instantiate(SplitDataset.class);
|
||||
}
|
||||
|
||||
|
@ -160,7 +179,7 @@ public final class DatasetCompiler {
|
|||
Closure.wrap(
|
||||
clearSyntax(parentFields), clear(outputBuffer),
|
||||
SyntaxFactory.assignment(done, SyntaxFactory.FALSE)
|
||||
), fields
|
||||
), fields, DatasetFlavor.FILTER
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -344,10 +363,18 @@ public final class DatasetCompiler {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the generated source
|
||||
* @return sorted and formatted lines of generated code
|
||||
*/
|
||||
public static List<String> getGeneratedSource() {
|
||||
return ComputeStepSyntaxElement.getGeneratedSource();
|
||||
}
|
||||
|
||||
private static Dataset compileOutput(final Closure syntax, final Closure clearSyntax,
|
||||
final ClassFields fields) {
|
||||
return compile(
|
||||
syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields
|
||||
syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields, DatasetFlavor.OUTPUT
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ final class FieldDeclarationGroup implements SyntaxElement {
|
|||
@Override
|
||||
public String generateCode() {
|
||||
return fields.isEmpty() ? "" : SyntaxFactory.join(fields.stream().map(
|
||||
SyntaxElement::generateCode).collect(Collectors.joining(";\n")), ";"
|
||||
SyntaxElement::generateCode).collect(Collectors.joining(";")), ";"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ final class SyntaxFactory {
|
|||
public String generateCode() {
|
||||
return join(
|
||||
"for (", element.generateCode(), " : ",
|
||||
iterable.generateCode(), ") {\n", body.generateCode(), "\n}"
|
||||
iterable.generateCode(), ") {", body.generateCode(), "}"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
@ -165,8 +165,8 @@ final class SyntaxFactory {
|
|||
@Override
|
||||
public String generateCode() {
|
||||
return join(
|
||||
"if(", condition.generateCode(), ") {\n", left.generateCode(),
|
||||
"\n} else {\n", right.generateCode(), "\n}"
|
||||
"if(", condition.generateCode(), ") {", left.generateCode(),
|
||||
"} else {", right.generateCode(), "}"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ public final class DatasetCompilerTest {
|
|||
public void compilesEmptyMethod() {
|
||||
final Dataset func = DatasetCompiler.compile(
|
||||
Closure.wrap(SyntaxFactory.ret(DatasetCompiler.BATCH_ARG.call("to_a"))),
|
||||
Closure.EMPTY, new ClassFields()
|
||||
Closure.EMPTY, new ClassFields(), DatasetCompiler.DatasetFlavor.ROOT
|
||||
);
|
||||
final RubyArray batch = RubyUtil.RUBY.newArray();
|
||||
assertThat(func.compute(batch, false, false), is(batch));
|
||||
|
@ -44,7 +44,7 @@ public final class DatasetCompilerTest {
|
|||
),
|
||||
SyntaxFactory.ret(events)
|
||||
),
|
||||
Closure.EMPTY, fields
|
||||
Closure.EMPTY, fields, DatasetCompiler.DatasetFlavor.ROOT
|
||||
);
|
||||
assertThat(func.compute(batch, false, false).size(), is(2));
|
||||
}
|
||||
|
@ -98,8 +98,7 @@ public final class DatasetCompilerTest {
|
|||
)
|
||||
).generateCode(),
|
||||
is(
|
||||
String.join(
|
||||
"\n",
|
||||
String.join("",
|
||||
"org.jruby.runtime.ThreadContext context=org.logstash.RubyUtil.RUBY.getCurrentContext();",
|
||||
"org.jruby.runtime.ThreadContext context1=context;",
|
||||
"org.jruby.runtime.ThreadContext context2=context;"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue