Revert "Move some of the collector wrapping logic into ProfileCollectorBuilder"

This reverts commit 02cc31767f.
This commit is contained in:
Zachary Tong 2015-12-09 13:41:48 -05:00
parent 57f7c04cea
commit 9aa1a3a25c
4 changed files with 62 additions and 129 deletions

View file

@ -31,13 +31,13 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileCollectorBuilder;
import org.elasticsearch.search.profile.CollectorResult;
import org.elasticsearch.search.profile.InternalProfileCollector;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -74,8 +74,6 @@ public class AggregationPhase implements SearchPhase {
context.aggregations().aggregationContext(aggregationContext);
List<Aggregator> collectors = new ArrayList<>();
boolean doProfile = context.getProfilers() != null;
ProfileCollectorBuilder collectorBuilder = new ProfileCollectorBuilder(doProfile);
Aggregator[] aggregators;
try {
AggregatorFactories factories = context.aggregations().factories();
@ -89,9 +87,11 @@ public class AggregationPhase implements SearchPhase {
if (!collectors.isEmpty()) {
Collector collector = BucketCollector.wrap(collectors);
((BucketCollector)collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
// TODO: report on child aggs as well
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_AGGREGATION);
Collections.emptyList());
}
context.queryCollectors().put(AggregationPhase.class, collector);
}
} catch (IOException e) {
@ -125,8 +125,6 @@ public class AggregationPhase implements SearchPhase {
BucketCollector globalsCollector = BucketCollector.wrap(globals);
Query query = Queries.newMatchAllQuery();
Query searchFilter = context.searchFilter(context.types());
boolean doProfile = context.getProfilers() != null;
ProfileCollectorBuilder collectorBuilder = new ProfileCollectorBuilder(doProfile);
if (searchFilter != null) {
BooleanQuery filtered = new BooleanQuery.Builder()
@ -136,10 +134,17 @@ public class AggregationPhase implements SearchPhase {
query = filtered;
}
try {
final Collector collector = collectorBuilder.wrap(globalsCollector, CollectorResult.REASON_AGGREGATION_GLOBAL);
if (doProfile) {
final Collector collector;
if (context.getProfilers() == null) {
collector = globalsCollector;
} else {
InternalProfileCollector profileCollector = new InternalProfileCollector(
globalsCollector, CollectorResult.REASON_AGGREGATION_GLOBAL,
// TODO: report on sub collectors
Collections.emptyList());
collector = profileCollector;
// start a new profile with this collector
context.getProfilers().addProfiler().setCollector((InternalProfileCollector)collector);
context.getProfilers().addProfiler().setCollector(profileCollector);
}
globalsCollector.preCollection();
context.searcher().search(query, collector);

View file

@ -123,8 +123,8 @@ public class CollectorResult implements ToXContent, Writeable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder = builder.startObject()
.field(NAME.getPreferredName(), getName())
.field(REASON.getPreferredName(), getReason())
.field(NAME.getPreferredName(), toString())
.field(REASON.getPreferredName(), reason)
.field(TIME.getPreferredName(), String.format(Locale.US, "%.10gms", (double) (getTime() / 1000000.0)));
if (!children.isEmpty()) {

View file

@ -1,107 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.profile;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.MultiCollector;
import java.util.AbstractList;
import java.util.Collections;
import java.util.List;
/**
* Convenience object to help wrap collectors when profiling is enabled
*/
public class ProfileCollectorBuilder {
private final boolean profile;
private Collector currentCollector;
public ProfileCollectorBuilder(boolean profile) {
this.profile = profile;
}
/**
* If profiling is enabled, this will wrap the Collector in an InternalProfileCollector
* and record the hierarchy. Otherwise it just returns the provided Collector
*
* @param collector The collector to wrap
* @param reason The "hint" about what this collector is used for
* @return A wrapped collector if profiling is enabled, the original otherwise
*/
public Collector wrap(Collector collector, String reason) {
if (profile) {
if (currentCollector == null) {
collector = new InternalProfileCollector(collector, reason, Collections.emptyList());
} else {
final Collector child = currentCollector;
collector = new InternalProfileCollector(collector, reason,
Collections.singletonList((InternalProfileCollector) child));
}
currentCollector = collector;
}
return collector;
}
/**
* If profiling is enabled, this will wrap a MultiCollector in an InternalProfileCollector
* and record the hierarchy.
*
* Because MultiCollector does not have any methods to retrieve it's wrapped sub-collectors,
* the caller must provide those as an argument
*
* @param collector The collector to wrap
* @param reason The "hint" about what this collector is used for
* @param subCollectors The sub-collectors that are inside the multicollector
* @return
*/
public Collector wrapMultiCollector(Collector collector, String reason, List<Collector> subCollectors) {
if (!(collector instanceof MultiCollector)) {
// When there is a single collector to wrap, MultiCollector returns it
// directly, so use default wrapping logic for that case
return wrap(collector, reason);
}
if (profile) {
final List<InternalProfileCollector> children = new AbstractList<InternalProfileCollector>() {
@Override
public InternalProfileCollector get(int index) {
return (InternalProfileCollector) subCollectors.get(index);
}
@Override
public int size() {
return subCollectors.size();
}
};
if (currentCollector == null) {
collector = new InternalProfileCollector(collector, reason, Collections.emptyList());
} else {
collector = new InternalProfileCollector(collector, reason, children);
}
}
return collector;
}
}

View file

@ -59,7 +59,9 @@ import org.elasticsearch.search.sort.SortParseElement;
import org.elasticsearch.search.sort.TrackScoresParseElement;
import org.elasticsearch.search.suggest.SuggestPhase;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -166,7 +168,6 @@ public class QueryPhase implements SearchPhase {
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
Collector collector;
ProfileCollectorBuilder collectorBuilder = new ProfileCollectorBuilder(doProfile);
Callable<TopDocs> topDocsCallable;
assert query == searcher.rewrite(query); // already rewritten
@ -174,7 +175,9 @@ public class QueryPhase implements SearchPhase {
if (searchContext.size() == 0) { // no matter what the value of from is
final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
collector = totalHitCountCollector;
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_COUNT);
if (searchContext.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
}
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
@ -229,7 +232,9 @@ public class QueryPhase implements SearchPhase {
topDocsCollector = TopScoreDocCollector.create(numDocs, lastEmittedDoc);
}
collector = topDocsCollector;
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_TOP_HITS);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
}
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
@ -265,18 +270,26 @@ public class QueryPhase implements SearchPhase {
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
if (terminateAfterSet) {
final Collector child = collector;
// throws Lucene.EarlyTerminationException when given count is reached
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (searchContext.parsedPostFilter() != null) {
final Collector child = collector;
// this will only get applied to the actual search collector and not
// to any scoped collectors, also, it will only be applied to the main collector
// since that is where the filter should only work
final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
collector = new FilteredCollector(collector, filterWeight);
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_POST_FILTER);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_POST_FILTER,
Collections.singletonList((InternalProfileCollector) child));
}
}
// plug in additional collectors, like aggregations
@ -284,12 +297,30 @@ public class QueryPhase implements SearchPhase {
subCollectors.add(collector);
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
collector = collectorBuilder.wrapMultiCollector(collector, CollectorResult.REASON_SEARCH_MULTI, subCollectors);
if (doProfile && collector instanceof InternalProfileCollector == false) {
// When there is a single collector to wrap, MultiCollector returns it
// directly, so only wrap in the case that there are several sub collectors
final List<InternalProfileCollector> children = new AbstractList<InternalProfileCollector>() {
@Override
public InternalProfileCollector get(int index) {
return (InternalProfileCollector) subCollectors.get(index);
}
@Override
public int size() {
return subCollectors.size();
}
};
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MULTI, children);
}
// apply the minimum score after multi collector so we filter aggs as well
if (searchContext.minimumScore() != null) {
final Collector child = collector;
collector = new MinimumScoreCollector(collector, searchContext.minimumScore());
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_MIN_SCORE);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MIN_SCORE,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (collector.getClass() == TotalHitCountCollector.class) {
@ -334,10 +365,14 @@ public class QueryPhase implements SearchPhase {
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
if (timeoutSet && collector != null) { // collector might be null if no collection is actually needed
final Collector child = collector;
// TODO: change to use our own counter that uses the scheduler in ThreadPool
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis());
collector = collectorBuilder.wrap(collector, CollectorResult.REASON_SEARCH_TIMEOUT);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TIMEOUT,
Collections.singletonList((InternalProfileCollector) child));
}
}
try {
@ -362,7 +397,7 @@ public class QueryPhase implements SearchPhase {
queryResult.topDocs(topDocsCallable.call());
if (doProfile) {
if (searchContext.getProfilers() != null) {
List<ProfileShardResult> shardResults = Profiler.buildShardResults(searchContext.getProfilers().getProfilers());
searchContext.queryResult().profileResults(shardResults);
}