mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
#9255 Drastically speed up pipeline compilation by making Vertex compilation more efficient
Fixes #9278
This commit is contained in:
parent
32931b8c11
commit
0680c9ec09
1 changed files with 16 additions and 17 deletions
|
@ -7,6 +7,7 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jruby.RubyHash;
|
||||
|
@ -247,14 +248,14 @@ public final class CompiledPipeline {
|
|||
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
|
||||
* the application of the given filter.
|
||||
* @param vertex Vertex of the filter to create this {@link Dataset} for
|
||||
* @param datasets All the datasets that pass through this filter
|
||||
* @param datasets All the datasets that have children passing into this filter
|
||||
* @return Filter {@link Dataset}
|
||||
*/
|
||||
private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> datasets) {
|
||||
return plugins.computeIfAbsent(
|
||||
vertex.getId(), v -> {
|
||||
final ComputeStepSyntaxElement<Dataset> prepared =
|
||||
DatasetCompiler.filterDataset(datasets, filters.get(v));
|
||||
DatasetCompiler.filterDataset(flatten(datasets, vertex), filters.get(v));
|
||||
LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);
|
||||
return prepared.instantiate();
|
||||
}
|
||||
|
@ -265,7 +266,7 @@ public final class CompiledPipeline {
|
|||
* Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
|
||||
* the application of the given output.
|
||||
* @param vertex Vertex of the output to create this {@link Dataset} for
|
||||
* @param datasets All the datasets that are passed into this output
|
||||
* @param datasets All the datasets that have children passing into this output
|
||||
* @return Output {@link Dataset}
|
||||
*/
|
||||
private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets) {
|
||||
|
@ -273,7 +274,7 @@ public final class CompiledPipeline {
|
|||
vertex.getId(), v -> {
|
||||
final ComputeStepSyntaxElement<Dataset> prepared =
|
||||
DatasetCompiler.outputDataset(
|
||||
datasets, outputs.get(v), outputs.size() == 1
|
||||
flatten(datasets, vertex), outputs.get(v), outputs.size() == 1
|
||||
);
|
||||
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
|
||||
return prepared.instantiate();
|
||||
|
@ -284,7 +285,7 @@ public final class CompiledPipeline {
|
|||
/**
|
||||
* Split the given {@link Dataset}s and return the dataset half of their elements that contains
|
||||
* the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
|
||||
* @param datasets Datasets to split
|
||||
* @param datasets Datasets that are the parents of the datasets to split
|
||||
* @param condition Condition that must be fulfilled
|
||||
* @param vertex Vertex id to cache the resulting {@link Dataset} under
|
||||
* @return The half of the datasets contents that fulfils the condition
|
||||
|
@ -294,7 +295,7 @@ public final class CompiledPipeline {
|
|||
return iffs.computeIfAbsent(
|
||||
vertex.getId(), v -> {
|
||||
final ComputeStepSyntaxElement<SplitDataset> prepared =
|
||||
DatasetCompiler.splitDataset(datasets, condition);
|
||||
DatasetCompiler.splitDataset(flatten(datasets, vertex), condition);
|
||||
LOGGER.debug(
|
||||
"Compiled conditional\n {} \n into \n {}", vertex, prepared
|
||||
);
|
||||
|
@ -314,11 +315,10 @@ public final class CompiledPipeline {
|
|||
*/
|
||||
private Collection<Dataset> flatten(final Collection<Dataset> datasets,
|
||||
final Vertex start) {
|
||||
final Collection<Vertex> dependencies = start.incomingVertices()
|
||||
.filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
|
||||
.collect(Collectors.toList());
|
||||
return dependencies.isEmpty() ? datasets
|
||||
: compileDependencies(start, datasets, dependencies);
|
||||
final Collection<Dataset> result = compileDependencies(start, datasets,
|
||||
start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
|
||||
);
|
||||
return result.isEmpty() ? datasets : result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -329,20 +329,19 @@ public final class CompiledPipeline {
|
|||
* @return Datasets compiled from vertex children
|
||||
*/
|
||||
private Collection<Dataset> compileDependencies(final Vertex start,
|
||||
final Collection<Dataset> datasets, final Collection<Vertex> dependencies) {
|
||||
return dependencies.stream().map(
|
||||
final Collection<Dataset> datasets, final Stream<Vertex> dependencies) {
|
||||
return dependencies.map(
|
||||
dependency -> {
|
||||
final Collection<Dataset> transientDependencies = flatten(datasets, dependency);
|
||||
if (isFilter(dependency)) {
|
||||
return filterDataset(dependency, transientDependencies);
|
||||
return filterDataset(dependency, datasets);
|
||||
} else if (isOutput(dependency)) {
|
||||
return outputDataset(dependency, transientDependencies);
|
||||
return outputDataset(dependency, datasets);
|
||||
} else {
|
||||
// We know that it's an if vertex since the input children are either
|
||||
// output, filter or if in type.
|
||||
final IfVertex ifvert = (IfVertex) dependency;
|
||||
final SplitDataset ifDataset = split(
|
||||
transientDependencies,
|
||||
datasets,
|
||||
EventCondition.Compiler.buildCondition(ifvert.getBooleanExpression()),
|
||||
dependency
|
||||
);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue