mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
1ed4f018a1
commit
4bdff7a9fc
2 changed files with 39 additions and 25 deletions
|
@ -208,16 +208,8 @@ public final class DatasetCompiler {
|
|||
final ValueSyntaxElement ifData, final ValueSyntaxElement elseData) {
|
||||
final ValueSyntaxElement eventVal = event.access();
|
||||
return Closure.wrap(
|
||||
SyntaxFactory.forLoop(
|
||||
event, inputBuffer,
|
||||
Closure.wrap(
|
||||
SyntaxFactory.ifCondition(
|
||||
condition.call("fulfilled", eventVal),
|
||||
Closure.wrap(ifData.call("add", eventVal)),
|
||||
Closure.wrap(elseData.call("add", eventVal))
|
||||
)
|
||||
)
|
||||
)
|
||||
SyntaxFactory.value("org.logstash.config.ir.compiler.Utils")
|
||||
.call("filterEvents", inputBuffer, condition, ifData, elseData)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -245,22 +237,10 @@ public final class DatasetCompiler {
|
|||
*/
|
||||
private static Closure withInputBuffering(final Closure compute,
|
||||
final Collection<ValueSyntaxElement> parents, final ValueSyntaxElement inputBuffer) {
|
||||
final VariableDefinition event =
|
||||
new VariableDefinition(JrubyEventExtLibrary.RubyEvent.class, "e");
|
||||
final ValueSyntaxElement eventVar = event.access();
|
||||
return Closure.wrap(
|
||||
parents.stream().map(par ->
|
||||
SyntaxFactory.forLoop(
|
||||
event, computeDataset(par),
|
||||
Closure.wrap(
|
||||
SyntaxFactory.ifCondition(
|
||||
SyntaxFactory.not(
|
||||
eventVar.call("getEvent").call("isCancelled")
|
||||
), Closure.wrap(inputBuffer.call("add", eventVar))
|
||||
)
|
||||
)
|
||||
)
|
||||
).toArray(MethodLevelSyntaxElement[]::new)
|
||||
parents.stream().map(par -> SyntaxFactory.value("org.logstash.config.ir.compiler.Utils")
|
||||
.call("copyNonCancelledEvents", computeDataset(par), inputBuffer)
|
||||
).toArray(MethodLevelSyntaxElement[]::new)
|
||||
).add(compute).add(clear(inputBuffer));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package org.logstash.config.ir.compiler;
|
||||
|
||||
import org.jruby.RubyArray;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Static utility methods that replace common blocks of generated code in the Java execution.
|
||||
*/
|
||||
public class Utils {
|
||||
|
||||
// has field1.compute(batchArg, flushArg, shutdownArg) passed as input
|
||||
public static void copyNonCancelledEvents(Collection<JrubyEventExtLibrary.RubyEvent> input, RubyArray output) {
|
||||
for (JrubyEventExtLibrary.RubyEvent e : input) {
|
||||
if (!(e.getEvent().isCancelled())) {
|
||||
output.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void filterEvents(Collection<JrubyEventExtLibrary.RubyEvent> input, EventCondition filter,
|
||||
ArrayList fulfilled, ArrayList unfulfilled) {
|
||||
for (JrubyEventExtLibrary.RubyEvent e : input) {
|
||||
if (filter.fulfilled(e)) {
|
||||
fulfilled.add(e);
|
||||
} else {
|
||||
unfulfilled.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue