check parent circuit breaker when allocating empty bucket (#89568)

closes https://github.com/elastic/elasticsearch/issues/80789
This commit is contained in:
boicehuang 2022-08-25 00:16:22 +08:00 committed by GitHub
parent 773aeabf3d
commit 061e6432bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 33 deletions

View file

@ -0,0 +1,5 @@
pr: 89568
summary: check parent circuit breaker when allocating empty bucket
area: Aggregations
type: bug
issues: [80789]

View file

@ -408,7 +408,19 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
iterateEmptyBuckets(list, iter, new LongConsumer() {
private int size = 0;
@Override
public void accept(long key) {
size++;
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
}
});
}
private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, LongConsumer onBucket) {

View file

@ -400,7 +400,19 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
reduceContext
);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, key -> iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
iterateEmptyBuckets(list, iter, new DoubleConsumer() {
private int size;
@Override
public void accept(double key) {
size++;
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
}
});
}
private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, DoubleConsumer onBucket) {

View file

@ -214,8 +214,7 @@ public class InternalDateHistogramTests extends InternalMultiBucketAggregationTe
}
public void testLargeReduce() {
expectReduceUsesTooManyBuckets(
new InternalDateHistogram(
InternalDateHistogram largeHisto = new InternalDateHistogram(
"h",
List.of(),
BucketOrder.key(true),
@ -225,15 +224,15 @@ public class InternalDateHistogramTests extends InternalMultiBucketAggregationTe
Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(),
InternalAggregations.EMPTY,
new LongBounds(
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z")
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-01T00:00:00Z"),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z")
)
),
DocValueFormat.RAW,
false,
null
),
100000
);
expectReduceUsesTooManyBuckets(largeHisto, 100000);
expectReduceThrowsRealMemoryBreaker(largeHisto);
}
}

View file

@ -103,19 +103,18 @@ public class InternalHistogramTests extends InternalMultiBucketAggregationTestCa
}
public void testLargeReduce() {
expectReduceUsesTooManyBuckets(
new InternalHistogram(
InternalHistogram largeHisto = new InternalHistogram(
"h",
List.of(),
BucketOrder.key(true),
0,
new InternalHistogram.EmptyBucketInfo(5e-10, 0, 0, 100, InternalAggregations.EMPTY),
new InternalHistogram.EmptyBucketInfo(5e-8, 0, 0, 100, InternalAggregations.EMPTY),
DocValueFormat.RAW,
false,
null
),
100000
);
expectReduceUsesTooManyBuckets(largeHisto, 100000);
expectReduceThrowsRealMemoryBreaker(largeHisto);
}
@Override

View file

@ -8,7 +8,11 @@
package org.elasticsearch.test;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
@ -33,6 +37,7 @@ import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
public abstract class InternalMultiBucketAggregationTestCase<T extends InternalAggregation & MultiBucketsAggregation> extends
@ -248,4 +253,30 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
Exception e = expectThrows(IllegalArgumentException.class, () -> agg.reduce(List.of(agg), reduceContext));
assertThat(e.getMessage(), equalTo("too big!"));
}
/**
* Expect that reducing this aggregation will break the real memory breaker.
*/
protected static void expectReduceThrowsRealMemoryBreaker(InternalAggregation agg) {
HierarchyCircuitBreakerService breaker = new HierarchyCircuitBreakerService(
Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%").build(),
List.of(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
) {
@Override
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
super.checkParentLimit(newBytesReserved, label);
}
};
AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
BigArrays.NON_RECYCLING_INSTANCE,
null,
() -> false,
mock(AggregationBuilder.class),
v -> breaker.getBreaker("request").addEstimateBytesAndMaybeBreak(0, "test"),
PipelineTree.EMPTY
);
Exception e = expectThrows(CircuitBreakingException.class, () -> agg.reduce(List.of(agg), reduceContext));
assertThat(e.getMessage(), startsWith("[parent] Data too large, data for [test] "));
}
}