#9255 Drastically speed up pipeline compilation by making Vertex compilation more efficient

Fixes #9278
This commit is contained in:
Armin 2018-03-22 18:48:17 +01:00 committed by Armin Braun
parent 798313f193
commit d2af7706bd

View file

@ -7,6 +7,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyHash;
@ -247,14 +248,14 @@ public final class CompiledPipeline {
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given filter.
* @param vertex Vertex of the filter to create this {@link Dataset} for
* @param datasets All the datasets that pass through this filter
* @param datasets All the datasets that have children passing into this filter
* @return Filter {@link Dataset}
*/
private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> datasets) {
return plugins.computeIfAbsent(
vertex.getId(), v -> {
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.filterDataset(datasets, filters.get(v));
DatasetCompiler.filterDataset(flatten(datasets, vertex), filters.get(v));
LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);
return prepared.instantiate();
}
@ -265,7 +266,7 @@ public final class CompiledPipeline {
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
* the application of the given output.
* @param vertex Vertex of the output to create this {@link Dataset} for
* @param datasets All the datasets that are passed into this output
* @param datasets All the datasets that have children passing into this output
* @return Output {@link Dataset}
*/
private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets) {
@ -273,7 +274,7 @@ public final class CompiledPipeline {
vertex.getId(), v -> {
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.outputDataset(
datasets, outputs.get(v), outputs.size() == 1
flatten(datasets, vertex), outputs.get(v), outputs.size() == 1
);
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
return prepared.instantiate();
@ -284,7 +285,7 @@ public final class CompiledPipeline {
/**
* Split the given {@link Dataset}s and return the dataset half of their elements that contains
* the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
* @param datasets Datasets to split
* @param datasets Datasets that are the parents of the datasets to split
* @param condition Condition that must be fulfilled
* @param vertex Vertex id to cache the resulting {@link Dataset} under
* @return The half of the datasets contents that fulfils the condition
@ -294,7 +295,7 @@ public final class CompiledPipeline {
return iffs.computeIfAbsent(
vertex.getId(), v -> {
final ComputeStepSyntaxElement<SplitDataset> prepared =
DatasetCompiler.splitDataset(datasets, condition);
DatasetCompiler.splitDataset(flatten(datasets, vertex), condition);
LOGGER.debug(
"Compiled conditional\n {} \n into \n {}", vertex, prepared
);
@ -314,11 +315,10 @@ public final class CompiledPipeline {
*/
private Collection<Dataset> flatten(final Collection<Dataset> datasets,
final Vertex start) {
final Collection<Vertex> dependencies = start.incomingVertices()
.filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
.collect(Collectors.toList());
return dependencies.isEmpty() ? datasets
: compileDependencies(start, datasets, dependencies);
final Collection<Dataset> result = compileDependencies(start, datasets,
start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
);
return result.isEmpty() ? datasets : result;
}
/**
@ -329,20 +329,19 @@ public final class CompiledPipeline {
* @return Datasets compiled from vertex children
*/
private Collection<Dataset> compileDependencies(final Vertex start,
final Collection<Dataset> datasets, final Collection<Vertex> dependencies) {
return dependencies.stream().map(
final Collection<Dataset> datasets, final Stream<Vertex> dependencies) {
return dependencies.map(
dependency -> {
final Collection<Dataset> transientDependencies = flatten(datasets, dependency);
if (isFilter(dependency)) {
return filterDataset(dependency, transientDependencies);
return filterDataset(dependency, datasets);
} else if (isOutput(dependency)) {
return outputDataset(dependency, transientDependencies);
return outputDataset(dependency, datasets);
} else {
// We know that it's an if vertex since the the input children are either
// output, filter or if in type.
final IfVertex ifvert = (IfVertex) dependency;
final SplitDataset ifDataset = split(
transientDependencies,
datasets,
EventCondition.Compiler.buildCondition(ifvert.getBooleanExpression()),
dependency
);