Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 14:47:19 -04:00)

Commit fcd04658b0 (parent c0a355f6ab)
12 changed files with 370 additions and 187 deletions
logstash-core/build.gradle

@@ -110,6 +110,7 @@ dependencies {
   compile "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
   compile "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
   compile "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
+  compile 'org.codehaus.janino:janino:3.0.7'
   compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}"
   testCompile 'org.apache.logging.log4j:log4j-core:2.6.2:tests'
   testCompile 'org.apache.logging.log4j:log4j-api:2.6.2:tests'
logstash-core/gemspec_jars.rb

@@ -8,4 +8,5 @@ gem.requirements << "jar org.apache.logging.log4j:log4j-core, 2.6.2"
 gem.requirements << "jar com.fasterxml.jackson.core:jackson-core, 2.9.1"
 gem.requirements << "jar com.fasterxml.jackson.core:jackson-databind, 2.9.1"
 gem.requirements << "jar com.fasterxml.jackson.core:jackson-annotations, 2.9.1"
+gem.requirements << "jar org.codehaus.janino:janino, 3.0.7"
 gem.requirements << "jar com.fasterxml.jackson.dataformat:jackson-dataformat-cbor, 2.9.1"
logstash-core/lib/logstash-core/logstash-core_jars.rb

@@ -9,7 +9,9 @@ rescue LoadError
   require 'com/fasterxml/jackson/core/jackson-annotations/2.9.1/jackson-annotations-2.9.1.jar'
   require 'org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar'
   require 'com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.9.1/jackson-dataformat-cbor-2.9.1.jar'
+  require 'org/codehaus/janino/commons-compiler/3.0.7/commons-compiler-3.0.7.jar'
   require 'com/fasterxml/jackson/core/jackson-core/2.9.1/jackson-core-2.9.1.jar'
+  require 'org/codehaus/janino/janino/3.0.7/janino-3.0.7.jar'
 end

 if defined? Jars
@@ -20,5 +22,7 @@ if defined? Jars
   require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.9.1' )
   require_jar( 'org.apache.logging.log4j', 'log4j-slf4j-impl', '2.6.2' )
   require_jar( 'com.fasterxml.jackson.dataformat', 'jackson-dataformat-cbor', '2.9.1' )
+  require_jar( 'org.codehaus.janino', 'commons-compiler', '3.0.7' )
   require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.9.1' )
+  require_jar( 'org.codehaus.janino', 'janino', '3.0.7' )
 end
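The dependency added in all three files above is Janino, a lightweight embedded Java compiler. A minimal sketch of the mechanism it enables (not part of this commit; everything except the Janino API calls is illustrative):

    import org.codehaus.janino.ClassBodyEvaluator;

    public final class JaninoSketch {
        public interface Greeter {
            String greet(String name);
        }

        public static void main(String[] args) throws Exception {
            // Compile a class body at runtime and expose it through a known interface,
            // the same pattern the new DatasetCompiler uses further down in this diff.
            final ClassBodyEvaluator ce = new ClassBodyEvaluator();
            ce.setImplementedInterfaces(new Class[]{Greeter.class});
            ce.cook("public String greet(String name) { return \"Hello, \" + name; }");
            final Greeter g = (Greeter) ce.getClazz().getConstructor().newInstance();
            System.out.println(g.greet("Logstash")); // prints: Hello, Logstash
        }
    }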
logstash-core/lib/logstash/java_pipeline.rb

@@ -473,7 +473,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
       # for this we need to create a new empty batch to contain the final flushed events
       batch = @filter_queue_client.new_batch
       @filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
-      batched_execution.compute(batch, true, true)
+      batched_execution.compute(batch.to_a, true, true)
       @filter_queue_client.close_batch(batch)
     end
@@ -701,7 +701,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
   private

   def execute_batch(batched_execution, batch, flush)
-    batched_execution.compute(batch, flush, false)
+    batched_execution.compute(batch.to_a, flush, false)
     @events_filtered.increment(batch.size)
     filtered_size = batch.filtered_size
     @filter_queue_client.add_output_metrics(filtered_size)
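Both call sites change because Dataset#compute (see the Java changes below) now receives the batch's plain JRuby array instead of a Batch wrapper object. A hedged caller-side sketch of the new contract (the helper and its setup are illustrative, not from the commit):

    // The Ruby side now passes batch.to_a, i.e. a RubyArray of the batch's events;
    // the compiled Java pipeline never sees the Ruby Batch wrapper anymore.
    Collection<JrubyEventExtLibrary.RubyEvent> runOnce(final Dataset compiled, final RubyArray events) {
        return compiled.compute(events, false, false); // flush=false, shutdown=false
    }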
@ -5,7 +5,6 @@ require "logstash/output_delegator_strategies/single"
|
|||
require "logstash/output_delegator_strategies/legacy"
|
||||
|
||||
module LogStash class OutputDelegator
|
||||
include org.logstash.config.ir.compiler.RubyIntegration::Output
|
||||
attr_reader :metric, :metric_events, :strategy, :namespaced_metric, :metric_events, :id
|
||||
|
||||
def initialize(logger, output_class, metric, execution_context, strategy_registry, plugin_args)
|
||||
|
|
|
logstash-core/lib/logstash/util/wrapped_acked_queue.rb

@@ -227,7 +227,6 @@ module LogStash; module Util
     end

     class ReadBatch
-      include org.logstash.config.ir.compiler.RubyIntegration::Batch
       def initialize(queue, size, wait)
         @queue = queue
         @size = size
logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb

@@ -155,7 +155,6 @@ module LogStash; module Util
     end

     class ReadBatch
-      include org.logstash.config.ir.compiler.RubyIntegration::Batch
       def initialize(queue, size, wait)
         # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
         # @cancelled = Hash.new
logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java

@@ -14,6 +14,7 @@ import org.logstash.RubyUtil;
 import org.logstash.Rubyfier;
 import org.logstash.common.SourceWithMetadata;
 import org.logstash.config.ir.compiler.Dataset;
+import org.logstash.config.ir.compiler.DatasetCompiler;
 import org.logstash.config.ir.compiler.EventCondition;
 import org.logstash.config.ir.compiler.RubyIntegration;
 import org.logstash.config.ir.graph.IfVertex;
@@ -54,7 +55,7 @@ public final class CompiledPipeline {
     /**
      * Configured outputs.
      */
-    private final Map<String, RubyIntegration.Output> outputs;
+    private final Map<String, IRubyObject> outputs;

     /**
      * Parsed pipeline configuration graph.
@@ -90,7 +91,7 @@ public final class CompiledPipeline {
         return periodicFlushes;
     }

-    public Collection<RubyIntegration.Output> outputs() {
+    public Collection<IRubyObject> outputs() {
         return Collections.unmodifiableCollection(outputs.values());
     }
@@ -119,9 +120,9 @@ public final class CompiledPipeline {
     /**
      * Sets up all Ruby outputs learnt from {@link PipelineIR}.
      */
-    private Map<String, RubyIntegration.Output> setupOutputs() {
+    private Map<String, IRubyObject> setupOutputs() {
         final Collection<PluginVertex> outs = pipelineIR.getOutputPluginVertices();
-        final Map<String, RubyIntegration.Output> res = new HashMap<>(outs.size());
+        final Map<String, IRubyObject> res = new HashMap<>(outs.size());
         outs.forEach(v -> {
             final PluginDefinition def = v.getPluginDefinition();
             final SourceWithMetadata source = v.getSourceWithMetadata();
@@ -214,9 +215,9 @@ public final class CompiledPipeline {
     }

     /**
-     * Checks if a certain {@link Vertex} represents a {@link RubyIntegration.Output}.
+     * Checks if a certain {@link Vertex} represents an output.
      * @param vertex Vertex to check
-     * @return True iff {@link Vertex} represents a {@link RubyIntegration.Output}
+     * @return True iff {@link Vertex} represents an output
      */
     private boolean isOutput(final Vertex vertex) {
         return outputs.containsKey(vertex.getId());
@@ -272,7 +273,7 @@ public final class CompiledPipeline {
                 outputDataset(leaf.getId(), flatten(Dataset.ROOT_DATASETS, leaf))
             )
         );
-        return Dataset.TerminalDataset.from(datasets);
+        return DatasetCompiler.terminalDataset(datasets);
     }

     /**
@@ -309,7 +310,9 @@ public final class CompiledPipeline {
      */
     private Dataset outputDataset(final String vertexId, final Collection<Dataset> datasets) {
         return plugins.computeIfAbsent(
-            vertexId, v -> new Dataset.OutputDataset(datasets, outputs.get(v))
+            vertexId, v -> DatasetCompiler.outputDataset(
+                datasets, outputs.get(v), outputs.size() == 1
+            )
         );
     }
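For orientation (not part of the commit): the outputs map now stores the raw Ruby OutputDelegator objects as IRubyObject, and the third argument to DatasetCompiler.outputDataset marks the compiled dataset as terminal when the pipeline has exactly one output. A hedged sketch of that wiring, with illustrative variable names:

    // vertexId identifies the output plugin's vertex in the pipeline graph.
    final IRubyObject delegator = outputs.get(vertexId); // Ruby OutputDelegator, untyped on the Java side
    final Dataset compiled = DatasetCompiler.outputDataset(
        parentDatasets,
        delegator,
        outputs.size() == 1 // sole output: it may clear its parents itself
    );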
logstash-core/src/main/java/org/logstash/config/ir/compiler/Dataset.java

@@ -9,7 +9,7 @@ import org.logstash.RubyUtil;
 import org.logstash.ext.JrubyEventExtLibrary;

 /**
- * <p>A trueData structure backed by a {@link RubyArray} that represents one step of execution flow of a
+ * <p>A data structure backed by a {@link RubyArray} that represents one step of execution flow of a
  * batch is lazily filled with {@link JrubyEventExtLibrary.RubyEvent} computed from its dependent
  * {@link Dataset}.</p>
  * <p>Each {@link Dataset} either represents a filter, output or one branch of an {@code if}
@@ -32,7 +32,7 @@ public interface Dataset {
      * the pipeline it belongs to is shut down
      * @return Computed {@link RubyArray} of {@link JrubyEventExtLibrary.RubyEvent}
      */
-    Collection<JrubyEventExtLibrary.RubyEvent> compute(RubyIntegration.Batch batch,
+    Collection<JrubyEventExtLibrary.RubyEvent> compute(RubyArray batch,
         boolean flush, boolean shutdown);

     /**
@@ -46,93 +46,9 @@ public interface Dataset {
      * the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state.
      */
     Collection<Dataset> ROOT_DATASETS = Collections.singleton(
-        new Dataset() {
-            @Override
-            public Collection<JrubyEventExtLibrary.RubyEvent> compute(
-                final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
-                return batch.to_a();
-            }
-
-            @Override
-            public void clear() {
-            }
-        }
+        DatasetCompiler.compile("return batch;", "")
     );

     /**
-     * <p>{@link Dataset} that contains all {@link JrubyEventExtLibrary.RubyEvent} instances of all
-     * from its dependencies and is assumed to be at the end of an execution.</p>
-     * This dataset does not require an explicit call to {@link Dataset#clear()} since it will
-     * automatically {@code clear} all of its parents.
-     */
-    final class TerminalDataset implements Dataset {
-
-        /**
-         * Empty {@link Collection} returned by this class's
-         * {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} implementation.
-         */
-        private static final Collection<JrubyEventExtLibrary.RubyEvent> EMPTY_RETURN =
-            Collections.emptyList();
-
-        /**
-         * Trivial {@link Dataset} that simply returns an empty collection of elements.
-         */
-        private static final Dataset EMPTY_DATASET = new Dataset() {
-
-            @Override
-            public Collection<JrubyEventExtLibrary.RubyEvent> compute(
-                final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
-                return EMPTY_RETURN;
-            }
-
-            @Override
-            public void clear() {
-            }
-        };
-
-        private final Collection<Dataset> parents;
-
-        /**
-         * <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
-         * <p>If the given set of parent {@link Dataset} is empty the sum is defined as the
-         * trivial dataset that does not invoke any computation whatsoever.</p>
-         * {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} is always
-         * {@link Collections#emptyList()}.
-         * @param parents Parent {@link Dataset} to sum and terminate
-         * @return Dataset representing the sum of given parent {@link Dataset}
-         */
-        public static Dataset from(final Collection<Dataset> parents) {
-            final int count = parents.size();
-            final Dataset result;
-            if (count > 0) {
-                result = new Dataset.TerminalDataset(parents);
-            } else {
-                result = EMPTY_DATASET;
-            }
-            return result;
-        }
-
-        private TerminalDataset(final Collection<Dataset> parents) {
-            this.parents = parents;
-        }
-
-        @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
-            final boolean flush, final boolean shutdown) {
-            parents.forEach(dataset -> dataset.compute(batch, flush, shutdown));
-            this.clear();
-            return EMPTY_RETURN;
-        }
-
-        @Override
-        public void clear() {
-            for (final Dataset parent : parents) {
-                parent.clear();
-            }
-        }
-
-    }
-
-    /**
      * {@link Dataset} that results from the {@code if} branch of its backing
      * {@link EventCondition} being applied to all of its dependencies.
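A sanity check that is not in the diff (it mirrors the new DatasetCompilerTest at the bottom of this page): the one-line compiled replacement behaves like the deleted anonymous root dataset.

    // compile() generates: public Collection compute(RubyArray batch, ...) { return batch; }
    final Dataset root = DatasetCompiler.compile("return batch;", "");
    final RubyArray batch = RubyUtil.RUBY.newArray();
    assert root.compute(batch, false, false) == batch; // hands the batch straight through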
@@ -162,7 +78,7 @@ public interface Dataset {
         }

         @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
+        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
             final boolean flush, final boolean shutdown) {
             if (done) {
                 return trueData;
@@ -207,7 +123,7 @@ public interface Dataset {
         private final Dataset parent;

         /**
          * This collection is shared with {@link Dataset.SplitDataset.Complement#parent} and
          * mutated when calling its {@code compute} method. This class does not directly compute
          * it.
          */
@@ -229,7 +145,7 @@ public interface Dataset {

         @Override
         public Collection<JrubyEventExtLibrary.RubyEvent> compute(
-            final RubyIntegration.Batch batch, final boolean flush, final boolean shutdown) {
+            final RubyArray batch, final boolean flush, final boolean shutdown) {
             if (done) {
                 return data;
             }
@@ -271,7 +187,7 @@ public interface Dataset {
         }

         @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
+        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
             final boolean flush, final boolean shutdown) {
             if (done) {
                 return data;
@@ -325,7 +241,7 @@ public interface Dataset {
         }

         @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
+        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
             final boolean flush, final boolean shutdown) {
             if (done) {
                 return data;
@@ -384,7 +300,7 @@ public interface Dataset {
         }

         @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
+        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
             final boolean flush, final boolean shutdown) {
             if (done) {
                 return data;
@@ -410,60 +326,4 @@ public interface Dataset {
             done = false;
         }
     }
-
-    /**
-     * Output {@link Dataset} that passes all its {@link JrubyEventExtLibrary.RubyEvent}
-     * to the underlying {@link RubyIntegration.Output#multiReceive(Collection)}.
-     */
-    final class OutputDataset implements Dataset {
-
-        /**
-         * Empty {@link Collection} returned by this class's
-         * {@link Dataset#compute(RubyIntegration.Batch, boolean, boolean)} implementation.
-         */
-        private static final Collection<JrubyEventExtLibrary.RubyEvent> EMPTY_RETURN =
-            Collections.emptyList();
-
-        private final Collection<Dataset> parents;
-
-        private final RubyIntegration.Output output;
-
-        private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
-
-        private boolean done;
-
-        public OutputDataset(Collection<Dataset> parents, final RubyIntegration.Output output) {
-            this.parents = parents;
-            this.output = output;
-            buffer = new ArrayList<>(5);
-            done = false;
-        }
-
-        @Override
-        public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyIntegration.Batch batch,
-            final boolean flush, final boolean shutdown) {
-            if(!done) {
-                for (final Dataset set : parents) {
-                    for (final JrubyEventExtLibrary.RubyEvent event
-                        : set.compute(batch, flush, shutdown)) {
-                        if (!event.getEvent().isCancelled()) {
-                            buffer.add(event);
-                        }
-                    }
-                }
-                output.multiReceive(buffer);
-                done = true;
-                buffer.clear();
-            }
-            return EMPTY_RETURN;
-        }
-
-        @Override
-        public void clear() {
-            for (final Dataset parent : parents) {
-                parent.clear();
-            }
-            done = false;
-        }
-    }
 }
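The buffering loop deleted above is not gone: it reappears as generated source in the new DatasetCompiler (next file). An illustration, not from the commit, of roughly what outputDataset emits for a single parent, where field0 is the parent dataset and field1 the event buffer:

    for (JrubyEventExtLibrary.RubyEvent event : field0.compute(batch, flush, shutdown)) {
        if (!event.getEvent().isCancelled()) { field1.add(event); } }
    // ...followed by the cached multi_receive invocation and field1.clear();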
logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java (new file)

@@ -0,0 +1,302 @@
package org.logstash.config.ir.compiler;

import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ClassBodyEvaluator;
import org.jruby.RubyArray;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;

/**
 * Compiler that can compile implementations of {@link Dataset} at runtime.
 */
public final class DatasetCompiler {

    /**
     * Sequence number to ensure unique naming for runtime compiled classes.
     */
    private static final AtomicInteger SEQUENCE = new AtomicInteger(0);

    /**
     * Cache of runtime compiled classes to prevent duplicate classes being compiled.
     */
    private static final Map<String, Class<?>> CLASS_CACHE = new HashMap<>();

    private static final String RETURN_NULL = "return null;";

    /**
     * Trivial {@link Dataset} that simply returns an empty collection of elements.
     */
    private static final Dataset EMPTY_DATASET =
        DatasetCompiler.compile("return Collections.EMPTY_LIST;", "");

    private DatasetCompiler() {
        // Utility Class
    }

    /**
     * Compiles and subsequently instantiates a {@link Dataset} from given code snippets and
     * constructor arguments.
     * This method must be {@code synchronized} to avoid compiling duplicate classes.
     * @param compute Method body of {@link Dataset#compute(RubyArray, boolean, boolean)}
     * @param clear Method body of {@link Dataset#clear()}
     * @param fieldValues Constructor Arguments
     * @return Dataset Instance
     */
    public static synchronized Dataset compile(final String compute, final String clear,
        final Object... fieldValues) {
        final String source = String.format(
            "public Collection compute(RubyArray batch, boolean flush, boolean shutdown) { %s } public void clear() { %s }",
            compute, clear
        );
        try {
            final Class<?> clazz;
            if (CLASS_CACHE.containsKey(source)) {
                clazz = CLASS_CACHE.get(source);
            } else {
                final ClassBodyEvaluator se = new ClassBodyEvaluator();
                se.setImplementedInterfaces(new Class[]{Dataset.class});
                final String classname =
                    String.format("CompiledDataset%d", SEQUENCE.incrementAndGet());
                se.setClassName(classname);
                se.setDefaultImports(
                    new String[]{
                        "java.util.Collection", "java.util.Collections",
                        "org.logstash.config.ir.compiler.Dataset",
                        "org.logstash.ext.JrubyEventExtLibrary",
                        "org.logstash.RubyUtil", "org.logstash.config.ir.compiler.DatasetCompiler",
                        "org.jruby.runtime.Block", "org.jruby.RubyArray"
                    }
                );
                se.cook(new StringReader(fieldsAndCtor(classname, fieldValues) + source));
                clazz = se.getClazz();
                CLASS_CACHE.put(source, clazz);
            }
            final Class<?>[] args = new Class[fieldValues.length];
            Arrays.fill(args, Object.class);
            return (Dataset) clazz.getConstructor(args).newInstance(fieldValues);
        } catch (final CompileException | IOException | NoSuchMethodException
            | InvocationTargetException | InstantiationException | IllegalAccessException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
     * <p>If the given set of parent {@link Dataset} is empty the sum is defined as the
     * trivial dataset that does not invoke any computation whatsoever.</p>
     * {@link Dataset#compute(RubyArray, boolean, boolean)} is always
     * {@link Collections#emptyList()}.
     * @param parents Parent {@link Dataset} to sum and terminate
     * @return Dataset representing the sum of given parent {@link Dataset}
     */
    public static Dataset terminalDataset(final Collection<Dataset> parents) {
        final int count = parents.size();
        final Dataset result;
        if (count > 1) {
            final Object[] parentArr = parents.toArray();
            final int cnt = parentArr.length;
            final StringBuilder syntax = new StringBuilder();
            for (int i = 0; i < cnt; ++i) {
                syntax.append(computeDataset(i)).append(';');
            }
            for (int i = 0; i < cnt; ++i) {
                syntax.append(clear(i));
            }
            syntax.append(RETURN_NULL);
            result = compile(syntax.toString(), "", (Object[]) parentArr);
        } else if (count == 1) {
            // No need for a terminal dataset here, if there is only a single parent node we can
            // call it directly.
            result = parents.iterator().next();
        } else {
            result = EMPTY_DATASET;
        }
        return result;
    }
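    // Illustration (not part of the commit): for two parents, terminalDataset above
    // compiles a class whose methods amount to:
    //
    //   public Collection compute(RubyArray batch, boolean flush, boolean shutdown) {
    //       field0.compute(batch, flush, shutdown);
    //       field1.compute(batch, flush, shutdown);
    //       field0.clear();
    //       field1.clear();
    //       return null;
    //   }
    //   public void clear() { }
    //
    // where field0 and field1 are the parents, injected through the generated constructor.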
    /**
     * Compiles the {@link Dataset} representing an output plugin.
     * Note: The efficiency of the generated code rests on invoking the Ruby method
     * {@code multi_receive} in the cheapest possible way.
     * This is achieved by:
     * 1. Caching the method's {@link org.jruby.runtime.CallSite} into an instance
     * variable.
     * 2. Calling the low level CallSite invocation
     * {@link DynamicMethod#call(org.jruby.runtime.ThreadContext, IRubyObject, org.jruby.RubyModule, String, IRubyObject[], org.jruby.runtime.Block)}
     * using an {@code IRubyObject[]} field that is repopulated with the current Event array on
     * every call to {@code compute}.
     * @param parents Parent Datasets
     * @param output Output Plugin (of Ruby type OutputDelegator)
     * @param terminal Set to true if this output is the only output in the pipeline
     * @return Output Dataset
     */
    public static Dataset outputDataset(Collection<Dataset> parents, final IRubyObject output,
        final boolean terminal) {
        final String multiReceive = "multi_receive";
        final DynamicMethod method = output.getMetaClass().searchMethod(multiReceive);
        // Short-circuit trivial case of only output(s) in the pipeline
        if (parents == Dataset.ROOT_DATASETS) {
            return outputDatasetFromRoot(output, method);
        }
        final RubyArray buffer = RubyUtil.RUBY.newArray();
        final Object[] parentArr = parents.toArray();
        final int cnt = parentArr.length;
        final StringBuilder syntax = new StringBuilder();
        final int bufferIndex = cnt;
        for (int i = 0; i < cnt; ++i) {
            syntax.append("for (JrubyEventExtLibrary.RubyEvent event : ")
                .append(computeDataset(i)).append(") {")
                .append("if (!event.getEvent().isCancelled()) { ")
                .append(field(bufferIndex)).append(".add(event); } }");
        }
        final int callsiteIndex = cnt + 1;
        final int argArrayIndex = cnt + 2;
        final int pluginIndex = cnt + 3;
        syntax.append(callOutput(callsiteIndex, argArrayIndex, pluginIndex));
        syntax.append(clear(bufferIndex));
        final Object[] allArgs = new Object[cnt + 4];
        System.arraycopy(parentArr, 0, allArgs, 0, cnt);
        allArgs[bufferIndex] = buffer;
        allArgs[callsiteIndex] = method;
        allArgs[argArrayIndex] = new IRubyObject[]{buffer};
        allArgs[pluginIndex] = output;
        final StringBuilder clearSyntax = new StringBuilder();
        if (terminal) {
            for (int i = 0; i < cnt; ++i) {
                syntax.append(clear(i));
            }
        } else {
            for (int i = 0; i < cnt; ++i) {
                clearSyntax.append(clear(i));
            }
        }
        syntax.append(RETURN_NULL);
        return compile(syntax.toString(), clearSyntax.toString(), allArgs);
    }

    /**
     * Special case optimization for when the output plugin is directly connected to the Queue
     * without any filters or conditionals in between. This special case does not arise naturally
     * from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, boolean)} since it saves
     * the internal buffering of events and instead forwards events directly from the batch to the
     * Output plugin.
     * @param output Output Plugin
     * @return Dataset representing the Output
     */
    private static Dataset outputDatasetFromRoot(final IRubyObject output,
        final DynamicMethod method) {
        final int argArrayIndex = 1;
        final StringBuilder syntax = new StringBuilder();
        syntax.append(field(argArrayIndex)).append("[0] = batch;");
        final int callsiteIndex = 0;
        final int pluginIndex = 2;
        syntax.append(callOutput(callsiteIndex, argArrayIndex, pluginIndex));
        final Object[] allArgs = new Object[3];
        allArgs[callsiteIndex] = method;
        allArgs[argArrayIndex] = new IRubyObject[1];
        allArgs[pluginIndex] = output;
        syntax.append(RETURN_NULL);
        return compile(syntax.toString(), "", allArgs);
    }

    /**
     * Generates the code for invoking the Output plugin's `multi_receive` method.
     * @param callsiteIndex Field index of the `multi_receive` call site
     * @param argArrayIndex Field index of the invocation argument array
     * @param pluginIndex Field index of the Output plugin's Ruby object
     * @return Java Code String
     */
    private static String callOutput(final int callsiteIndex, final int argArrayIndex,
        final int pluginIndex) {
        return new StringBuilder().append(field(callsiteIndex)).append(
            ".call(RubyUtil.RUBY.getCurrentContext(), ").append(field(pluginIndex))
            .append(", RubyUtil.LOGSTASH_MODULE, \"multi_receive\", ")
            .append(field(argArrayIndex)).append(", Block.NULL_BLOCK);").toString();
    }

    private static String clear(final int fieldIndex) {
        return String.format("%s.clear();", field(fieldIndex));
    }

    private static String computeDataset(final int fieldIndex) {
        return String.format("%s.compute(batch, flush, shutdown)", field(fieldIndex));
    }

    private static String field(final int id) {
        return String.format("field%d", id);
    }

    /**
     * Generates the Java code for defining one field and constructor argument for each given value.
     * @param classname Classname to generate constructor for
     * @param values Values to store in instance fields and to generate assignments in the
     * constructor for
     * @return Java Source String
     */
    private static String fieldsAndCtor(final String classname, final Object... values) {
        final StringBuilder result = new StringBuilder();
        int i = 0;
        for (final Object fieldValue : values) {
            result.append("private final ");
            result.append(typeName(fieldValue));
            result.append(' ').append(field(i)).append(';');
            ++i;
        }
        result.append("public ").append(classname).append('(');
        for (int k = 0; k < i; ++k) {
            if (k > 0) {
                result.append(',');
            }
            result.append("Object");
            result.append(' ').append(field(k));
        }
        result.append(')').append('{');
        int j = 0;
        for (final Object fieldValue : values) {
            final String fieldName = field(j);
            result.append("this.").append(fieldName).append('=').append(castToOwnType(fieldValue))
                .append(fieldName).append(';');
            ++j;
        }
        result.append('}');
        return result.toString();
    }
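    // Illustration (not part of the commit): fieldsAndCtor for values
    // { someDataset, "foo" } and classname "CompiledDataset1" emits, on one line:
    //
    //   private final org.logstash.config.ir.compiler.Dataset field0;
    //   private final java.lang.String field1;
    //   public CompiledDataset1(Object field0,Object field1){
    //       this.field0=(org.logstash.config.ir.compiler.Dataset)field0;
    //       this.field1=(java.lang.String)field1;}
    //
    // (reformatted here for readability).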
    /**
     * Generates a code-snippet typecast to the strictest possible type for the given object.
     * Example: Given a obj = "foo" the method generates {@code (java.lang.String) obj}
     * @param obj Object to generate type cast snippet for
     * @return Java Source Code
     */
    private static String castToOwnType(final Object obj) {
        return String.format("(%s)", typeName(obj));
    }

    /**
     * Returns the strictest possible syntax conform type for the given object. Note that for
     * any {@link Dataset} instance, this will be {@code org.logstash.config.ir.compiler.Dataset}
     * instead of a concrete class, since Dataset implementations are using runtime compiled
     * classes.
     * @param obj Object to lookup type name for
     * @return Syntax conform type name
     */
    private static String typeName(final Object obj) {
        final Class<?> clazz;
        if (obj instanceof Dataset) {
            clazz = Dataset.class;
        } else {
            clazz = obj.getClass();
        }
        return clazz.getTypeName();
    }

}
logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java

@@ -53,14 +53,6 @@ public final class RubyIntegration {
         boolean periodicFlush();
     }

-    /**
-     * A Ruby Output. Currently, this interface is implemented only by the Ruby class
-     * {@code OutputDelegator}.
-     */
-    public interface Output extends RubyIntegration.Plugin {
-        void multiReceive(Collection<JrubyEventExtLibrary.RubyEvent> events);
-    }
-
     /**
      * The Main Ruby Pipeline Class. Currently, this interface is implemented only by the Ruby class
      * {@code BasePipeline}.
@@ -70,7 +62,7 @@ public final class RubyIntegration {
         IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
             IRubyObject args);

-        RubyIntegration.Output buildOutput(RubyString name, RubyInteger line, RubyInteger column,
+        IRubyObject buildOutput(RubyString name, RubyInteger line, RubyInteger column,
             IRubyObject args);

         RubyIntegration.Filter buildFilter(RubyString name, RubyInteger line, RubyInteger column,
@@ -78,20 +70,4 @@ public final class RubyIntegration {

         RubyIntegration.Filter buildCodec(RubyString name, IRubyObject args);
     }
-
-    /**
-     * A Ruby {@code ReadBatch} implemented by {@code WrappedSynchronousQueue::ReadClient::ReadBatch}
-     * and {@code WrappedAckedQueue::ReadClient::ReadBatch}.
-     */
-    public interface Batch {
-
-        /**
-         * Retrieves all {@link JrubyEventExtLibrary.RubyEvent} from the batch that are not
-         * cancelled.
-         * @return Collection of all {@link JrubyEventExtLibrary.RubyEvent} in the batch that
-         * are not cancelled
-         */
-        Collection<JrubyEventExtLibrary.RubyEvent> to_a();
-
-    }
 }
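With the Output and Batch interfaces gone, compiled code reaches the Ruby OutputDelegator dynamically. A hedged sketch of the call the generated source performs (it mirrors callOutput in DatasetCompiler above; output and events are illustrative locals):

    // Resolve multi_receive once, then invoke it through JRuby's low-level
    // DynamicMethod API with a reusable one-element argument array.
    final DynamicMethod method = output.getMetaClass().searchMethod("multi_receive");
    method.call(RubyUtil.RUBY.getCurrentContext(), output, RubyUtil.LOGSTASH_MODULE,
        "multi_receive", new IRubyObject[]{events}, Block.NULL_BLOCK);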
logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java (new file)

@@ -0,0 +1,39 @@
package org.logstash.config.ir.compiler;

import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyArray;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;

public final class DatasetCompilerTest {

    @Test
    public void compilesEmptyMethod() {
        final Dataset func = DatasetCompiler.compile("return batch.to_a();", "");
        final RubyArray batch = RubyUtil.RUBY.newArray();
        MatcherAssert.assertThat(
            func.compute(batch, false, false),
            CoreMatchers.is(batch)
        );
    }

    @Test
    public void compilesParametrizedMethod() {
        final JrubyEventExtLibrary.RubyEvent additional =
            JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
        final RubyArray batch = RubyUtil.RUBY.newArray(
            JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event())
        );
        final Dataset func = DatasetCompiler.compile(
            "final Collection events = batch.to_a();events.add(field0);return events;", "",
            additional
        );
        MatcherAssert.assertThat(
            func.compute(batch, false, false).size(),
            CoreMatchers.is(2)
        );
    }
}