Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 14:47:19 -04:00)

Parent: 8ee9a990cc
Commit: 53b9b7dbf9

4 changed files with 89 additions and 8 deletions
@@ -75,6 +75,3 @@
 # Entropy source for randomness
 -Djava.security.egd=file:/dev/urandom
--agentlib:jdwp=transport=dt_socket,server=n,address=localhost:5006,suspend=y
--Dorg.codehaus.janino.source_debugging.dir=/Users/brownbear/src/logstash/logs/debug
--Dorg.codehaus.janino.source_debugging.enable=true
 
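The three deleted lines are developer-local debug settings (a JDWP agent on localhost:5006 and Janino source-debugging properties pointing at a personal checkout under /Users/brownbear) that evidently landed in the tree by accident; the entropy-source setting stays.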
@@ -297,16 +297,25 @@ public final class CompiledPipeline {
      */
     private SplitDataset split(final Collection<Dataset> datasets,
         final EventCondition condition, final Vertex vertex) {
-        return iffs.computeIfAbsent(
-            vertex.getId(), v -> {
+        final String key = vertex.getId();
+        SplitDataset conditional = iffs.get(key);
+        if (conditional == null) {
+            final Collection<Dataset> dependencies = flatten(datasets, vertex);
+            conditional = iffs.get(key);
+            // Check that compiling the dependencies did not already instantiate the conditional
+            // by requiring its else branch.
+            if (conditional == null) {
                 final ComputeStepSyntaxElement<SplitDataset> prepared =
-                    DatasetCompiler.splitDataset(flatten(datasets, vertex), condition);
+                    DatasetCompiler.splitDataset(dependencies, condition);
                 LOGGER.debug(
                     "Compiled conditional\n {} \n into \n {}", vertex, prepared
                 );
-                return prepared.instantiate();
+                conditional = prepared.instantiate();
+                iffs.put(key, conditional);
             }
-        );
+
+        }
+        return conditional;
     }
 
     /**
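The rewrite of split() deserves a note. flatten() compiles the conditional's dependency datasets, and, as the new in-diff comment says, that compilation can itself require the same conditional through its else branch; in other words, split() re-enters for the same vertex id. Doing that inside iffs.computeIfAbsent() means mutating the map from within its own mapping function, which the java.util.Map contract forbids. The patch therefore switches to an explicit get, compute outside the map operation, re-check, put. Below is a minimal, self-contained sketch of both the hazard and the replacement pattern; it is illustrative code, not from the Logstash tree, and it assumes a ConcurrentHashMap cache (the hunk does not show the declared type of iffs):

    import java.util.concurrent.ConcurrentHashMap;

    public final class RecursiveComputeDemo {

        private static final ConcurrentHashMap<Integer, Long> CACHE = new ConcurrentHashMap<>();

        // Broken: the mapping function re-enters computeIfAbsent on the same map.
        // The Map contract forbids this; Java 8 could loop forever here, and
        // Java 9+ usually fails fast with "IllegalStateException: Recursive update".
        static long fibBroken(final int n) {
            if (n < 2) {
                return n;
            }
            return CACHE.computeIfAbsent(n, k -> fibBroken(k - 1) + fibBroken(k - 2));
        }

        // The pattern the patch adopts: look up, compute the dependencies outside
        // any map operation, re-check (a nested call may have filled the entry), put.
        static long fibSafe(final int n) {
            if (n < 2) {
                return n;
            }
            Long cached = CACHE.get(n);
            if (cached == null) {
                final long computed = fibSafe(n - 1) + fibSafe(n - 2);
                cached = CACHE.get(n);
                if (cached == null) {
                    cached = computed;
                    CACHE.put(n, cached);
                }
            }
            return cached;
        }

        public static void main(final String[] args) {
            System.out.println(fibSafe(40));  // 102334155
            // Running fibBroken(40) on a fresh cache demonstrates the failure mode.
        }
    }

The get/re-check/put variant tolerates re-entrancy at the cost of occasionally computing a value a nested call already produced, which is exactly the case the second iffs.get(key) in the patched method guards against.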
@@ -46,10 +46,20 @@ final class ClassFields {
         return addField(FieldDefinition.mutableUnassigned(definitions.size(), type));
     }
+
+    /**
+     * Add a {@link Closure} that should be executed in the constructor after field assignments
+     * have been executed.
+     * @param closure Closure to run after field assignments
+     */
+    public void addAfterInit(final Closure closure) {
+        afterInit.add(closure);
+    }
+
     /**
      * Returns a closure of actions that should be run in the constructor after all field
      * assignments have been executed.
      * @return Closure that should be executed after field assignments are done
      */
     public Closure afterInit() {
         return Closure.wrap(afterInit.toArray(new Closure[0]));
     }
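addAfterInit() queues constructor work to run once field assignments are done, and afterInit() folds the queue into a single Closure via Closure.wrap, preserving insertion order. Here is a compact sketch of the same accumulate-then-compose pattern, with java.lang.Runnable standing in for Logstash's internal Closure type (the class and method names below are illustrative only, not the real API):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-in for the ClassFields afterInit mechanism; Runnable
    // plays the role of Logstash's Closure syntax element.
    final class DeferredInit {

        private final List<Runnable> afterInit = new ArrayList<>();

        // Analogous to ClassFields.addAfterInit(Closure): queue an action to run
        // after all field assignments have been executed.
        void addAfterInit(final Runnable action) {
            afterInit.add(action);
        }

        // Analogous to Closure.wrap(afterInit.toArray(new Closure[0])): compose
        // the queued actions into one, in insertion order.
        Runnable afterInit() {
            final List<Runnable> snapshot = new ArrayList<>(afterInit);
            return () -> snapshot.forEach(Runnable::run);
        }

        public static void main(final String[] args) {
            final DeferredInit init = new DeferredInit();
            init.addAfterInit(() -> System.out.println("first"));
            init.addAfterInit(() -> System.out.println("second"));
            init.afterInit().run(); // prints "first", then "second"
        }
    }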
@@ -2,6 +2,7 @@ package org.logstash.config.ir;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedTransferQueue;
@@ -91,6 +92,37 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
     }
 
+    @Test
+    public void buildsForkedPipeline() throws Exception {
+        final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
+            "input {mockinput{}} filter { " +
+                "if [foo] != \"bar\" { " +
+                "mockfilter {} " +
+                "mockaddfilter {} " +
+                "if [foo] != \"bar\" { " +
+                "mockfilter {} " +
+                "}} " +
+                "} output {mockoutput{} }",
+            false
+        );
+        final JrubyEventExtLibrary.RubyEvent testEvent =
+            JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+        final Map<String, Supplier<RubyIntegration.Filter>> filters = new HashMap<>();
+        filters.put("mockfilter", CompiledPipelineTest.IdentityFilter::new);
+        filters.put("mockaddfilter", CompiledPipelineTest.AddFieldFilter::new);
+        new CompiledPipeline(
+            pipelineIR,
+            new CompiledPipelineTest.MockPluginFactory(
+                Collections.singletonMap("mockinput", () -> null),
+                filters,
+                Collections.singletonMap("mockoutput", mockOutputSupplier())
+            )
+        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
+        MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
+        MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
+    }
+
     private Supplier<IRubyObject> mockOutputSupplier() {
         return () -> RubyUtil.RUBY.evalScriptlet(
             String.join(
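Unescaped and indented, the configuration string the new test compiles reads:

    input { mockinput {} }
    filter {
      if [foo] != "bar" {
        mockfilter {}
        mockaddfilter {}
        if [foo] != "bar" {
          mockfilter {}
        }
      }
    }
    output { mockoutput {} }

The nested if repeats the outer condition, so compiling one conditional's dependencies pulls in another, exercising the re-entrant path through CompiledPipeline.split() that the double-check above guards. The assertions then confirm the event still reaches the output exactly once.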
@@ -161,6 +193,39 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
         }
     }
 
+    /**
+     * Mock filter that adds the value 'bar' to the field 'foo' for every event in the batch.
+     */
+    private static final class AddFieldFilter implements RubyIntegration.Filter {
+        @Override
+        public IRubyObject toRuby() {
+            return RubyUtil.RUBY.evalScriptlet(
+                String.join(
+                    "\n",
+                    "output = Object.new",
+                    "output.define_singleton_method(:multi_filter) do |batch|",
+                    "batch.each { |e| e.set('foo', 'bar')}",
+                    "end",
+                    "output"
+                )
+            );
+        }
+
+        @Override
+        public boolean hasFlush() {
+            return false;
+        }
+
+        @Override
+        public boolean periodicFlush() {
+            return false;
+        }
+
+        @Override
+        public void register() {
+        }
+    }
+
     /**
      * Mock filter that does not modify the batch.
      */
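AddFieldFilter obtains its Ruby half through evalScriptlet: a bare Object is given a singleton multi_filter method that sets 'foo' to 'bar' on each event, and since Ruby's each returns its receiver, the batch itself is handed back to the pipeline. Events that pass through it therefore no longer satisfy [foo] != "bar", which is what makes the forked topology above observable.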