[ML] Explain Log Rate Spikes: Replace chunks of queries with concurrent queue. (#144220)

The queries for p-values and histograms were done in chunks of 10 parallel queries. The drawback of this approach was that if just one of those 10 queries was a lot slower than the rest, we still had to wait for it to finish before the next chunk could start. This PR replaces the chunking approach with an async concurrent queue that allows up to 10 queries in flight at a time: as soon as one of the running queries finishes, the next one can start, without waiting for the slower ones.

For this PR the `async` library is added to `package.json`. It is not a completely new addition, though, since it was already pulled in as a dependency of other packages we use in Kibana.
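
The core pattern this PR adopts looks roughly like the following sketch. It is deliberately minimal and self-contained: `fetchPValuesFor` and the field names are hypothetical stand-ins for the real query helpers (`fetchChangePointPValues` in the actual route code).

```ts
import { queue } from 'async';

// Allow up to 10 queries in flight at once. As soon as one task finishes,
// the queue starts the next one without waiting for the slowest of the batch.
const MAX_CONCURRENT_QUERIES = 10;

// Hypothetical stand-in for a single p-value query against Elasticsearch.
async function fetchPValuesFor(fieldCandidate: string): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));
  console.log(`finished p-value query for '${fieldCandidate}'`);
}

async function runAnalysis(fieldCandidates: string[]): Promise<void> {
  // The worker function is called once per queued item.
  const pValuesQueue = queue(async function (fieldCandidate: string) {
    await fetchPValuesFor(fieldCandidate);
  }, MAX_CONCURRENT_QUERIES);

  // Enqueue all candidates up front; the queue caps concurrency at 10.
  pValuesQueue.push(fieldCandidates);

  // drain() resolves once every queued task has finished.
  await pValuesQueue.drain();
}

void runAnalysis(['field_a', 'field_b', 'field_c']);
```

When a request is aborted, the route code additionally calls `kill()` on the queue from within the worker, which drops any tasks that are still waiting.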
Walter Rafelsberger 2022-11-04 16:19:08 +01:00 committed by GitHub
parent b5ee95d053
commit 4b617042f3
4 changed files with 179 additions and 171 deletions

package.json

@@ -459,6 +459,7 @@
     "adm-zip": "^0.5.9",
     "antlr4ts": "^0.5.0-alpha.3",
     "archiver": "^5.3.1",
+    "async": "^3.2.3",
     "axios": "^0.27.2",
     "base64-js": "^1.3.1",
     "bitmap-sdf": "^1.0.3",
@@ -804,6 +805,7 @@
     "@testing-library/user-event": "^13.5.0",
     "@types/apidoc": "^0.22.3",
     "@types/archiver": "^5.3.1",
+    "@types/async": "^3.2.3",
     "@types/babel__core": "^7.1.19",
     "@types/babel__generator": "^7.6.4",
     "@types/babel__helper-plugin-utils": "^7.10.0",

x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts

@@ -5,18 +5,22 @@
  * 2.0.
  */

-import { chunk } from 'lodash';
+import { queue } from 'async';

 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

 import { i18n } from '@kbn/i18n';
-import { asyncForEach } from '@kbn/std';
 import type { IRouter } from '@kbn/core/server';
 import { KBN_FIELD_TYPES } from '@kbn/field-types';
 import type { Logger } from '@kbn/logging';
 import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
 import { streamFactory } from '@kbn/aiops-utils';
-import type { ChangePoint, NumericChartData, NumericHistogramField } from '@kbn/ml-agg-utils';
+import type {
+  ChangePoint,
+  ChangePointGroup,
+  NumericChartData,
+  NumericHistogramField,
+} from '@kbn/ml-agg-utils';
 import { fetchHistogramsForFields } from '@kbn/ml-agg-utils';
 import { stringHash } from '@kbn/ml-string-hash';
@@ -209,6 +213,8 @@ export const defineExplainLogRateSpikesRoute = (
         loaded += LOADED_FIELD_CANDIDATES;

+        const fieldCandidatesCount = fieldCandidates.length;
+
         push(
           updateLoadingStateAction({
             ccsWarning: false,
@@ -219,14 +225,14 @@
               defaultMessage:
                 'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.',
               values: {
-                fieldCandidatesCount: fieldCandidates.length,
+                fieldCandidatesCount,
               },
             }
           ),
         })
       );

-      if (fieldCandidates.length === 0) {
+      if (fieldCandidatesCount === 0) {
         endWithUpdatedLoadingState();
       } else if (shouldStop) {
         logDebugMessage('shouldStop after fetching field candidates.');
@@ -239,24 +245,20 @@
         // Don't use more than 10 here otherwise Kibana will emit an error
         // regarding a limit of abort signal listeners of more than 10.
-        const CHUNK_SIZE = 10;
-        let chunkCount = 0;
-
-        const fieldCandidatesChunks = chunk(fieldCandidates, CHUNK_SIZE);
+        const MAX_CONCURRENT_QUERIES = 10;

         logDebugMessage('Fetch p-values.');

-        for (const fieldCandidatesChunk of fieldCandidatesChunks) {
-          chunkCount++;
-          logDebugMessage(
-            `Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`
-          );
+        const pValuesQueue = queue(async function (fieldCandidate: string) {
+          loaded += (1 / fieldCandidatesCount) * PROGRESS_STEP_P_VALUES;
+
           let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
           try {
             pValues = await fetchChangePointPValues(
               client,
               request.body,
-              fieldCandidatesChunk,
+              [fieldCandidate],
               logger,
               sampleProbability,
               pushError,
@@ -265,13 +267,11 @@
           } catch (e) {
             if (!isRequestAbortedError(e)) {
               logger.error(
-                `Failed to fetch p-values for ${JSON.stringify(
-                  fieldCandidatesChunk
-                )}, got: \n${e.toString()}`
+                `Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`
               );
-              pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
-            } // Still continue the analysis even if chunks of p-value queries fail.
-            continue;
+              pushError(`Failed to fetch p-values for '${fieldCandidate}'.`);
+            }
+            return;
           }

           if (pValues.length > 0) {
@@ -279,12 +279,10 @@
               fieldsToSample.add(d.fieldName);
             });
             changePoints.push(...pValues);
-          }
-
-          loaded += (1 / fieldCandidatesChunks.length) * PROGRESS_STEP_P_VALUES;

-          if (pValues.length > 0) {
             push(addChangePointsAction(pValues));
           }

           push(
             updateLoadingStateAction({
               ccsWarning: false,
@@ -304,10 +302,13 @@
           if (shouldStop) {
             logDebugMessage('shouldStop fetching p-values.');
+            pValuesQueue.kill();
             end();
             return;
           }
-        }
+        }, MAX_CONCURRENT_QUERIES);
+
+        pValuesQueue.push(fieldCandidates);
+        await pValuesQueue.drain();

         if (changePoints.length === 0) {
           logDebugMessage('Stopping analysis, did not find change points.');
@@ -572,84 +573,84 @@ export const defineExplainLogRateSpikesRoute = (
           logDebugMessage(`Fetch ${changePointGroups.length} group histograms.`);

-          const changePointGroupsChunks = chunk(changePointGroups, CHUNK_SIZE);
-
-          for (const changePointGroupsChunk of changePointGroupsChunks) {
+          const groupHistogramQueue = queue(async function (cpg: ChangePointGroup) {
             if (shouldStop) {
               logDebugMessage('shouldStop abort fetching group histograms.');
+              groupHistogramQueue.kill();
               end();
               return;
             }

-            await asyncForEach(changePointGroupsChunk, async (cpg) => {
-              if (overallTimeSeries !== undefined) {
-                const histogramQuery = getHistogramQuery(
-                  request.body,
-                  cpg.group.map((d) => ({
-                    term: { [d.fieldName]: d.fieldValue },
-                  }))
-                );
-
-                let cpgTimeSeries: NumericChartData;
-                try {
-                  cpgTimeSeries = (
-                    (await fetchHistogramsForFields(
-                      client,
-                      request.body.index,
-                      histogramQuery,
-                      // fields
-                      [
-                        {
-                          fieldName: request.body.timeFieldName,
-                          type: KBN_FIELD_TYPES.DATE,
-                          interval: overallTimeSeries.interval,
-                          min: overallTimeSeries.stats[0],
-                          max: overallTimeSeries.stats[1],
-                        },
-                      ],
-                      // samplerShardSize
-                      -1,
-                      undefined,
-                      abortSignal
-                    )) as [NumericChartData]
-                  )[0];
-                } catch (e) {
-                  if (!isRequestAbortedError(e)) {
-                    logger.error(
-                      `Failed to fetch the histogram data for group #${
-                        cpg.id
-                      }, got: \n${e.toString()}`
-                    );
-                    pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
-                  }
-                  return;
-                }
-
-                const histogram =
-                  overallTimeSeries.data.map((o, i) => {
-                    const current = cpgTimeSeries.data.find(
-                      (d1) => d1.key_as_string === o.key_as_string
-                    ) ?? {
-                      doc_count: 0,
-                    };
-                    return {
-                      key: o.key,
-                      key_as_string: o.key_as_string ?? '',
-                      doc_count_change_point: current.doc_count,
-                      doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
-                    };
-                  }) ?? [];
-
-                push(
-                  addChangePointsGroupHistogramAction([
-                    {
-                      id: cpg.id,
-                      histogram,
-                    },
-                  ])
-                );
-              }
-            });
-          }
+            if (overallTimeSeries !== undefined) {
+              const histogramQuery = getHistogramQuery(
+                request.body,
+                cpg.group.map((d) => ({
+                  term: { [d.fieldName]: d.fieldValue },
+                }))
+              );
+
+              let cpgTimeSeries: NumericChartData;
+              try {
+                cpgTimeSeries = (
+                  (await fetchHistogramsForFields(
+                    client,
+                    request.body.index,
+                    histogramQuery,
+                    // fields
+                    [
+                      {
+                        fieldName: request.body.timeFieldName,
+                        type: KBN_FIELD_TYPES.DATE,
+                        interval: overallTimeSeries.interval,
+                        min: overallTimeSeries.stats[0],
+                        max: overallTimeSeries.stats[1],
+                      },
+                    ],
+                    // samplerShardSize
+                    -1,
+                    undefined,
+                    abortSignal
+                  )) as [NumericChartData]
+                )[0];
+              } catch (e) {
+                if (!isRequestAbortedError(e)) {
+                  logger.error(
+                    `Failed to fetch the histogram data for group #${
+                      cpg.id
+                    }, got: \n${e.toString()}`
+                  );
+                  pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
+                }
+                return;
+              }
+
+              const histogram =
+                overallTimeSeries.data.map((o, i) => {
+                  const current = cpgTimeSeries.data.find(
+                    (d1) => d1.key_as_string === o.key_as_string
+                  ) ?? {
+                    doc_count: 0,
+                  };
+                  return {
+                    key: o.key,
+                    key_as_string: o.key_as_string ?? '',
+                    doc_count_change_point: current.doc_count,
+                    doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
+                  };
+                }) ?? [];
+
+              push(
+                addChangePointsGroupHistogramAction([
+                  {
+                    id: cpg.id,
+                    histogram,
+                  },
+                ])
+              );
+            }
+          }, MAX_CONCURRENT_QUERIES);
+
+          groupHistogramQueue.push(changePointGroups);
+          await groupHistogramQueue.drain();
         }
       } catch (e) {
         if (!isRequestAbortedError(e)) {
@@ -667,90 +668,90 @@ export const defineExplainLogRateSpikesRoute = (
       // time series filtered by fields
       if (changePoints.length > 0 && overallTimeSeries !== undefined) {
-        const changePointsChunks = chunk(changePoints, CHUNK_SIZE);
-
-        for (const changePointsChunk of changePointsChunks) {
+        const fieldValueHistogramQueue = queue(async function (cp: ChangePoint) {
           if (shouldStop) {
             logDebugMessage('shouldStop abort fetching field/value histograms.');
+            fieldValueHistogramQueue.kill();
             end();
             return;
           }

-          await asyncForEach(changePointsChunk, async (cp) => {
-            if (overallTimeSeries !== undefined) {
-              const histogramQuery = getHistogramQuery(request.body, [
-                {
-                  term: { [cp.fieldName]: cp.fieldValue },
-                },
-              ]);
-
-              let cpTimeSeries: NumericChartData;
-
-              try {
-                cpTimeSeries = (
-                  (await fetchHistogramsForFields(
-                    client,
-                    request.body.index,
-                    histogramQuery,
-                    // fields
-                    [
-                      {
-                        fieldName: request.body.timeFieldName,
-                        type: KBN_FIELD_TYPES.DATE,
-                        interval: overallTimeSeries.interval,
-                        min: overallTimeSeries.stats[0],
-                        max: overallTimeSeries.stats[1],
-                      },
-                    ],
-                    // samplerShardSize
-                    -1,
-                    undefined,
-                    abortSignal
-                  )) as [NumericChartData]
-                )[0];
-              } catch (e) {
-                logger.error(
-                  `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
-                    cp.fieldValue
-                  }", got: \n${e.toString()}`
-                );
-                pushError(
-                  `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
-                );
-                return;
-              }
-
-              const histogram =
-                overallTimeSeries.data.map((o, i) => {
-                  const current = cpTimeSeries.data.find(
-                    (d1) => d1.key_as_string === o.key_as_string
-                  ) ?? {
-                    doc_count: 0,
-                  };
-                  return {
-                    key: o.key,
-                    key_as_string: o.key_as_string ?? '',
-                    doc_count_change_point: current.doc_count,
-                    doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
-                  };
-                }) ?? [];
-
-              const { fieldName, fieldValue } = cp;
-
-              loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
-              pushHistogramDataLoadingState();
-              push(
-                addChangePointsHistogramAction([
-                  {
-                    fieldName,
-                    fieldValue,
-                    histogram,
-                  },
-                ])
-              );
-            }
-          });
-        }
+          if (overallTimeSeries !== undefined) {
+            const histogramQuery = getHistogramQuery(request.body, [
+              {
+                term: { [cp.fieldName]: cp.fieldValue },
+              },
+            ]);
+
+            let cpTimeSeries: NumericChartData;
+
+            try {
+              cpTimeSeries = (
+                (await fetchHistogramsForFields(
+                  client,
+                  request.body.index,
+                  histogramQuery,
+                  // fields
+                  [
+                    {
+                      fieldName: request.body.timeFieldName,
+                      type: KBN_FIELD_TYPES.DATE,
+                      interval: overallTimeSeries.interval,
+                      min: overallTimeSeries.stats[0],
+                      max: overallTimeSeries.stats[1],
+                    },
+                  ],
+                  // samplerShardSize
+                  -1,
+                  undefined,
+                  abortSignal
+                )) as [NumericChartData]
+              )[0];
+            } catch (e) {
+              logger.error(
+                `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
+                  cp.fieldValue
+                }", got: \n${e.toString()}`
+              );
+              pushError(
+                `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
+              );
+              return;
+            }
+
+            const histogram =
+              overallTimeSeries.data.map((o, i) => {
+                const current = cpTimeSeries.data.find(
+                  (d1) => d1.key_as_string === o.key_as_string
+                ) ?? {
+                  doc_count: 0,
+                };
+                return {
+                  key: o.key,
+                  key_as_string: o.key_as_string ?? '',
+                  doc_count_change_point: current.doc_count,
+                  doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
+                };
+              }) ?? [];
+
+            const { fieldName, fieldValue } = cp;
+
+            loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
+            pushHistogramDataLoadingState();
+            push(
+              addChangePointsHistogramAction([
+                {
+                  fieldName,
+                  fieldValue,
+                  histogram,
+                },
+              ])
+            );
+          }
+        }, MAX_CONCURRENT_QUERIES);
+
+        fieldValueHistogramQueue.push(changePoints);
+        await fieldValueHistogramQueue.drain();
       }

       endWithUpdatedLoadingState();

x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts

@@ -34,8 +34,8 @@ export default ({ getService }: FtrProviderContext) => {
   };

   const expected = {
-    chunksLength: 13,
-    actionsLength: 12,
+    chunksLength: 34,
+    actionsLength: 33,
     noIndexChunksLength: 4,
     noIndexActionsLength: 3,
     changePointFilter: 'add_change_points',

yarn.lock

@@ -6140,6 +6140,11 @@
   resolved "https://registry.yarnpkg.com/@types/aria-query/-/aria-query-4.2.0.tgz#14264692a9d6e2fa4db3df5e56e94b5e25647ac0"
   integrity sha512-iIgQNzCm0v7QMhhe4Jjn9uRh+I6GoPmt03CbEtwx3ao8/EfoQcmgtqH4vQ5Db/lxiIGaWDv6nwvunuh0RyX0+A==

+"@types/async@^3.2.3":
+  version "3.2.15"
+  resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.15.tgz#26d4768fdda0e466f18d6c9918ca28cc89a4e1fe"
+  integrity sha512-PAmPfzvFA31mRoqZyTVsgJMsvbynR429UTTxhmfsUCrWGh3/fxOrzqBtaTPJsn4UtzTv4Vb0+/O7CARWb69N4g==
+
 "@types/babel__core@*", "@types/babel__core@^7.1.14", "@types/babel__core@^7.1.19":
   version "7.1.19"
   resolved "https://registry.yarnpkg.com/@types/babel__core/-/babel__core-7.1.19.tgz#7b497495b7d1b4812bdb9d02804d0576f43ee460"