[ML] AIOps: Chunk groups of field candidates into single queries. (#188137)

## Summary

Part of #187684.

So far we ran an individual query for each field candidate to get
significant items. The historical reason is that we were being extra
cautious not to run into issues with the `max_buckets` setting. But since
we only fetch the top 1k items per field and the `max_buckets` default is
65k, it should be safe to change that.

This PR updates fetching significant items to combine multiple field
candidates within a single query using multiple aggregations (see the
sketch below). With this change, up to 50 field candidates get combined
into one query. This results in up to ~50k buckets per query (50 x 1k
buckets for the significant terms aggs plus 50 buckets for the cardinality
aggs), which stays well below the 65k `max_buckets` default. If there are
more field candidates, we still make use of the async queue that runs up
to 5 queries in parallel.
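
To illustrate, here is a simplified sketch of how the per-candidate aggregations get combined into one request, modeled on the `getSignificantTermRequest` changes in this PR. The helper name `buildFieldCandidateAggs` is illustrative only; the query, the random sampler wrapping and the `significant_terms` scoring options are left out for brevity:

```ts
import type { estypes } from '@elastic/elasticsearch';

// Simplified sketch: build one `aggs` object covering all field candidates of a chunk.
// Each candidate contributes a cardinality agg (used to skip fields with only one
// distinct value) and a significant_terms agg (used for the p-values), keyed by the
// candidate's index within the chunk.
const buildFieldCandidateAggs = (
  fieldNames: string[]
): Record<string, estypes.AggregationsAggregationContainer> =>
  fieldNames.reduce<Record<string, estypes.AggregationsAggregationContainer>>(
    (aggs, fieldName, index) => {
      aggs[`distinct_count_${index}`] = {
        cardinality: { field: fieldName },
      };
      aggs[`sig_term_p_value_${index}`] = {
        significant_terms: {
          field: fieldName,
          // background_filter and the p_value scoring heuristic are omitted here
          size: 1000,
        },
      };
      return aggs;
    },
    {}
  );

// The request body then issues a single query for the whole chunk, e.g.
// { query, size: 0, aggs: wrap(buildFieldCandidateAggs(fieldNames)) }
```

On the response side, the aggregations are unwrapped again and looked up per candidate via the same `_${index}` suffix.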

As a result, 200 field candidates, for example, now require just 4 queries
instead of the previous 200.
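
A minimal sketch of the chunking and queueing, using the same constants as the PR (the queue worker is stubbed and the candidate list is made up for illustration):

```ts
import { queue } from 'async';
import { chunk } from 'lodash';

const QUEUE_CHUNKING_SIZE = 50; // field candidates per query
const MAX_CONCURRENT_QUERIES = 5; // async queue concurrency

// Made-up list of 200 keyword field candidates.
const fieldCandidates = Array.from({ length: 200 }, (_, i) => `field_${i}`);

// Each queue item covers a whole chunk of candidates, so 200 candidates
// become ceil(200 / 50) = 4 queue items, i.e. 4 queries instead of 200.
const pValuesQueue = queue(async (payload: { keywordFieldCandidates: string[] }) => {
  // ...run a single search request covering all candidates in the chunk...
}, MAX_CONCURRENT_QUERIES);

pValuesQueue.push(
  chunk(fieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ keywordFieldCandidates: d }))
);
```

Since the queue still processes up to 5 items concurrently, those 4 chunked queries end up running in parallel.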

Previous:

<img width="1624" alt="image"
src="https://github.com/user-attachments/assets/1e11ff1c-a0c2-4dcf-9399-27456439faad">


![aiops-log-rate-analysis-apm-0001](https://github.com/user-attachments/assets/67b6337e-a406-45bc-bb49-85ad047fcbe8)

After:

<img width="1554" alt="image"
src="https://github.com/user-attachments/assets/33ccb9ef-fe5b-4945-a87f-77347ba097ea">


### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
3 changed files with 88 additions and 87 deletions

@@ -28,7 +28,7 @@ import { getRequestBase } from './get_request_base';
export const getSignificantTermRequest = (
params: AiopsLogRateAnalysisSchema,
fieldName: string,
fieldNames: string[],
{ wrap }: RandomSamplerWrapper
): estypes.SearchRequest => {
const query = getQueryWithParams({
@@ -56,18 +56,18 @@ export const getSignificantTermRequest = (
];
}
const pValueAgg: Record<
'sig_term_p_value' | 'distinct_count',
estypes.AggregationsAggregationContainer
> = {
const fieldCandidateAggs = fieldNames.reduce<
Record<string, estypes.AggregationsAggregationContainer>
>((aggs, fieldName, index) => {
// Used to identify fields with only one distinct value which we'll ignore in the analysis.
distinct_count: {
aggs[`distinct_count_${index}`] = {
cardinality: {
field: fieldName,
},
},
// Used to calculate the p-value for terms of the field.
sig_term_p_value: {
};
// Used to calculate the p-value for the field.
aggs[`sig_term_p_value_${index}`] = {
significant_terms: {
field: fieldName,
background_filter: {
@@ -90,13 +90,15 @@ export const getSignificantTermRequest = (
p_value: { background_is_superset: false },
size: 1000,
},
},
};
};
return aggs;
}, {});
const body = {
query,
size: 0,
aggs: wrap(pValueAgg),
aggs: wrap(fieldCandidateAggs),
};
return {
@@ -137,21 +139,20 @@ export const fetchSignificantTermPValues = async ({
const result: SignificantItem[] = [];
const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) =>
esClient.search(getSignificantTermRequest(params, fieldName, randomSamplerWrapper), {
signal: abortSignal,
maxRetries: 0,
})
)
const resp = await esClient.search(
getSignificantTermRequest(params, fieldNames, randomSamplerWrapper),
{
signal: abortSignal,
maxRetries: 0,
}
);
function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
if (resp.aggregations === undefined) {
if (!isRequestAbortedError(resp)) {
if (logger) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
`Failed to fetch p-value aggregation for field names ${fieldNames.join()}, got: \n${JSON.stringify(
resp,
null,
2
)}`
@@ -159,40 +160,23 @@ export const fetchSignificantTermPValues = async ({
}
if (emitError) {
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
emitError(`Failed to fetch p-value aggregation for field names "${fieldNames.join()}".`);
}
}
return result;
}
for (const [index, settledPromise] of settledPromises.entries()) {
const fieldName = fieldNames[index];
if (settledPromise.status === 'rejected') {
reportError(fieldName, settledPromise.reason);
// Still continue the analysis even if individual p-value queries fail.
continue;
}
const resp = settledPromise.value;
if (resp.aggregations === undefined) {
reportError(fieldName, resp);
// Still continue the analysis even if individual p-value queries fail.
continue;
}
const overallResult = (
randomSamplerWrapper.unwrap(resp.aggregations) as Record<'sig_term_p_value', Aggs>
).sig_term_p_value;
const unwrappedResp = randomSamplerWrapper.unwrap(resp.aggregations) as Record<string, Aggs>;
for (const [index, fieldName] of fieldNames.entries()) {
const pValueBuckets = unwrappedResp[`sig_term_p_value_${index}`];
const distinctCount = (
randomSamplerWrapper.unwrap(resp.aggregations) as Record<
'distinct_count',
estypes.AggregationsCardinalityAggregate
>
).distinct_count.value;
unwrappedResp[
`distinct_count_${index}`
] as unknown as estypes.AggregationsCardinalityAggregate
).value;
for (const bucket of overallResult.buckets) {
for (const bucket of pValueBuckets.buckets) {
const pValue = Math.exp(-bucket.score);
if (
@@ -209,8 +193,8 @@ export const fetchSignificantTermPValues = async ({
fieldValue: String(bucket.key),
doc_count: bucket.doc_count,
bg_count: bucket.bg_count,
total_doc_count: overallResult.doc_count,
total_bg_count: overallResult.bg_count,
total_doc_count: pValueBuckets.doc_count,
total_bg_count: pValueBuckets.bg_count,
score: bucket.score,
pValue,
normalizedScore: getNormalizedScore(bucket.score),

@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
export const QUEUE_CHUNKING_SIZE = 50;
export interface KeywordFieldCandidates {
keywordFieldCandidates: string[];
}
export const isKeywordFieldCandidates = (d: unknown): d is KeywordFieldCandidates =>
isPopulatedObject(d, ['keywordFieldCandidates']);
export interface TextFieldCandidates {
textFieldCandidates: string[];
}
export const isTextFieldCandidates = (d: unknown): d is KeywordFieldCandidates =>
isPopulatedObject(d, ['textFieldCandidates']);
export type QueueFieldCandidate = KeywordFieldCandidates | TextFieldCandidates;

@@ -6,6 +6,7 @@
*/
import { queue } from 'async';
import { chunk } from 'lodash';
import { SIGNIFICANT_ITEM_TYPE, type SignificantItem } from '@kbn/ml-agg-utils';
import { i18n } from '@kbn/i18n';
@@ -17,10 +18,15 @@ import type {
AiopsLogRateAnalysisSchema,
AiopsLogRateAnalysisApiVersion as ApiVersion,
} from '@kbn/aiops-log-rate-analysis/api/schema';
import type { QueueFieldCandidate } from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import {
isKeywordFieldCandidates,
isTextFieldCandidates,
QUEUE_CHUNKING_SIZE,
} from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { fetchSignificantCategories } from '@kbn/aiops-log-rate-analysis/queries/fetch_significant_categories';
import { fetchSignificantTermPValues } from '@kbn/aiops-log-rate-analysis/queries/fetch_significant_term_p_values';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import {
LOADED_FIELD_CANDIDATES,
@@ -29,20 +35,6 @@
} from '../response_stream_utils/constants';
import type { ResponseStreamFetchOptions } from '../response_stream_factory';
interface FieldCandidate {
fieldCandidate: string;
}
const isFieldCandidate = (d: unknown): d is FieldCandidate =>
isPopulatedObject(d, ['fieldCandidate']);
interface TextFieldCandidate {
textFieldCandidate: string;
}
const isTextFieldCandidate = (d: unknown): d is FieldCandidate =>
isPopulatedObject(d, ['textFieldCandidate']);
type Candidate = FieldCandidate | TextFieldCandidate;
export const significantItemsHandlerFactory =
<T extends ApiVersion>({
abortSignal,
@@ -82,8 +74,6 @@ export const significantItemsHandlerFactory =
) ?? [])
);
const fieldsToSample = new Set<string>();
let remainingFieldCandidates: string[];
let loadingStepSizePValues = PROGRESS_STEP_P_VALUES;
@@ -104,9 +94,12 @@ export const significantItemsHandlerFactory =
const loadingStep =
(1 / (fieldCandidatesCount + textFieldCandidatesCount)) * loadingStepSizePValues;
const pValuesQueue = queue(async function (payload: Candidate) {
if (isFieldCandidate(payload)) {
const { fieldCandidate } = payload;
const pValuesQueue = queue(async function (payload: QueueFieldCandidate) {
let queueItemLoadingStep = 0;
if (isKeywordFieldCandidates(payload)) {
const { keywordFieldCandidates: fieldNames } = payload;
queueItemLoadingStep = loadingStep * fieldNames.length;
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>>;
try {
@@ -117,34 +110,30 @@ export const significantItemsHandlerFactory =
emitError: responseStream.pushError,
arguments: {
...requestBody,
fieldNames: [fieldCandidate],
fieldNames,
sampleProbability: stateHandler.sampleProbability(),
},
});
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`
`Failed to fetch p-values for ${fieldNames.join()}, got: \n${e.toString()}`
);
responseStream.pushError(`Failed to fetch p-values for '${fieldCandidate}'.`);
responseStream.pushError(`Failed to fetch p-values for ${fieldNames.join()}.`);
}
return;
}
remainingFieldCandidates = remainingFieldCandidates.filter((d) => d !== fieldCandidate);
remainingFieldCandidates = remainingFieldCandidates.filter((d) => !fieldNames.includes(d));
if (pValues.length > 0) {
pValues.forEach((d) => {
fieldsToSample.add(d.fieldName);
});
significantTerms.push(...pValues);
responseStream.push(addSignificantItems(pValues));
fieldValuePairsCount += pValues.length;
}
} else if (isTextFieldCandidate(payload)) {
const { textFieldCandidate } = payload;
} else if (isTextFieldCandidates(payload)) {
const { textFieldCandidates: fieldNames } = payload;
queueItemLoadingStep = loadingStep * fieldNames.length;
const significantCategoriesForField = await fetchSignificantCategories({
esClient,
@@ -153,7 +142,7 @@ export const significantItemsHandlerFactory =
abortSignal,
arguments: {
...requestBody,
fieldNames: [textFieldCandidate],
fieldNames,
sampleProbability: stateHandler.sampleProbability(),
},
});
@@ -165,7 +154,7 @@ export const significantItemsHandlerFactory =
}
}
stateHandler.loaded(loadingStep, false);
stateHandler.loaded(queueItemLoadingStep, false);
responseStream.push(
updateLoadingState({
@@ -186,10 +175,14 @@ export const significantItemsHandlerFactory =
);
}, MAX_CONCURRENT_QUERIES);
// This chunks keyword and text field candidates, then passes them on
// to the async queue for processing. Each chunk will be part of a single
// query using multiple aggs for each candidate. For many candidates,
// on top of that the async queue will process multiple queries concurrently.
pValuesQueue.push(
[
...textFieldCandidates.map((d) => ({ textFieldCandidate: d })),
...fieldCandidates.map((d) => ({ fieldCandidate: d })),
...chunk(textFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ textFieldCandidates: d })),
...chunk(fieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ keywordFieldCandidates: d })),
],
(err) => {
if (err) {