[ML] AIOps Log Rate Analysis: Merge fetch queue for keyword and text field candidates. (#183649)

## Summary

Part of #181111.

So far we identified significant `text` field candidates before
`keyword` field candidates. There were 2 problems with this: 1) The
loading behaviour for `text` fields was not considered in the progress
bar. 2) Analysing a `text` field can be quite slow, and we weren't taking
advantage of the same queue we used to fetch `keyword` fields. If it was
taking quite a while to analyse the `text` fields, it would appear the
progress bar was "stuck". And we had to wait for those slow
queries to finish before we started fetching p-values for `keyword` fields.

This PR improves the behaviour by adding `text` field fetching to the
same queue that fetches `keyword` fields. This means that while a
potentially slower `text` field is being analysed, we can continue to fetch
`keyword`-based fields in parallel, up to the maximum concurrency.

It's a bit tricky to test because with the local datasets the analysis
might be so fast you won't notice a difference. So for testing you could
do the following: In the file `significant_items_handler.ts` at the top
add:

```
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
```

And further down around line `110` add the following delay:

```
   const pValuesQueue = queue(async function (payload: Candidate) {
      await delay(Math.random() * 10000);

      if (isFieldCandidate(payload)) {
 ...
```

This will randomly delay the next call for each text/keyword field, so you
should see the progress bar moving. The difference from the previous
behaviour is that, before this change, the progress bar would not move at
all while the text fields were being analysed.

### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
This commit is contained in:
Walter Rafelsberger 2024-05-21 11:34:31 +02:00 committed by GitHub
parent d43c8f94f3
commit 4c9d65b767
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 85 additions and 53 deletions

View file

@ -36,6 +36,7 @@ export const indexInfoHandlerFactory =
let fieldCandidatesCount = fieldCandidates.length;
const textFieldCandidates: string[] = [];
let textFieldCandidatesCount = textFieldCandidates.length;
let zeroDocsFallback = false;
@ -68,6 +69,7 @@ export const indexInfoHandlerFactory =
fieldCandidates.push(...indexInfo.fieldCandidates);
fieldCandidatesCount = fieldCandidates.length;
textFieldCandidates.push(...indexInfo.textFieldCandidates);
textFieldCandidatesCount = textFieldCandidates.length;
zeroDocsFallback = indexInfo.zeroDocsFallback;
} catch (e) {
if (!isRequestAbortedError(e)) {
@ -92,7 +94,7 @@ export const indexInfoHandlerFactory =
defaultMessage:
'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.',
values: {
fieldCandidatesCount,
fieldCandidatesCount: fieldCandidatesCount + textFieldCandidatesCount,
},
}
),

View file

@ -18,9 +18,9 @@ import type {
AiopsLogRateAnalysisApiVersion as ApiVersion,
} from '@kbn/aiops-log-rate-analysis/api/schema';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { fetchSignificantCategories } from '@kbn/aiops-log-rate-analysis/queries/fetch_significant_categories';
import { fetchSignificantTermPValues } from '@kbn/aiops-log-rate-analysis/queries/fetch_significant_term_p_values';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import {
LOADED_FIELD_CANDIDATES,
@ -29,6 +29,20 @@ import {
} from '../response_stream_utils/constants';
import type { ResponseStreamFetchOptions } from '../response_stream_factory';
interface FieldCandidate {
fieldCandidate: string;
}
const isFieldCandidate = (d: unknown): d is FieldCandidate =>
isPopulatedObject(d, ['fieldCandidate']);
interface TextFieldCandidate {
textFieldCandidate: string;
}
const isTextFieldCandidate = (d: unknown): d is FieldCandidate =>
isPopulatedObject(d, ['textFieldCandidate']);
type Candidate = FieldCandidate | TextFieldCandidate;
export const significantItemsHandlerFactory =
<T extends ApiVersion>({
abortSignal,
@ -47,6 +61,7 @@ export const significantItemsHandlerFactory =
textFieldCandidates: string[];
}) => {
let fieldCandidatesCount = fieldCandidates.length;
const textFieldCandidatesCount = textFieldCandidates.length;
// This will store the combined count of detected significant log patterns and keywords
let fieldValuePairsCount = 0;
@ -59,25 +74,6 @@ export const significantItemsHandlerFactory =
) ?? [])
);
// Get significant categories of text fields
if (textFieldCandidates.length > 0) {
significantCategories.push(
...(await fetchSignificantCategories(
client,
requestBody,
textFieldCandidates,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
))
);
if (significantCategories.length > 0) {
responseStream.push(addSignificantItemsAction(significantCategories));
}
}
const significantTerms: SignificantItem[] = [];
significantTerms.push(
@ -105,39 +101,67 @@ export const significantItemsHandlerFactory =
logDebugMessage('Fetch p-values.');
const pValuesQueue = queue(async function (fieldCandidate: string) {
stateHandler.loaded((1 / fieldCandidatesCount) * loadingStepSizePValues, false);
const loadingStep =
(1 / (fieldCandidatesCount + textFieldCandidatesCount)) * loadingStepSizePValues;
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>>;
const pValuesQueue = queue(async function (payload: Candidate) {
if (isFieldCandidate(payload)) {
const { fieldCandidate } = payload;
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>>;
try {
pValues = await fetchSignificantTermPValues(
try {
pValues = await fetchSignificantTermPValues(
client,
requestBody,
[fieldCandidate],
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`
);
responseStream.pushError(`Failed to fetch p-values for '${fieldCandidate}'.`);
}
return;
}
remainingFieldCandidates = remainingFieldCandidates.filter((d) => d !== fieldCandidate);
if (pValues.length > 0) {
pValues.forEach((d) => {
fieldsToSample.add(d.fieldName);
});
significantTerms.push(...pValues);
responseStream.push(addSignificantItemsAction(pValues));
fieldValuePairsCount += pValues.length;
}
} else if (isTextFieldCandidate(payload)) {
const { textFieldCandidate } = payload;
const significantCategoriesForField = await fetchSignificantCategories(
client,
requestBody,
[fieldCandidate],
[textFieldCandidate],
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`);
responseStream.pushError(`Failed to fetch p-values for '${fieldCandidate}'.`);
if (significantCategoriesForField.length > 0) {
significantCategories.push(...significantCategoriesForField);
responseStream.push(addSignificantItemsAction(significantCategoriesForField));
fieldValuePairsCount += significantCategoriesForField.length;
}
return;
}
remainingFieldCandidates = remainingFieldCandidates.filter((d) => d !== fieldCandidate);
if (pValues.length > 0) {
pValues.forEach((d) => {
fieldsToSample.add(d.fieldName);
});
significantTerms.push(...pValues);
responseStream.push(addSignificantItemsAction(pValues));
}
stateHandler.loaded(loadingStep, false);
responseStream.push(
updateLoadingStateAction({
@ -158,18 +182,24 @@ export const significantItemsHandlerFactory =
);
}, MAX_CONCURRENT_QUERIES);
pValuesQueue.push(fieldCandidates, (err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
responseStream.pushError(`Failed to fetch p-values.`);
pValuesQueue.kill();
responseStream.end();
} else if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop fetching p-values.');
pValuesQueue.kill();
responseStream.end();
pValuesQueue.push(
[
...textFieldCandidates.map((d) => ({ textFieldCandidate: d })),
...fieldCandidates.map((d) => ({ fieldCandidate: d })),
],
(err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
responseStream.pushError(`Failed to fetch p-values.`);
pValuesQueue.kill();
responseStream.end();
} else if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop fetching p-values.');
pValuesQueue.kill();
responseStream.end();
}
}
});
);
await pValuesQueue.drain();
fieldValuePairsCount = significantCategories.length + significantTerms.length;