[ML] Explain Log Rate Spikes: Make use of abort signal for ES queries. (#143683)

So far we passed the abort signal from the client on to the analysis so it could be cancelled, but the signal was not forwarded to the ES queries themselves. That meant the analysis could be cancelled after each step, but ES queries that were already running would continue. This PR takes the existing abort signal and passes it on to all ES queries.
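In practice this means passing the signal as a transport option on each ES client call, alongside `maxRetries: 0` so the client doesn't retry a deliberately cancelled query. A minimal sketch of the pattern (the helper and its arguments are illustrative, not code from this PR):

```ts
import type { ElasticsearchClient } from '@kbn/core/server';

// Illustrative helper: the route creates a single AbortController and hands
// its signal down to every ES query it issues.
async function searchWithAbort(
  esClient: ElasticsearchClient,
  index: string,
  abortSignal?: AbortSignal
) {
  return esClient.search(
    { index, size: 0, body: { query: { match_all: {} } } },
    // `signal` aborts the in-flight request when the analysis is cancelled;
    // `maxRetries: 0` keeps the client from retrying an aborted query.
    { signal: abortSignal, maxRetries: 0 }
  );
}
```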

This surfaced an issue with running too many queries in parallel: so far there was no limit when fetching the histogram data. With the abort signals attached, Kibana would report a warning if more than 10 queries were run at once. The PR updates the histogram data fetching to run in chunks of 10 queries, like we already do for the p-value aggregation. The larger bulk of the file diff is therefore the result of wrapping the histogram queries in a `for...of` loop that iterates over the chunks of queries.
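A condensed sketch of that chunking pattern (assuming lodash's `chunk`; `runInChunks`, `runQuery`, and `shouldStop` are stand-ins for the route's inline logic):

```ts
import { chunk } from 'lodash';

// Keep this at 10 or below: attaching the abort signal to more concurrent
// requests than that trips the default limit on abort signal listeners.
const CHUNK_SIZE = 10;

async function runInChunks<T>(
  items: T[],
  runQuery: (item: T) => Promise<void>,
  shouldStop: () => boolean
) {
  for (const itemsChunk of chunk(items, CHUNK_SIZE)) {
    // Check between chunks whether the client cancelled in the meantime.
    if (shouldStop()) {
      return;
    }
    // At most CHUNK_SIZE signal-aware queries run in parallel.
    await Promise.all(itemsChunk.map(runQuery));
  }
}
```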
Walter Rafelsberger, 2022-10-25 18:02:44 +02:00, committed by GitHub
parent 13bb47adf6
commit df9dbcd630
8 changed files with 323 additions and 197 deletions


@@ -29,7 +29,8 @@ export const fetchAggIntervals = async (
query: estypes.QueryDslQueryContainer,
fields: HistogramField[],
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal
): Promise<NumericColumnStatsMap> => {
const numericColumns = fields.filter((field) => {
return field.type === KBN_FIELD_TYPES.NUMBER || field.type === KBN_FIELD_TYPES.DATE;
@@ -49,16 +50,19 @@ export const fetchAggIntervals = async (
return aggs;
}, {} as Record<string, object>);
const body = await client.search({
index: indexPattern,
size: 0,
body: {
query,
aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
const body = await client.search(
{
index: indexPattern,
size: 0,
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
body: {
query,
aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
size: 0,
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
},
},
});
{ signal: abortSignal, maxRetries: 0 }
);
const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
const aggregations = aggsPath.length > 0 ? get(body.aggregations, aggsPath) : body.aggregations;


@@ -146,7 +146,8 @@ export const fetchHistogramsForFields = async (
query: any,
fields: FieldsForHistograms,
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal
) => {
const aggIntervals = {
...(await fetchAggIntervals(
@@ -155,7 +156,8 @@ export const fetchHistogramsForFields = async (
query,
fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings
runtimeMappings,
abortSignal
)),
...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
const { interval, min, max, fieldName } = field;
@@ -209,7 +211,7 @@ export const fetchHistogramsForFields = async (
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
},
},
{ maxRetries: 0 }
{ signal: abortSignal, maxRetries: 0 }
);
const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);


@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isRequestAbortedError } from './is_request_aborted_error';
describe('isRequestAbortedError', () => {
it('returns false for a string', () => {
expect(isRequestAbortedError('the-error')).toBe(false);
});
it('returns false for an object without a name field', () => {
expect(isRequestAbortedError({ error: 'the-error' })).toBe(false);
});
it(`returns false for an object with a name field other than 'RequestAbortedError'`, () => {
expect(isRequestAbortedError({ name: 'the-error' })).toBe(false);
});
it(`returns true for an object with a name field that contains 'RequestAbortedError'`, () => {
expect(isRequestAbortedError({ name: 'RequestAbortedError' })).toBe(true);
});
});


@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
interface RequestAbortedError extends Error {
name: 'RequestAbortedError';
}
export function isRequestAbortedError(arg: unknown): arg is RequestAbortedError {
return isPopulatedObject(arg, ['name']) && arg.name === 'RequestAbortedError';
}


@@ -34,6 +34,7 @@ import {
} from '../../common/api/explain_log_rate_spikes';
import { API_ENDPOINT } from '../../common/api';
import { isRequestAbortedError } from '../lib/is_request_aborted_error';
import type { AiopsLicense } from '../types';
import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
@@ -92,6 +93,7 @@ export const defineExplainLogRateSpikesRoute = (
const client = (await context.core).elasticsearch.client.asCurrentUser;
const controller = new AbortController();
const abortSignal = controller.signal;
let isRunning = false;
let loaded = 0;
@@ -129,9 +131,13 @@
}
function end() {
isRunning = false;
logDebugMessage('Ending analysis.');
streamEnd();
if (isRunning) {
isRunning = false;
logDebugMessage('Ending analysis.');
streamEnd();
} else {
logDebugMessage('end() was called again with isRunning already being false.');
}
}
function endWithUpdatedLoadingState() {
@@ -178,10 +184,12 @@ export const defineExplainLogRateSpikesRoute = (
let fieldCandidates: Awaited<ReturnType<typeof fetchFieldCandidates>>;
try {
fieldCandidates = await fetchFieldCandidates(client, request.body);
fieldCandidates = await fetchFieldCandidates(client, request.body, abortSignal);
} catch (e) {
logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
pushError(`Failed to fetch field candidates.`);
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
pushError(`Failed to fetch field candidates.`);
}
end();
return;
}
@@ -208,16 +216,20 @@ export const defineExplainLogRateSpikesRoute = (
if (fieldCandidates.length === 0) {
endWithUpdatedLoadingState();
} else if (shouldStop) {
logDebugMessage('shouldStop after fetching field candidates.');
end();
return;
}
const changePoints: ChangePoint[] = [];
const fieldsToSample = new Set<string>();
const chunkSize = 10;
// Don't use more than 10 here, otherwise Kibana will emit a warning
// about exceeding the limit of 10 abort signal listeners.
const CHUNK_SIZE = 10;
let chunkCount = 0;
const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
const fieldCandidatesChunks = chunk(fieldCandidates, CHUNK_SIZE);
logDebugMessage('Fetch p-values.');
@@ -233,16 +245,18 @@ export const defineExplainLogRateSpikesRoute = (
request.body,
fieldCandidatesChunk,
logger,
pushError
pushError,
abortSignal
);
} catch (e) {
logger.error(
`Failed to fetch p-values for ${JSON.stringify(
fieldCandidatesChunk
)}, got: \n${e.toString()}`
);
pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
// Still continue the analysis even if chunks of p-value queries fail.
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch p-values for ${JSON.stringify(
fieldCandidatesChunk
)}, got: \n${e.toString()}`
);
pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
} // Still continue the analysis even if chunks of p-value queries fail.
continue;
}
@@ -267,7 +281,7 @@ export const defineExplainLogRateSpikesRoute = (
defaultMessage:
'Identified {fieldValuePairsCount, plural, one {# significant field/value pair} other {# significant field/value pairs}}.',
values: {
fieldValuePairsCount: changePoints?.length ?? 0,
fieldValuePairsCount: changePoints.length,
},
}
),
@@ -276,13 +290,12 @@ export const defineExplainLogRateSpikesRoute = (
if (shouldStop) {
logDebugMessage('shouldStop fetching p-values.');
end();
return;
}
}
if (changePoints?.length === 0) {
if (changePoints.length === 0) {
logDebugMessage('Stopping analysis, did not find change points.');
endWithUpdatedLoadingState();
return;
@@ -305,12 +318,15 @@ export const defineExplainLogRateSpikesRoute = (
histogramFields,
// samplerShardSize
-1,
undefined
undefined,
abortSignal
)) as [NumericChartData]
)[0];
} catch (e) {
logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
pushError(`Failed to fetch overall histogram data.`);
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
pushError(`Failed to fetch overall histogram data.`);
}
// Still continue the analysis even if loading the overall histogram fails.
}
@@ -329,6 +345,12 @@ export const defineExplainLogRateSpikesRoute = (
);
}
if (shouldStop) {
logDebugMessage('shouldStop after fetching overall histogram.');
end();
return;
}
if (groupingEnabled) {
logDebugMessage('Group results.');
@@ -374,9 +396,16 @@ export const defineExplainLogRateSpikesRoute = (
request.body.deviationMin,
request.body.deviationMax,
logger,
pushError
pushError,
abortSignal
);
if (shouldStop) {
logDebugMessage('shouldStop after fetching frequent_items.');
end();
return;
}
if (fields.length > 0 && df.length > 0) {
// The way the `frequent_items` aggregations works could return item sets that include
// field/value pairs that are not part of the original list of significant change points.
@@ -517,172 +546,208 @@ export const defineExplainLogRateSpikesRoute = (
pushHistogramDataLoadingState();
logDebugMessage('Fetch group histograms.');
if (shouldStop) {
logDebugMessage('shouldStop after grouping.');
end();
return;
}
await asyncForEach(changePointGroups, async (cpg) => {
if (overallTimeSeries !== undefined) {
const histogramQuery = {
bool: {
filter: cpg.group.map((d) => ({
term: { [d.fieldName]: d.fieldValue },
})),
},
};
logDebugMessage(`Fetch ${changePointGroups.length} group histograms.`);
let cpgTimeSeries: NumericChartData;
try {
cpgTimeSeries = (
(await fetchHistogramsForFields(
client,
request.body.index,
histogramQuery,
// fields
[
{
fieldName: request.body.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined
)) as [NumericChartData]
)[0];
} catch (e) {
logger.error(
`Failed to fetch the histogram data for group #${
cpg.id
}, got: \n${e.toString()}`
);
pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
return;
}
const histogram =
overallTimeSeries.data.map((o, i) => {
const current = cpgTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_change_point: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
const changePointGroupsChunks = chunk(changePointGroups, CHUNK_SIZE);
push(
addChangePointsGroupHistogramAction([
{
id: cpg.id,
histogram,
},
])
);
for (const changePointGroupsChunk of changePointGroupsChunks) {
if (shouldStop) {
logDebugMessage('shouldStop abort fetching group histograms.');
end();
return;
}
});
await asyncForEach(changePointGroupsChunk, async (cpg) => {
if (overallTimeSeries !== undefined) {
const histogramQuery = {
bool: {
filter: cpg.group.map((d) => ({
term: { [d.fieldName]: d.fieldValue },
})),
},
};
let cpgTimeSeries: NumericChartData;
try {
cpgTimeSeries = (
(await fetchHistogramsForFields(
client,
request.body.index,
histogramQuery,
// fields
[
{
fieldName: request.body.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined,
abortSignal
)) as [NumericChartData]
)[0];
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch the histogram data for group #${
cpg.id
}, got: \n${e.toString()}`
);
pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
}
return;
}
const histogram =
overallTimeSeries.data.map((o, i) => {
const current = cpgTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_change_point: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
push(
addChangePointsGroupHistogramAction([
{
id: cpg.id,
histogram,
},
])
);
}
});
}
}
} catch (e) {
logger.error(
`Failed to transform field/value pairs into groups, got: \n${e.toString()}`
);
pushError(`Failed to transform field/value pairs into groups.`);
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to transform field/value pairs into groups, got: \n${e.toString()}`
);
pushError(`Failed to transform field/value pairs into groups.`);
}
}
}
loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS;
logDebugMessage('Fetch field/value histograms.');
logDebugMessage(`Fetch ${changePoints.length} field/value histograms.`);
// time series filtered by fields
if (changePoints && overallTimeSeries !== undefined) {
await asyncForEach(changePoints, async (cp) => {
if (overallTimeSeries !== undefined) {
const histogramQuery = {
bool: {
filter: [
{
term: { [cp.fieldName]: cp.fieldValue },
},
],
},
};
if (changePoints.length > 0 && overallTimeSeries !== undefined) {
const changePointsChunks = chunk(changePoints, CHUNK_SIZE);
let cpTimeSeries: NumericChartData;
for (const changePointsChunk of changePointsChunks) {
if (shouldStop) {
logDebugMessage('shouldStop abort fetching field/value histograms.');
end();
return;
}
try {
cpTimeSeries = (
(await fetchHistogramsForFields(
client,
request.body.index,
histogramQuery,
// fields
[
await asyncForEach(changePointsChunk, async (cp) => {
if (overallTimeSeries !== undefined) {
const histogramQuery = {
bool: {
filter: [
{
fieldName: request.body.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
term: { [cp.fieldName]: cp.fieldValue },
},
],
// samplerShardSize
-1,
undefined
)) as [NumericChartData]
)[0];
} catch (e) {
logger.error(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
cp.fieldValue
}", got: \n${e.toString()}`
);
pushError(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
);
return;
}
const histogram =
overallTimeSeries.data.map((o, i) => {
const current = cpTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_change_point: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
const { fieldName, fieldValue } = cp;
loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
pushHistogramDataLoadingState();
push(
addChangePointsHistogramAction([
{
fieldName,
fieldValue,
histogram,
},
])
);
}
});
};
let cpTimeSeries: NumericChartData;
try {
cpTimeSeries = (
(await fetchHistogramsForFields(
client,
request.body.index,
histogramQuery,
// fields
[
{
fieldName: request.body.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined,
abortSignal
)) as [NumericChartData]
)[0];
} catch (e) {
logger.error(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
cp.fieldValue
}", got: \n${e.toString()}`
);
pushError(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
);
return;
}
const histogram =
overallTimeSeries.data.map((o, i) => {
const current = cpTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_change_point: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
const { fieldName, fieldValue } = cp;
loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
pushHistogramDataLoadingState();
push(
addChangePointsHistogramAction([
{
fieldName,
fieldValue,
histogram,
},
])
);
}
});
}
}
endWithUpdatedLoadingState();
} catch (e) {
logger.error(`Explain log rate spikes analysis failed to finish, got: \n${e.toString()}`);
pushError(`Explain log rate spikes analysis failed to finish.`);
if (!isRequestAbortedError(e)) {
logger.error(
`Explain log rate spikes analysis failed to finish, got: \n${e.toString()}`
);
pushError(`Explain log rate spikes analysis failed to finish.`);
}
end();
}
}


@@ -13,6 +13,8 @@ import { ChangePoint } from '@kbn/ml-agg-utils';
import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
import { isRequestAbortedError } from '../../lib/is_request_aborted_error';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
@@ -95,28 +97,34 @@ export const fetchChangePointPValues = async (
params: AiopsExplainLogRateSpikesSchema,
fieldNames: string[],
logger: Logger,
emitError: (m: string) => void
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<ChangePoint[]> => {
const result: ChangePoint[] = [];
const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) =>
esClient.search<unknown, { change_point_p_value: Aggs }>(
getChangePointRequest(params, fieldName)
getChangePointRequest(params, fieldName),
{
signal: abortSignal,
maxRetries: 0,
}
)
)
);
function reportError(fieldName: string, error: unknown) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
// Still continue the analysis even if individual p-value queries fail.
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
}
}
for (const [index, settledPromise] of settledPromises.entries()) {


@@ -47,14 +47,18 @@ export const getRandomDocsRequest = (
export const fetchFieldCandidates = async (
esClient: ElasticsearchClient,
params: AiopsExplainLogRateSpikesSchema
params: AiopsExplainLogRateSpikesSchema,
abortSignal?: AbortSignal
): Promise<string[]> => {
const { index } = params;
// Get all supported fields
const respMapping = await esClient.fieldCaps({
index,
fields: '*',
});
const respMapping = await esClient.fieldCaps(
{
index,
fields: '*',
},
{ signal: abortSignal, maxRetries: 0 }
);
const finalFieldCandidates: Set<string> = new Set([]);
const acceptableFields: Set<string> = new Set();
@@ -69,7 +73,10 @@ export const fetchFieldCandidates = async (
}
});
const resp = await esClient.search(getRandomDocsRequest(params));
const resp = await esClient.search(getRandomDocsRequest(params), {
signal: abortSignal,
maxRetries: 0,
});
const sampledDocs = resp.hits.hits.map((d) => d.fields ?? {});
// Get all field names for each returned doc and flatten it


@@ -56,7 +56,8 @@ export async function fetchFrequentItems(
deviationMin: number,
deviationMax: number,
logger: Logger,
emitError: (m: string) => void
emitError: (m: string) => void,
abortSignal?: AbortSignal
) {
// get unique fields from change points
const fields = [...new Set(changePoints.map((t) => t.fieldName))];
@@ -127,7 +128,7 @@ export async function fetchFrequentItems(
track_total_hits: true,
},
},
{ maxRetries: 0 }
{ signal: abortSignal, maxRetries: 0 }
);
if (body.aggregations === undefined) {