mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
c16917c491
commit
c5a262d925
5 changed files with 282 additions and 277 deletions
|
@ -38,6 +38,10 @@ module LogStash
|
|||
@flushes = @filter.respond_to?(:flush)
|
||||
end
|
||||
|
||||
def toRuby
|
||||
self
|
||||
end
|
||||
|
||||
def config_name
|
||||
@klass.config_name
|
||||
end
|
||||
|
|
|
@ -288,14 +288,13 @@ public final class CompiledPipeline {
|
|||
return plugins.computeIfAbsent(vertex, v -> {
|
||||
final Dataset filter;
|
||||
final RubyIntegration.Filter ruby = filters.get(v);
|
||||
final IRubyObject base = ruby.toRuby();
|
||||
if (ruby.hasFlush()) {
|
||||
if (ruby.periodicFlush()) {
|
||||
filter = new Dataset.FilteredFlushableDataset(datasets, ruby);
|
||||
filter = DatasetCompiler.flushingFilterDataset(
|
||||
datasets, base, !ruby.periodicFlush()
|
||||
);
|
||||
} else {
|
||||
filter = new Dataset.FilteredShutdownFlushableDataset(datasets, ruby);
|
||||
}
|
||||
} else {
|
||||
filter = new Dataset.FilteredDataset(datasets, ruby);
|
||||
filter = DatasetCompiler.filterDataset(datasets, base);
|
||||
}
|
||||
return filter;
|
||||
});
|
||||
|
|
|
@ -4,8 +4,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyHash;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
/**
|
||||
|
@ -162,168 +160,4 @@ public interface Dataset {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} to all
|
||||
* dependent {@link Dataset}.
|
||||
*/
|
||||
final class FilteredDataset implements Dataset {
|
||||
|
||||
private final Collection<Dataset> parents;
|
||||
|
||||
private final RubyIntegration.Filter func;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
|
||||
|
||||
private boolean done;
|
||||
|
||||
public FilteredDataset(Collection<Dataset> parents, final RubyIntegration.Filter func) {
|
||||
this.parents = parents;
|
||||
this.func = func;
|
||||
data = new ArrayList<>(5);
|
||||
buffer = new ArrayList<>(5);
|
||||
done = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
|
||||
final boolean flush, final boolean shutdown) {
|
||||
if (done) {
|
||||
return data;
|
||||
}
|
||||
for (final Dataset set : parents) {
|
||||
buffer.addAll(set.compute(batch, flush, shutdown));
|
||||
}
|
||||
done = true;
|
||||
data.addAll(func.multiFilter(buffer));
|
||||
buffer.clear();
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
for (final Dataset parent : parents) {
|
||||
parent.clear();
|
||||
}
|
||||
data.clear();
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} that flushes
|
||||
* periodically to all dependent {@link Dataset}.
|
||||
*/
|
||||
final class FilteredFlushableDataset implements Dataset {
|
||||
|
||||
public static final RubyHash FLUSH_FINAL = flushOpts(true);
|
||||
|
||||
private static final RubyHash FLUSH_NOT_FINAL = flushOpts(false);
|
||||
|
||||
private final Collection<Dataset> parents;
|
||||
|
||||
private final RubyIntegration.Filter func;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
|
||||
|
||||
private boolean done;
|
||||
|
||||
public FilteredFlushableDataset(Collection<Dataset> parents,
|
||||
final RubyIntegration.Filter func) {
|
||||
this.parents = parents;
|
||||
this.func = func;
|
||||
data = new ArrayList<>(5);
|
||||
buffer = new ArrayList<>(5);
|
||||
done = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
|
||||
final boolean flush, final boolean shutdown) {
|
||||
if (done) {
|
||||
return data;
|
||||
}
|
||||
for (final Dataset set : parents) {
|
||||
buffer.addAll(set.compute(batch, flush, shutdown));
|
||||
}
|
||||
done = true;
|
||||
data.addAll(func.multiFilter(buffer));
|
||||
if (flush) {
|
||||
data.addAll(func.flush(shutdown ? FLUSH_FINAL : FLUSH_NOT_FINAL));
|
||||
}
|
||||
buffer.clear();
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
for (final Dataset parent : parents) {
|
||||
parent.clear();
|
||||
}
|
||||
data.clear();
|
||||
done = false;
|
||||
}
|
||||
|
||||
private static RubyHash flushOpts(final boolean fin) {
|
||||
final RubyHash res = RubyHash.newHash(RubyUtil.RUBY);
|
||||
res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin));
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Dataset} resulting from applying a backing {@link RubyIntegration.Filter} that does
|
||||
* flush, but only on shutdown, to all dependent {@link Dataset}.
|
||||
*/
|
||||
final class FilteredShutdownFlushableDataset implements Dataset {
|
||||
|
||||
private final Collection<Dataset> parents;
|
||||
|
||||
private final RubyIntegration.Filter func;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> data;
|
||||
|
||||
private final Collection<JrubyEventExtLibrary.RubyEvent> buffer;
|
||||
|
||||
private boolean done;
|
||||
|
||||
public FilteredShutdownFlushableDataset(Collection<Dataset> parents,
|
||||
final RubyIntegration.Filter func) {
|
||||
this.parents = parents;
|
||||
this.func = func;
|
||||
data = new ArrayList<>(5);
|
||||
buffer = new ArrayList<>(5);
|
||||
done = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JrubyEventExtLibrary.RubyEvent> compute(final RubyArray batch,
|
||||
final boolean flush, final boolean shutdown) {
|
||||
if (done) {
|
||||
return data;
|
||||
}
|
||||
for (final Dataset set : parents) {
|
||||
buffer.addAll(set.compute(batch, flush, shutdown));
|
||||
}
|
||||
done = true;
|
||||
data.addAll(func.multiFilter(buffer));
|
||||
if (flush && shutdown) {
|
||||
data.addAll(func.flush(FilteredFlushableDataset.FLUSH_FINAL));
|
||||
}
|
||||
buffer.clear();
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
for (final Dataset parent : parents) {
|
||||
parent.clear();
|
||||
}
|
||||
data.clear();
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.logstash.config.ir.compiler;
|
|||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -12,6 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.codehaus.commons.compiler.CompileException;
|
||||
import org.codehaus.janino.ClassBodyEvaluator;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyHash;
|
||||
import org.jruby.internal.runtime.methods.DynamicMethod;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
|
@ -21,6 +23,16 @@ import org.logstash.RubyUtil;
|
|||
*/
|
||||
public final class DatasetCompiler {
|
||||
|
||||
/**
|
||||
* Argument passed to Ruby Filter flush method in generated code.
|
||||
*/
|
||||
public static final IRubyObject[] FLUSH_FINAL = {flushOpts(true)};
|
||||
|
||||
/**
|
||||
* Argument passed to Ruby Filter flush method in generated code.
|
||||
*/
|
||||
public static final IRubyObject[] FLUSH_NOT_FINAL = {flushOpts(false)};
|
||||
|
||||
/**
|
||||
* Sequence number to ensure unique naming for runtime compiled classes.
|
||||
*/
|
||||
|
@ -35,8 +47,47 @@ public final class DatasetCompiler {
|
|||
/**
|
||||
* Trivial {@link Dataset} that simply returns an empty collection of elements.
|
||||
*/
|
||||
private static final Dataset EMPTY_DATASET =
|
||||
DatasetCompiler.compile("return Collections.EMPTY_LIST;", "");
|
||||
private static final Dataset EMPTY_DATASET = DatasetCompiler.compile(RETURN_NULL, "");
|
||||
|
||||
private static final String MULTI_FILTER = "multi_filter";
|
||||
|
||||
private static final String MULTI_RECEIVE = "multi_receive";
|
||||
|
||||
private static final String FLUSH = "flush";
|
||||
|
||||
/**
|
||||
* Relative offset of the field holding the cached arguments used to invoke the
|
||||
* primary callsite of a dataset.
|
||||
*/
|
||||
private static final int ARG_ARRAY_OFFSET = 0;
|
||||
|
||||
/**
|
||||
* Relative offset of the primary (either multi_filter or multi_receive) {@link DynamicMethod}
|
||||
* callsite in generated code.
|
||||
*/
|
||||
private static final int PRIMARY_CALLSITE_OFFSET = 1;
|
||||
|
||||
/**
|
||||
* Relative offset of the field holding a wrapped Ruby plugin.
|
||||
*/
|
||||
private static final int PLUGIN_FIELD_OFFSET = 2;
|
||||
|
||||
/**
|
||||
* Relative offset of the field holding the collection used to buffer input
|
||||
* {@link org.logstash.ext.JrubyEventExtLibrary.RubyEvent}.
|
||||
*/
|
||||
private static final int INPUT_BUFFER_OFFSET = 3;
|
||||
|
||||
/**
|
||||
* Relative offset of the field holding the collection used to buffer computed
|
||||
* {@link org.logstash.ext.JrubyEventExtLibrary.RubyEvent}.
|
||||
*/
|
||||
private static final int RESULT_BUFFER_OFFSET = 4;
|
||||
|
||||
/**
|
||||
* Relative offset of the field holding the filter flush method callsite.
|
||||
*/
|
||||
private static final int FLUSH_CALLSITE_OFFSET = 5;
|
||||
|
||||
private DatasetCompiler() {
|
||||
// Utility Class
|
||||
|
@ -53,12 +104,12 @@ public final class DatasetCompiler {
|
|||
*/
|
||||
public static synchronized Dataset compile(final String compute, final String clear,
|
||||
final Object... fieldValues) {
|
||||
final String source = String.format(
|
||||
"public Collection compute(RubyArray batch, boolean flush, boolean shutdown) { %s } public void clear() { %s }",
|
||||
compute, clear
|
||||
);
|
||||
try {
|
||||
final Class<?> clazz;
|
||||
final String source = String.format(
|
||||
"public Collection compute(RubyArray batch, boolean flush, boolean shutdown) { %s } public void clear() { %s }",
|
||||
compute, clear
|
||||
);
|
||||
if (CLASS_CACHE.containsKey(source)) {
|
||||
clazz = CLASS_CACHE.get(source);
|
||||
} else {
|
||||
|
@ -76,7 +127,7 @@ public final class DatasetCompiler {
|
|||
"org.jruby.runtime.Block", "org.jruby.RubyArray"
|
||||
}
|
||||
);
|
||||
se.cook(new StringReader(fieldsAndCtor(classname, fieldValues) + source));
|
||||
se.cook(new StringReader(join(fieldsAndCtor(classname, fieldValues), source)));
|
||||
clazz = se.getClazz();
|
||||
CLASS_CACHE.put(source, clazz);
|
||||
}
|
||||
|
@ -89,6 +140,51 @@ public final class DatasetCompiler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compiles a {@link Dataset} representing a filter plugin without flush behaviour.
|
||||
* @param parents Parent {@link Dataset} to aggregate for this filter
|
||||
* @param filter Filter Plugin
|
||||
* @return Dataset representing the filter plugin
|
||||
*/
|
||||
public static Dataset filterDataset(final Collection<Dataset> parents,
|
||||
final IRubyObject filter) {
|
||||
final Object[] parentArr = parents.toArray();
|
||||
final int offset = parentArr.length;
|
||||
final Object[] allArgs = new Object[offset + 5];
|
||||
setupFilterFields(filter, parentArr, allArgs);
|
||||
return compileFilterDataset(offset, filterBody(offset), allArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compiles a {@link Dataset} representing a filter plugin with flush behaviour.
|
||||
* @param parents Parent {@link Dataset} to aggregate for this filter
|
||||
* @param filter Filter Plugin
|
||||
* @param shutdownFlushOnly True iff plugin only flushes on shutdown
|
||||
* @return Dataset representing the filter plugin
|
||||
*/
|
||||
public static Dataset flushingFilterDataset(final Collection<Dataset> parents,
|
||||
final IRubyObject filter, final boolean shutdownFlushOnly) {
|
||||
final Object[] parentArr = parents.toArray();
|
||||
final int offset = parentArr.length;
|
||||
final Object[] allArgs = new Object[offset + 6];
|
||||
setupFilterFields(filter, parentArr, allArgs);
|
||||
allArgs[offset + FLUSH_CALLSITE_OFFSET] = rubyCallsite(filter, FLUSH);
|
||||
return compileFilterDataset(
|
||||
offset, join(filterBody(offset), callFilterFlush(offset, shutdownFlushOnly)), allArgs
|
||||
);
|
||||
}
|
||||
|
||||
private static void setupFilterFields(final IRubyObject filter, final Object[] parentArr,
|
||||
final Object[] allArgs) {
|
||||
final RubyArray buffer = RubyUtil.RUBY.newArray();
|
||||
final int offset = parentArr.length;
|
||||
System.arraycopy(parentArr, 0, allArgs, 0, offset);
|
||||
allArgs[offset + INPUT_BUFFER_OFFSET] = buffer;
|
||||
allArgs[offset + PRIMARY_CALLSITE_OFFSET] = rubyCallsite(filter, MULTI_FILTER);
|
||||
allArgs[offset + ARG_ARRAY_OFFSET] = new IRubyObject[]{buffer};
|
||||
allArgs[offset + PLUGIN_FIELD_OFFSET] = filter;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Builds a terminal {@link Dataset} from the given parent {@link Dataset}s.</p>
|
||||
* <p>If the given set of parent {@link Dataset} is empty the sum is defined as the
|
||||
|
@ -103,16 +199,12 @@ public final class DatasetCompiler {
|
|||
final Dataset result;
|
||||
if (count > 1) {
|
||||
final Object[] parentArr = parents.toArray();
|
||||
final int cnt = parentArr.length;
|
||||
final int offset = parentArr.length;
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
for (int i = 0; i < offset; ++i) {
|
||||
syntax.append(computeDataset(i)).append(';');
|
||||
}
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
syntax.append(clear(i));
|
||||
}
|
||||
syntax.append(RETURN_NULL);
|
||||
result = compile(syntax.toString(), "", (Object[]) parentArr);
|
||||
result = compileOutput(join(syntax.toString(), clearSyntax(offset)), "", parentArr);
|
||||
} else if (count == 1) {
|
||||
// No need for a terminal dataset here, if there is only a single parent node we can
|
||||
// call it directly.
|
||||
|
@ -139,88 +231,54 @@ public final class DatasetCompiler {
|
|||
* @param terminal Set to true if this output is the only output in the pipeline
|
||||
* @return Output Dataset
|
||||
*/
|
||||
public static Dataset outputDataset(Collection<Dataset> parents, final IRubyObject output,
|
||||
public static Dataset outputDataset(final Collection<Dataset> parents, final IRubyObject output,
|
||||
final boolean terminal) {
|
||||
final String multiReceive = "multi_receive";
|
||||
final DynamicMethod method = output.getMetaClass().searchMethod(multiReceive);
|
||||
final DynamicMethod method = rubyCallsite(output, MULTI_RECEIVE);
|
||||
// Short-circuit trivial case of only output(s) in the pipeline
|
||||
if (parents == Dataset.ROOT_DATASETS) {
|
||||
return outputDatasetFromRoot(output, method);
|
||||
}
|
||||
final RubyArray buffer = RubyUtil.RUBY.newArray();
|
||||
final Object[] parentArr = parents.toArray();
|
||||
final int cnt = parentArr.length;
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
final int bufferIndex = cnt;
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
syntax.append("for (JrubyEventExtLibrary.RubyEvent event : ")
|
||||
.append(computeDataset(i)).append(") {")
|
||||
.append("if (!event.getEvent().isCancelled()) { ")
|
||||
.append(field(bufferIndex)).append(".add(event); } }");
|
||||
}
|
||||
final int callsiteIndex = cnt + 1;
|
||||
final int argArrayIndex = cnt + 2;
|
||||
final int pluginIndex = cnt + 3;
|
||||
syntax.append(callOutput(callsiteIndex, argArrayIndex, pluginIndex));
|
||||
syntax.append(clear(bufferIndex));
|
||||
final Object[] allArgs = new Object[cnt + 4];
|
||||
System.arraycopy(parentArr, 0, allArgs, 0, cnt);
|
||||
allArgs[bufferIndex] = buffer;
|
||||
allArgs[callsiteIndex] = method;
|
||||
allArgs[argArrayIndex] = new IRubyObject[]{buffer};
|
||||
allArgs[pluginIndex] = output;
|
||||
final StringBuilder clearSyntax = new StringBuilder();
|
||||
final int offset = parentArr.length;
|
||||
final Object[] allArgs = new Object[offset + 4];
|
||||
System.arraycopy(parentArr, 0, allArgs, 0, offset);
|
||||
allArgs[offset + INPUT_BUFFER_OFFSET] = buffer;
|
||||
allArgs[offset + PRIMARY_CALLSITE_OFFSET] = method;
|
||||
allArgs[offset + ARG_ARRAY_OFFSET] = new IRubyObject[]{buffer};
|
||||
allArgs[offset + PLUGIN_FIELD_OFFSET] = output;
|
||||
final String clearSyntax;
|
||||
final String inlineClear;
|
||||
if (terminal) {
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
syntax.append(clear(i));
|
||||
}
|
||||
clearSyntax = "";
|
||||
inlineClear = clearSyntax(offset);
|
||||
} else {
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
clearSyntax.append(clear(i));
|
||||
}
|
||||
inlineClear = "";
|
||||
clearSyntax = clearSyntax(offset);
|
||||
}
|
||||
syntax.append(RETURN_NULL);
|
||||
return compile(syntax.toString(), clearSyntax.toString(), allArgs);
|
||||
return compileOutput(
|
||||
join(
|
||||
join(
|
||||
bufferForOutput(offset), callOutput(offset), clear(offset + INPUT_BUFFER_OFFSET)
|
||||
), inlineClear
|
||||
), clearSyntax, allArgs
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Special case optimization for when the output plugin is directly connected to the Queue
|
||||
* without any filters or conditionals in between. This special case does not arise naturally
|
||||
* from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, boolean)} since it saves
|
||||
* the internal buffering of events and instead forwards events directly from the batch to the
|
||||
* Output plugin.
|
||||
* @param output Output Plugin
|
||||
* @return Dataset representing the Output
|
||||
*/
|
||||
private static Dataset outputDatasetFromRoot(final IRubyObject output,
|
||||
final DynamicMethod method) {
|
||||
final int argArrayIndex = 1;
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
syntax.append(field(argArrayIndex)).append("[0] = batch;");
|
||||
final int callsiteIndex = 0;
|
||||
final int pluginIndex = 2;
|
||||
syntax.append(callOutput(callsiteIndex, argArrayIndex, pluginIndex));
|
||||
final Object[] allArgs = new Object[3];
|
||||
allArgs[callsiteIndex] = method;
|
||||
allArgs[argArrayIndex] = new IRubyObject[1];
|
||||
allArgs[pluginIndex] = output;
|
||||
syntax.append(RETURN_NULL);
|
||||
return compile(syntax.toString(), "", allArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the code for invoking the Output plugin's `multi_receive` method.
|
||||
* @param callsiteIndex Field index of the `multi_receive` call site
|
||||
* @param argArrayIndex Field index of the invocation argument array
|
||||
* @param pluginIndex Field index of the Output plugin's Ruby object
|
||||
* @return Java Code String
|
||||
*/
|
||||
private static String callOutput(final int callsiteIndex, final int argArrayIndex,
|
||||
final int pluginIndex) {
|
||||
return new StringBuilder().append(field(callsiteIndex)).append(
|
||||
".call(RubyUtil.RUBY.getCurrentContext(), ").append(field(pluginIndex))
|
||||
.append(", RubyUtil.LOGSTASH_MODULE, \"multi_receive\", ")
|
||||
.append(field(argArrayIndex)).append(", Block.NULL_BLOCK);").toString();
|
||||
private static String callFilterFlush(final int offset, final boolean shutdownOnly) {
|
||||
final String condition;
|
||||
final String flushArgs;
|
||||
if (shutdownOnly) {
|
||||
condition = "flush && shutdown";
|
||||
flushArgs = "DatasetCompiler.FLUSH_FINAL";
|
||||
} else {
|
||||
condition = "flush";
|
||||
flushArgs = "shutdown ? DatasetCompiler.FLUSH_FINAL : DatasetCompiler.FLUSH_NOT_FINAL";
|
||||
}
|
||||
return join(
|
||||
"if(", condition, "){", field(offset + RESULT_BUFFER_OFFSET), ".addAll((RubyArray)",
|
||||
callRubyCallsite(FLUSH_CALLSITE_OFFSET, flushArgs, offset, FLUSH), ");}"
|
||||
);
|
||||
}
|
||||
|
||||
private static String clear(final int fieldIndex) {
|
||||
|
@ -246,31 +304,140 @@ public final class DatasetCompiler {
|
|||
final StringBuilder result = new StringBuilder();
|
||||
int i = 0;
|
||||
for (final Object fieldValue : values) {
|
||||
result.append("private final ");
|
||||
result.append(typeName(fieldValue));
|
||||
result.append(' ').append(field(i)).append(';');
|
||||
result.append(join("private final ", typeName(fieldValue), " ", field(i), ";"));
|
||||
++i;
|
||||
}
|
||||
result.append("public ").append(classname).append('(');
|
||||
result.append(join("public ", classname, "("));
|
||||
for (int k = 0; k < i; ++k) {
|
||||
if (k > 0) {
|
||||
result.append(',');
|
||||
}
|
||||
result.append("Object");
|
||||
result.append(' ').append(field(k));
|
||||
result.append(join("Object ", field(k)));
|
||||
}
|
||||
result.append(')').append('{');
|
||||
result.append(") {");
|
||||
int j = 0;
|
||||
for (final Object fieldValue : values) {
|
||||
final String fieldName = field(j);
|
||||
result.append("this.").append(fieldName).append('=').append(castToOwnType(fieldValue))
|
||||
.append(fieldName).append(';');
|
||||
result.append(join("this.", fieldName, "=", castToOwnType(fieldValue), fieldName, ";"));
|
||||
++j;
|
||||
}
|
||||
result.append('}');
|
||||
return result.toString();
|
||||
}
|
||||
|
||||
private static IRubyObject flushOpts(final boolean fin) {
|
||||
final RubyHash res = RubyHash.newHash(RubyUtil.RUBY);
|
||||
res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin));
|
||||
return res;
|
||||
}
|
||||
|
||||
private static String bufferForOutput(final int offset) {
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
for (int i = 0; i < offset; ++i) {
|
||||
syntax.append(
|
||||
join(
|
||||
"for (JrubyEventExtLibrary.RubyEvent e : ", computeDataset(i), ") {",
|
||||
"if (!e.getEvent().isCancelled()) { ", field(offset + INPUT_BUFFER_OFFSET),
|
||||
".add(e); } }"
|
||||
)
|
||||
);
|
||||
}
|
||||
return syntax.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Special case optimization for when the output plugin is directly connected to the Queue
|
||||
* without any filters or conditionals in between. This special case does not arise naturally
|
||||
* from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, boolean)} since it saves
|
||||
* the internal buffering of events and instead forwards events directly from the batch to the
|
||||
* Output plugin.
|
||||
* @param output Output Plugin
|
||||
* @return Dataset representing the Output
|
||||
*/
|
||||
private static Dataset outputDatasetFromRoot(final IRubyObject output,
|
||||
final DynamicMethod method) {
|
||||
final Object[] allArgs = new Object[3];
|
||||
allArgs[PRIMARY_CALLSITE_OFFSET] = method;
|
||||
allArgs[ARG_ARRAY_OFFSET] = new IRubyObject[1];
|
||||
allArgs[PLUGIN_FIELD_OFFSET] = output;
|
||||
return compileOutput(
|
||||
join(field(ARG_ARRAY_OFFSET), "[0] = batch;", callOutput(0)), "",
|
||||
allArgs
|
||||
);
|
||||
}
|
||||
|
||||
private static Dataset compileOutput(final String syntax, final String clearSyntax,
|
||||
final Object[] allArgs) {
|
||||
return compile(join(syntax, RETURN_NULL), clearSyntax, allArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the code for invoking the Output plugin's `multi_receive` method.
|
||||
* @param offset Number of Parent Dataset Fields
|
||||
* @return Java Code String
|
||||
*/
|
||||
private static String callOutput(final int offset) {
|
||||
return join(
|
||||
callRubyCallsite(
|
||||
PRIMARY_CALLSITE_OFFSET, field(offset + ARG_ARRAY_OFFSET), offset, MULTI_RECEIVE
|
||||
), ";"
|
||||
);
|
||||
}
|
||||
|
||||
private static String callFilter(final int offset) {
|
||||
return join(
|
||||
field(offset + RESULT_BUFFER_OFFSET), ".addAll((RubyArray)",
|
||||
callRubyCallsite(
|
||||
PRIMARY_CALLSITE_OFFSET, field(offset + ARG_ARRAY_OFFSET), offset, MULTI_FILTER
|
||||
), ");"
|
||||
);
|
||||
}
|
||||
|
||||
private static String callRubyCallsite(final int callsiteOffset, final String argument,
|
||||
final int offset, final String method) {
|
||||
return join(
|
||||
field(offset + callsiteOffset), ".call(RubyUtil.RUBY.getCurrentContext(), ",
|
||||
field(offset + PLUGIN_FIELD_OFFSET),
|
||||
", RubyUtil.LOGSTASH_MODULE,", join("\"", method, "\""), ", ", argument,
|
||||
", Block.NULL_BLOCK)"
|
||||
);
|
||||
}
|
||||
|
||||
private static Dataset compileFilterDataset(final int offset, final String syntax,
|
||||
final Object[] allArgs) {
|
||||
allArgs[offset + RESULT_BUFFER_OFFSET] = new ArrayList<>();
|
||||
return compile(
|
||||
join(syntax, "return ", field(offset + RESULT_BUFFER_OFFSET), ";"),
|
||||
join(clearSyntax(offset), clear(offset + RESULT_BUFFER_OFFSET)), allArgs
|
||||
);
|
||||
}
|
||||
|
||||
private static String clearSyntax(final int count) {
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
syntax.append(clear(i));
|
||||
}
|
||||
return syntax.toString();
|
||||
}
|
||||
|
||||
private static DynamicMethod rubyCallsite(final IRubyObject rubyObject, final String name) {
|
||||
return rubyObject.getMetaClass().searchMethod(name);
|
||||
}
|
||||
|
||||
private static String evalParents(final int count) {
|
||||
final StringBuilder syntax = new StringBuilder();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
syntax.append(
|
||||
join(field(count + INPUT_BUFFER_OFFSET), ".addAll(", computeDataset(i), ");")
|
||||
);
|
||||
}
|
||||
return syntax.toString();
|
||||
}
|
||||
|
||||
private static String filterBody(final int offset) {
|
||||
return join(evalParents(offset), callFilter(offset), clear(offset + INPUT_BUFFER_OFFSET));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a code-snippet typecast to the strictest possible type for the given object.
|
||||
* Example: Given a obj = "foo" the method generates {@code (java.lang.String) obj}
|
||||
|
@ -296,7 +463,16 @@ public final class DatasetCompiler {
|
|||
} else {
|
||||
clazz = obj.getClass();
|
||||
}
|
||||
return clazz.getTypeName();
|
||||
final String classname = clazz.getTypeName();
|
||||
// JavaFilterDelegator classes are runtime generated by Ruby and are not available
|
||||
// to the Janino compiler's classloader. There is no value in casting to the concrete class
|
||||
// here anyways since JavaFilterDelegator instances are only passed as IRubyObject type
|
||||
// method parameters in the generated code.
|
||||
return classname.contains("JavaFilterDelegator")
|
||||
? IRubyObject.class.getTypeName() : classname;
|
||||
}
|
||||
|
||||
private static String join(final String... parts) {
|
||||
return String.join("", parts);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
package org.logstash.config.ir.compiler;
|
||||
|
||||
import java.util.Collection;
|
||||
import org.jruby.RubyHash;
|
||||
import org.jruby.RubyInteger;
|
||||
import org.jruby.RubyString;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
/**
|
||||
* This class holds interfaces implemented by Ruby concrete classes.
|
||||
|
@ -30,15 +27,10 @@ public final class RubyIntegration {
|
|||
public interface Filter extends RubyIntegration.Plugin {
|
||||
|
||||
/**
|
||||
* Same as {@code FilterDelegator}'s {@code multi_filter}.
|
||||
* @param events Events to Filter
|
||||
* @return Filtered {@link JrubyEventExtLibrary.RubyEvent}
|
||||
* Returns the underlying {@link IRubyObject} for this filter instance.
|
||||
* @return Underlying {@link IRubyObject}
|
||||
*/
|
||||
Collection<JrubyEventExtLibrary.RubyEvent> multiFilter(
|
||||
Collection<JrubyEventExtLibrary.RubyEvent> events
|
||||
);
|
||||
|
||||
Collection<JrubyEventExtLibrary.RubyEvent> flush(RubyHash options);
|
||||
IRubyObject toRuby();
|
||||
|
||||
/**
|
||||
* Checks if this filter has a flush method.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue