[ML] AIOps: Chunk groups of field candidates into single queries for top items and histograms. (#189155)

## Summary

Follow-up to #188137.
Part of #187684.

- Groups terms aggregations for chunks of field candidates into single queries when running the top terms fallback, which is used instead of significant terms when either the baseline or deviation time range contains no documents (see the sketch after this list).
- Groups histogram aggregations for the mini histogram chart data into chunked single queries. Previously this reused the transform/DFA data grid mini histogram code; it's now refactored into a version optimized for log rate analysis.
- Adds `withSpan` wrappers to group log rate analysis steps for APM
(magenta bars in the "after" screenshot).
- Removes some no longer used code from API version 1.
- Disables support for `boolean` fields because they don't work properly with the `frequent_item_sets` aggregation.
- Fixes the loading step sizes so the loading progress bar correctly progresses from 0 to 100%.
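
To make the chunking idea concrete, here is a minimal TypeScript sketch under assumed names and values — `buildChunkedTopTermsRequests`, `QUEUE_CHUNKING_SIZE`, `TOP_TERMS_SIZE`, and `PROGRESS_STEP_P_VALUES` are illustrative, not the actual Kibana helpers or constants. Each chunk of field candidates becomes a single search request carrying one named `terms` aggregation per field, and the progress bar advances by a fixed step per processed chunk:

```ts
import { chunk } from 'lodash';

// Illustrative constants, not the actual Kibana values.
const QUEUE_CHUNKING_SIZE = 50;
const TOP_TERMS_SIZE = 3;
const PROGRESS_STEP_P_VALUES = 0.5;

interface ChunkedTopTermsRequest {
  size: number;
  query: Record<string, unknown>;
  aggs: Record<string, { terms: { field: string; size: number } }>;
}

// One request per chunk of field candidates; the aggregation name encodes the
// index of the field within the chunk so the response can be mapped back.
function buildChunkedTopTermsRequests(
  fieldCandidates: string[],
  query: Record<string, unknown>
): ChunkedTopTermsRequest[] {
  return chunk(fieldCandidates, QUEUE_CHUNKING_SIZE).map((fieldNames) => ({
    size: 0,
    query,
    aggs: fieldNames.reduce<ChunkedTopTermsRequest['aggs']>((aggs, fieldName, index) => {
      aggs[`top_terms_${index}`] = { terms: { field: fieldName, size: TOP_TERMS_SIZE } };
      return aggs;
    }, {}),
  }));
}

// 120 candidates become 3 requests instead of 120 individual ones.
const requests = buildChunkedTopTermsRequests(
  Array.from({ length: 120 }, (_, i) => `field_${i}`),
  { match_all: {} }
);

// The progress bar advances by a fixed, rounded step per processed chunk,
// so the accumulated steps add up to this phase's share of 0-100%.
const loadingStepSize = Math.round((1 / requests.length) * PROGRESS_STEP_P_VALUES * 100) / 100;
console.log(requests.length, loadingStepSize); // 3 0.17
```

In the PR itself the same pattern drives both the top terms fallback and the mini histogram aggregations; the category fetch additionally switches from one search per field to a single `msearch` request carrying one search per field candidate.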

Before:

<img width="480" alt="image"
src="https://github.com/user-attachments/assets/dc316166-8f2b-4b0f-84a4-6813f69cd10a">

After:

<img width="500" alt="image"
src="https://github.com/user-attachments/assets/4c532c76-42a0-4321-a261-3b7cf9bbd361">


### Checklist


- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
Walter Rafelsberger 2024-08-05 10:12:20 +02:00 committed by GitHub
parent a00085e57d
commit 22ac46c799
29 changed files with 787 additions and 527 deletions

View file

@ -14,11 +14,12 @@ export { numberValidator } from './src/validate_number';
export type {
FieldsForHistograms,
NumericDataItem,
NumericChartData,
NumericHistogramField,
} from './src/fetch_histograms_for_fields';
export { isMultiBucketAggregate } from './src/is_multi_bucket_aggregate';
export { isSignificantItem } from './src/type_guards';
export { isSignificantItem, isSignificantItemGroup } from './src/type_guards';
export { SIGNIFICANT_ITEM_TYPE } from './src/types';
export type {
AggCardinality,

View file

@ -31,6 +31,11 @@ interface AggHistogram {
histogram: {
field: string;
interval: number;
min_doc_count?: number;
extended_bounds?: {
min: number;
max: number;
};
};
}
@ -45,7 +50,7 @@ interface AggTerms {
* Represents an item in numeric data.
* @interface
*/
interface NumericDataItem {
export interface NumericDataItem {
/**
* The numeric key.
*/

View file

@ -7,7 +7,7 @@
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import type { SignificantItem } from './types';
import type { SignificantItem, SignificantItemGroup } from './types';
/**
* Type guard for a significant item.
@ -15,7 +15,7 @@ import type { SignificantItem } from './types';
* for a p-value based variant, not a generic significant terms
* aggregation type.
* @param arg The unknown type to be evaluated
* @returns whether arg is of type SignificantItem
* @returns Return whether arg is of type SignificantItem
*/
export function isSignificantItem(arg: unknown): arg is SignificantItem {
return isPopulatedObject(arg, [
@ -32,3 +32,11 @@ export function isSignificantItem(arg: unknown): arg is SignificantItem {
'normalizedScore',
]);
}
/**
* Type guard to check if the given argument is a SignificantItemGroup.
* @param arg The unknown type to be evaluated
* @returns Return whether arg is of type SignificantItemGroup
*/
export function isSignificantItemGroup(arg: unknown): arg is SignificantItemGroup {
return isPopulatedObject(arg, ['id', 'group', 'docCount', 'pValue']);
}

View file

@ -180,25 +180,13 @@ interface SignificantItemHistogramItemBase {
}
/**
* @deprecated since version 2 of internal log rate analysis REST API endpoint
* Represents a data item in a significant term histogram.
*/
interface SignificantItemHistogramItemV1 extends SignificantItemHistogramItemBase {
/** The document count for this item in the significant term context. */
doc_count_significant_term: number;
}
interface SignificantItemHistogramItemV2 extends SignificantItemHistogramItemBase {
export interface SignificantItemHistogramItem extends SignificantItemHistogramItemBase {
/** The document count for this histogram item in the significant item context. */
doc_count_significant_item: number;
}
/**
* Represents a data item in a significant term histogram.
*/
export type SignificantItemHistogramItem =
| SignificantItemHistogramItemV1
| SignificantItemHistogramItemV2;
/**
* Represents histogram data for a field/value pair.
* @interface

View file

@ -8,7 +8,27 @@
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
import { paramsMock } from './__mocks__/params_match_all';
import { getBaselineOrDeviationFilter, getCategoryRequest } from './fetch_categories';
import {
getBaselineOrDeviationFilter,
getCategoryRequest,
isMsearchResponseItemWithAggs,
} from './fetch_categories';
describe('isMsearchResponseItemWithAggs', () => {
it('returns true if the argument is an MsearchMultiSearchItem with aggregations', () => {
const arg = {
aggregations: {},
};
expect(isMsearchResponseItemWithAggs(arg)).toBe(true);
});
it('returns false if the argument is not an MsearchMultiSearchItem with aggregations', () => {
const arg = {};
expect(isMsearchResponseItemWithAggs(arg)).toBe(false);
});
});
describe('getBaselineOrDeviationFilter', () => {
it('returns a filter that matches both baseline and deviation time range', () => {

View file

@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import {
createRandomSamplerWrapper,
type RandomSamplerWrapper,
@ -23,6 +24,10 @@ import type { AiopsLogRateAnalysisSchema } from '../api/schema';
import { getQueryWithParams } from './get_query_with_params';
export const isMsearchResponseItemWithAggs = (
arg: unknown
): arg is estypes.MsearchMultiSearchItem => isPopulatedObject(arg, ['aggregations']);
// Filter that includes docs from both the baseline and deviation time range.
export const getBaselineOrDeviationFilter = (
params: AiopsLogRateAnalysisSchema
@ -111,21 +116,21 @@ export const fetchCategories = async (
const result: FetchCategoriesResponse[] = [];
const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) => {
const request = getCategoryRequest(params, fieldName, randomSamplerWrapper);
return esClient.search(request, {
signal: abortSignal,
maxRetries: 0,
});
})
);
const searches: estypes.MsearchRequestItem[] = fieldNames.flatMap((fieldName) => [
{ index: params.index },
getCategoryRequest(params, fieldName, randomSamplerWrapper)
.body as estypes.MsearchMultisearchBody,
]);
function reportError(fieldName: string, error: unknown) {
let mSearchResponse;
try {
mSearchResponse = await esClient.msearch({ searches }, { signal: abortSignal, maxRetries: 0 });
} catch (error) {
if (!isRequestAbortedError(error)) {
if (logger) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
`Failed to fetch category aggregation for field names ${fieldNames.join()}, got: \n${JSON.stringify(
error,
null,
2
@ -134,56 +139,60 @@ export const fetchCategories = async (
}
if (emitError) {
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
emitError(`Failed to fetch category aggregation for field names "${fieldNames.join()}".`);
}
}
return result;
}
for (const [index, resp] of mSearchResponse.responses.entries()) {
const fieldName = fieldNames[index];
if (isMsearchResponseItemWithAggs(resp)) {
const { aggregations } = resp;
const {
categories: { buckets },
} = randomSamplerWrapper.unwrap(
aggregations as unknown as Record<string, estypes.AggregationsAggregate>
) as CategoriesAgg;
const categories: Category[] = buckets.map((b) => {
const sparkline =
b.sparkline === undefined
? {}
: b.sparkline.buckets.reduce<Record<number, number>>((acc2, cur2) => {
acc2[cur2.key] = cur2.doc_count;
return acc2;
}, {});
return {
key: b.key,
count: b.doc_count,
examples: b.examples.hits.hits.map((h) => get(h._source, fieldName)),
sparkline,
regex: b.regex,
};
});
result.push({
categories,
});
} else {
if (logger) {
logger.error(
`Failed to fetch category aggregation for field "${fieldName}", got: \n${JSON.stringify(
resp,
null,
2
)}`
);
}
if (emitError) {
emitError(`Failed to fetch category aggregation for field "${fieldName}".`);
}
}
}
for (const [index, settledPromise] of settledPromises.entries()) {
const fieldName = fieldNames[index];
if (settledPromise.status === 'rejected') {
reportError(fieldName, settledPromise.reason);
// Still continue the analysis even if individual category queries fail.
continue;
}
const resp = settledPromise.value;
const { aggregations } = resp;
if (aggregations === undefined) {
reportError(fieldName, resp);
// Still continue the analysis even if individual category queries fail.
continue;
}
const {
categories: { buckets },
} = randomSamplerWrapper.unwrap(
aggregations as unknown as Record<string, estypes.AggregationsAggregate>
) as CategoriesAgg;
const categories: Category[] = buckets.map((b) => {
const sparkline =
b.sparkline === undefined
? {}
: b.sparkline.buckets.reduce<Record<number, number>>((acc2, cur2) => {
acc2[cur2.key] = cur2.doc_count;
return acc2;
}, {});
return {
key: b.key,
count: b.doc_count,
examples: b.examples.hits.hits.map((h) => get(h._source, fieldName)),
sparkline,
regex: b.regex,
};
});
result.push({
categories,
});
}
return result;
};

View file

@ -92,13 +92,7 @@ export const fetchCategoryCounts = async (
let mSearchresponse;
try {
mSearchresponse = await esClient.msearch(
{ searches },
{
signal: abortSignal,
maxRetries: 0,
}
);
mSearchresponse = await esClient.msearch({ searches }, { signal: abortSignal, maxRetries: 0 });
} catch (error) {
if (!isRequestAbortedError(error)) {
if (logger) {

View file

@ -133,7 +133,6 @@ describe('fetchFieldCandidates', () => {
'event.type',
'fileset.name',
'host.architecture',
'host.containerized',
'host.hostname',
'host.ip',
'host.mac',

View file

@ -20,7 +20,8 @@ export const TEXT_FIELD_SAFE_LIST = ['message', 'error.message'];
export const SUPPORTED_ES_FIELD_TYPES = [
ES_FIELD_TYPES.KEYWORD,
ES_FIELD_TYPES.IP,
ES_FIELD_TYPES.BOOLEAN,
// Disabled boolean support because it causes problems with the `frequent_item_sets` aggregation
// ES_FIELD_TYPES.BOOLEAN,
];
export const SUPPORTED_ES_FIELD_TYPES_TEXT = [ES_FIELD_TYPES.TEXT, ES_FIELD_TYPES.MATCH_ONLY_TEXT];

View file

@ -97,7 +97,7 @@ export const fetchIndexInfo = async ({
return {
keywordFieldCandidates: fieldCandidates?.selectedKeywordFieldCandidates.sort() ?? [],
textFieldCandidates: fieldCandidates?.textFieldCandidates.sort() ?? [],
textFieldCandidates: fieldCandidates?.selectedTextFieldCandidates.sort() ?? [],
baselineTotalDocCount,
deviationTotalDocCount,
zeroDocsFallback: baselineTotalDocCount === 0 || deviationTotalDocCount === 0,

View file

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import type {
NumericChartData,
SignificantItemGroup,
SignificantItemGroupHistogram,
} from '@kbn/ml-agg-utils';
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { RANDOM_SAMPLER_SEED } from '../constants';
import type { AiopsLogRateAnalysisSchema } from '../api/schema';
import { getGroupFilter } from './get_group_filter';
import { getHistogramQuery } from './get_histogram_query';
import {
getMiniHistogramDataFromAggResponse,
getMiniHistogramAgg,
MINI_HISTOGRAM_AGG_PREFIX,
type MiniHistogramAgg,
} from './mini_histogram_utils';
export const fetchMiniHistogramsForSignificantGroups = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
significantGroups: SignificantItemGroup[],
overallTimeSeries: NumericChartData['data'],
logger: Logger,
// The default value of 1 means no sampling will be used
randomSamplerProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<SignificantItemGroupHistogram[]> => {
const histogramQuery = getHistogramQuery(params);
const histogramAggs = significantGroups.reduce<
Record<string, estypes.AggregationsAggregationContainer>
>((aggs, significantGroup, index) => {
aggs[`${MINI_HISTOGRAM_AGG_PREFIX}${index}`] = {
filter: {
bool: { filter: getGroupFilter(significantGroup) },
},
aggs: getMiniHistogramAgg(params),
};
return aggs;
}, {});
const { wrap, unwrap } = createRandomSamplerWrapper({
probability: randomSamplerProbability,
seed: RANDOM_SAMPLER_SEED,
});
const resp = await esClient.search(
{
index: params.index,
size: 0,
body: {
query: histogramQuery,
aggs: wrap(histogramAggs),
size: 0,
},
},
{ signal: abortSignal, maxRetries: 0 }
);
if (resp.aggregations === undefined) {
if (!isRequestAbortedError(resp)) {
if (logger) {
logger.error(
`Failed to fetch the histogram data chunk, got: \n${JSON.stringify(resp, null, 2)}`
);
}
if (emitError) {
emitError(`Failed to fetch the histogram data chunk.`);
}
}
return [];
}
const unwrappedResp = unwrap(resp.aggregations) as Record<string, MiniHistogramAgg>;
return significantGroups.map((significantGroup, index) => ({
id: significantGroup.id,
histogram: getMiniHistogramDataFromAggResponse(overallTimeSeries, unwrappedResp, index),
}));
};

View file

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import type {
NumericChartData,
SignificantItem,
SignificantItemHistogram,
} from '@kbn/ml-agg-utils';
import { isSignificantItem } from '@kbn/ml-agg-utils';
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { getCategoryQuery } from '@kbn/aiops-log-pattern-analysis/get_category_query';
import { RANDOM_SAMPLER_SEED } from '../constants';
import type { AiopsLogRateAnalysisSchema } from '../api/schema';
import { getHistogramQuery } from './get_histogram_query';
import {
getMiniHistogramDataFromAggResponse,
getMiniHistogramAgg,
MINI_HISTOGRAM_AGG_PREFIX,
type MiniHistogramAgg,
} from './mini_histogram_utils';
export const fetchMiniHistogramsForSignificantItems = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
significantItems: SignificantItem[],
overallTimeSeries: NumericChartData['data'],
logger: Logger,
// The default value of 1 means no sampling will be used
randomSamplerProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<SignificantItemHistogram[]> => {
const histogramQuery = getHistogramQuery(params);
const histogramAggs = significantItems.reduce<
Record<string, estypes.AggregationsAggregationContainer>
>((aggs, significantItem, index) => {
let filter;
if (isSignificantItem(significantItem) && significantItem.type === 'keyword') {
filter = {
term: { [significantItem.fieldName]: significantItem.fieldValue },
};
} else if (isSignificantItem(significantItem) && significantItem.type === 'log_pattern') {
filter = getCategoryQuery(significantItem.fieldName, [
{
key: `${significantItem.key}`,
count: significantItem.doc_count,
examples: [],
regex: '',
},
]);
} else {
throw new Error('Invalid significant item type.');
}
aggs[`${MINI_HISTOGRAM_AGG_PREFIX}${index}`] = {
filter,
aggs: getMiniHistogramAgg(params),
};
return aggs;
}, {});
const { wrap, unwrap } = createRandomSamplerWrapper({
probability: randomSamplerProbability,
seed: RANDOM_SAMPLER_SEED,
});
const resp = await esClient.search(
{
index: params.index,
size: 0,
body: {
query: histogramQuery,
aggs: wrap(histogramAggs),
size: 0,
},
},
{ signal: abortSignal, maxRetries: 0 }
);
if (resp.aggregations === undefined) {
if (!isRequestAbortedError(resp)) {
if (logger) {
logger.error(
`Failed to fetch the histogram data chunk, got: \n${JSON.stringify(resp, null, 2)}`
);
}
if (emitError) {
emitError(`Failed to fetch the histogram data chunk.`);
}
}
return [];
}
const unwrappedResp = unwrap(resp.aggregations) as Record<string, MiniHistogramAgg>;
return significantItems.map((significantItem, index) => ({
fieldName: significantItem.fieldName,
fieldValue: significantItem.fieldValue,
histogram: getMiniHistogramDataFromAggResponse(overallTimeSeries, unwrappedResp, index),
}));
};

View file

@ -5,9 +5,10 @@
* 2.0.
*/
import { uniqBy } from 'lodash';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import { type SignificantItem, SIGNIFICANT_ITEM_TYPE } from '@kbn/ml-agg-utils';
import {

View file

@ -25,9 +25,11 @@ import { getRequestBase } from './get_request_base';
// TODO Consolidate with duplicate `fetchDurationFieldCandidates` in
// `x-pack/plugins/observability_solution/apm/server/routes/correlations/queries/fetch_failed_events_correlation_p_values.ts`
const TOP_TERM_AGG_PREFIX = 'top_terms_';
export const getTopTermRequest = (
params: AiopsLogRateAnalysisSchema,
fieldName: string,
fieldNames: string[],
{ wrap }: RandomSamplerWrapper
): estypes.SearchRequest => {
const query = getQueryWithParams({
@ -55,19 +57,24 @@ export const getTopTermRequest = (
];
}
const termAgg: Record<'log_rate_top_terms', estypes.AggregationsAggregationContainer> = {
log_rate_top_terms: {
terms: {
field: fieldName,
size: LOG_RATE_ANALYSIS_SETTINGS.TOP_TERMS_FALLBACK_SIZE,
},
const termAggs = fieldNames.reduce<Record<string, estypes.AggregationsAggregationContainer>>(
(aggs, fieldName, index) => {
aggs[`${TOP_TERM_AGG_PREFIX}${index}`] = {
terms: {
field: fieldName,
size: LOG_RATE_ANALYSIS_SETTINGS.TOP_TERMS_FALLBACK_SIZE,
},
};
return aggs;
},
};
{}
);
const body = {
query,
size: 0,
aggs: wrap(termAgg),
aggs: wrap(termAggs),
};
return {
@ -97,50 +104,36 @@ export const fetchTopTerms = async (
const result: SignificantItem[] = [];
const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) =>
esClient.search(getTopTermRequest(params, fieldName, randomSamplerWrapper), {
signal: abortSignal,
maxRetries: 0,
})
)
);
const resp = await esClient.search(getTopTermRequest(params, fieldNames, randomSamplerWrapper), {
signal: abortSignal,
maxRetries: 0,
});
function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch term aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch term aggregation for fieldName "${fieldName}".`);
if (resp.aggregations === undefined) {
if (!isRequestAbortedError(resp)) {
if (logger) {
logger.error(
`Failed to fetch terms aggregation for field names ${fieldNames.join()}, got: \n${JSON.stringify(
resp,
null,
2
)}`
);
}
if (emitError) {
emitError(`Failed to fetch terms aggregation for field names ${fieldNames.join()}.`);
}
}
return result;
}
for (const [index, settledPromise] of settledPromises.entries()) {
const fieldName = fieldNames[index];
const unwrappedResp = randomSamplerWrapper.unwrap(resp.aggregations) as Record<string, Aggs>;
if (settledPromise.status === 'rejected') {
reportError(fieldName, settledPromise.reason);
// Still continue the analysis even if individual p-value queries fail.
continue;
}
for (const [index, fieldName] of fieldNames.entries()) {
const termBuckets = unwrappedResp[`${TOP_TERM_AGG_PREFIX}${index}`];
const resp = settledPromise.value;
if (resp.aggregations === undefined) {
reportError(fieldName, resp);
// Still continue the analysis even if individual p-value queries fail.
continue;
}
const overallResult = (
randomSamplerWrapper.unwrap(resp.aggregations) as Record<'log_rate_top_terms', Aggs>
).log_rate_top_terms;
for (const bucket of overallResult.buckets) {
for (const bucket of termBuckets.buckets) {
result.push({
key: `${fieldName}:${String(bucket.key)}`,
type: SIGNIFICANT_ITEM_TYPE.KEYWORD,

View file

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { NumericChartData } from '@kbn/ml-agg-utils';
import { getDateHistogramBuckets } from '../__mocks__/date_histogram';
import { paramsMock } from './__mocks__/params_match_all';
import {
getMiniHistogramAgg,
getMiniHistogramDataFromAggResponse,
type MiniHistogramAgg,
} from './mini_histogram_utils';
describe('getMiniHistogramAgg', () => {
it('returns DSL for a mini histogram aggregation', () => {
expect(getMiniHistogramAgg(paramsMock)).toStrictEqual({
mini_histogram: {
histogram: {
extended_bounds: {
max: 50,
min: 0,
},
field: 'the-time-field-name',
interval: 2.6315789473684212,
min_doc_count: 0,
},
},
});
});
});
describe('getMiniHistogramDataFromAggResponse', () => {
it('returns data for a mini histogram chart', () => {
// overall time series mock
const numericChartDataMock: NumericChartData['data'] = Object.entries(
getDateHistogramBuckets()
).map(([key, value]) => ({
doc_count: value,
key: parseInt(key, 10),
key_as_string: new Date(parseInt(key, 10)).toISOString(),
}));
// aggregation response mock
const aggResponseMock: Record<string, MiniHistogramAgg> = {
mini_histogram_0: {
doc_count: 0,
mini_histogram: {
buckets: Object.entries(getDateHistogramBuckets()).map(([key, value]) => ({
doc_count: Math.round(value / 10),
key: parseInt(key, 10),
key_as_string: new Date(parseInt(key, 10)).toISOString(),
})),
},
},
};
// we'll only check the first element of the returned array to avoid a large snapshot
expect(
getMiniHistogramDataFromAggResponse(numericChartDataMock, aggResponseMock, 0)[0]
).toStrictEqual({
// response should correctly calculate values based on the overall time series and the aggregation response
doc_count_overall: 4436,
doc_count_significant_item: 493,
key: 1654566600000,
key_as_string: '2022-06-07T01:50:00.000Z',
});
});
});

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { NumericChartData, SignificantItemHistogramItem } from '@kbn/ml-agg-utils';
import type { AiopsLogRateAnalysisSchema } from '../api/schema';
export interface MiniHistogramAgg extends estypes.AggregationsSingleBucketAggregateBase {
doc_count: number;
mini_histogram: {
buckets: Array<
estypes.AggregationsSingleBucketAggregateBase & estypes.AggregationsHistogramBucketKeys
>;
};
}
export const MINI_HISTOGRAM_AGG_PREFIX = 'mini_histogram_';
export const MINI_HISTOGRAM_BAR_TARGET = 20;
export const getMiniHistogramAgg = (params: AiopsLogRateAnalysisSchema) => {
return {
mini_histogram: {
histogram: {
field: params.timeFieldName,
interval: (params.end - params.start) / (MINI_HISTOGRAM_BAR_TARGET - 1),
min_doc_count: 0,
extended_bounds: {
min: params.start,
max: params.end,
},
},
},
};
};
export const getMiniHistogramDataFromAggResponse = (
overallTimeSeries: NumericChartData['data'],
aggReponse: Record<string, MiniHistogramAgg>,
index: number
): SignificantItemHistogramItem[] =>
overallTimeSeries.map((overallTimeSeriesItem) => {
const current = aggReponse[`${MINI_HISTOGRAM_AGG_PREFIX}${index}`].mini_histogram.buckets.find(
(bucket) => bucket.key_as_string === overallTimeSeriesItem.key_as_string
) ?? {
doc_count: 0,
};
return {
key: overallTimeSeriesItem.key,
key_as_string: overallTimeSeriesItem.key_as_string ?? '',
doc_count_significant_item: current.doc_count,
doc_count_overall: Math.max(0, overallTimeSeriesItem.doc_count - current.doc_count),
};
}) ?? [];

View file

@ -193,6 +193,8 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
'Baseline rate',
'Deviation rate',
]);
// null is used as the uninitialized state to identify the first load.
const [skippedFields, setSkippedFields] = useState<string[] | null>(null);
const onGroupResultsToggle = (optionId: string) => {
setToggleIdSelected(optionId);
@ -207,20 +209,27 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
fieldFilterSkippedItems,
keywordFieldCandidates,
textFieldCandidates,
selectedKeywordFieldCandidates,
selectedTextFieldCandidates,
} = fieldCandidates;
const fieldFilterButtonDisabled =
isRunning || fieldCandidates.isLoading || fieldFilterUniqueItems.length === 0;
const onFieldsFilterChange = (skippedFields: string[]) => {
// Set skipped fields only on first load, otherwise we'd overwrite the user's selection.
useEffect(() => {
if (skippedFields === null && fieldFilterSkippedItems.length > 0)
setSkippedFields(fieldFilterSkippedItems);
}, [fieldFilterSkippedItems, skippedFields]);
const onFieldsFilterChange = (skippedFieldsUpdate: string[]) => {
dispatch(resetResults());
setSkippedFields(skippedFieldsUpdate);
setOverrides({
loaded: 0,
remainingKeywordFieldCandidates: keywordFieldCandidates.filter(
(d) => !skippedFields.includes(d)
(d) => !skippedFieldsUpdate.includes(d)
),
remainingTextFieldCandidates: textFieldCandidates.filter(
(d) => !skippedFieldsUpdate.includes(d)
),
remainingTextFieldCandidates: textFieldCandidates.filter((d) => !skippedFields.includes(d)),
regroupOnly: false,
});
startHandler(true, false);
@ -287,8 +296,12 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
if (!continueAnalysis) {
dispatch(resetResults());
setOverrides({
remainingKeywordFieldCandidates: selectedKeywordFieldCandidates,
remainingTextFieldCandidates: selectedTextFieldCandidates,
remainingKeywordFieldCandidates: keywordFieldCandidates.filter(
(d) => skippedFields === null || !skippedFields.includes(d)
),
remainingTextFieldCandidates: textFieldCandidates.filter(
(d) => skippedFields === null || !skippedFields.includes(d)
),
});
}

View file

@ -5,21 +5,19 @@
* 2.0.
*/
import { chunk } from 'lodash';
import { queue } from 'async';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { KBN_FIELD_TYPES } from '@kbn/field-types';
import { i18n } from '@kbn/i18n';
import {
fetchHistogramsForFields,
type SignificantItem,
type SignificantItemGroup,
type SignificantItemHistogramItem,
type NumericChartData,
import type {
SignificantItem,
SignificantItemGroup,
SignificantItemGroupHistogram,
NumericChartData,
} from '@kbn/ml-agg-utils';
import { RANDOM_SAMPLER_SEED } from '@kbn/aiops-log-rate-analysis/constants';
import { QUEUE_CHUNKING_SIZE } from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import {
addSignificantItemsGroup,
addSignificantItemsGroupHistogram,
@ -27,14 +25,19 @@ import {
} from '@kbn/aiops-log-rate-analysis/api/stream_reducer';
import type { AiopsLogRateAnalysisApiVersion as ApiVersion } from '@kbn/aiops-log-rate-analysis/api/schema';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { fetchFrequentItemSets } from '@kbn/aiops-log-rate-analysis/queries/fetch_frequent_item_sets';
import { fetchTerms2CategoriesCounts } from '@kbn/aiops-log-rate-analysis/queries/fetch_terms_2_categories_counts';
import { getGroupFilter } from '@kbn/aiops-log-rate-analysis/queries/get_group_filter';
import { getHistogramQuery } from '@kbn/aiops-log-rate-analysis/queries/get_histogram_query';
import { getSignificantItemGroups } from '@kbn/aiops-log-rate-analysis/queries/get_significant_item_groups';
import { fetchMiniHistogramsForSignificantGroups } from '@kbn/aiops-log-rate-analysis/queries/fetch_mini_histograms_for_significant_groups';
import { MAX_CONCURRENT_QUERIES, PROGRESS_STEP_GROUPING } from '../response_stream_utils/constants';
import {
MAX_CONCURRENT_QUERIES,
LOADED_FIELD_CANDIDATES,
PROGRESS_STEP_P_VALUES,
PROGRESS_STEP_GROUPING,
PROGRESS_STEP_HISTOGRAMS,
PROGRESS_STEP_HISTOGRAMS_GROUPS,
} from '../response_stream_utils/constants';
import type { ResponseStreamFetchOptions } from '../response_stream_factory';
export const groupingHandlerFactory =
@ -50,7 +53,7 @@ export const groupingHandlerFactory =
async (
significantCategories: SignificantItem[],
significantTerms: SignificantItem[],
overallTimeSeries?: NumericChartData
overallTimeSeries?: NumericChartData['data']
) => {
logDebugMessage('Group results.');
@ -138,7 +141,12 @@ export const groupingHandlerFactory =
responseStream.push(addSignificantItemsGroup(significantItemGroups));
}
stateHandler.loaded(PROGRESS_STEP_GROUPING, false);
stateHandler.loaded(
LOADED_FIELD_CANDIDATES +
PROGRESS_STEP_P_VALUES +
PROGRESS_STEP_HISTOGRAMS +
PROGRESS_STEP_GROUPING
);
pushHistogramDataLoadingState();
if (stateHandler.shouldStop()) {
@ -149,7 +157,11 @@ export const groupingHandlerFactory =
logDebugMessage(`Fetch ${significantItemGroups.length} group histograms.`);
const groupHistogramQueue = queue(async function (cpg: SignificantItemGroup) {
const groupHistogramQueueChunks = chunk(significantItemGroups, QUEUE_CHUNKING_SIZE);
const loadingStepSize =
(1 / groupHistogramQueueChunks.length) * PROGRESS_STEP_HISTOGRAMS_GROUPS;
const groupHistogramQueue = queue(async function (payload: SignificantItemGroup[]) {
if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop abort fetching group histograms.');
groupHistogramQueue.kill();
@ -158,71 +170,34 @@ export const groupingHandlerFactory =
}
if (overallTimeSeries !== undefined) {
const histogramQuery = getHistogramQuery(requestBody, getGroupFilter(cpg));
let histograms: SignificantItemGroupHistogram[];
let cpgTimeSeries: NumericChartData;
try {
cpgTimeSeries = (
(await fetchHistogramsForFields({
esClient,
abortSignal,
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
histograms = await fetchMiniHistogramsForSignificantGroups(
esClient,
requestBody,
payload,
overallTimeSeries,
logger,
stateHandler.sampleProbability(),
() => {},
abortSignal
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch the histogram data for group #${cpg.id}, got: \n${e.toString()}`
);
responseStream.pushError(
`Failed to fetch the histogram data for group #${cpg.id}.`
);
}
logger.error(
`Failed to fetch the histogram data chunk for groups, got: \n${e.toString()}`
);
responseStream.pushError(`Failed to fetch the histogram data chunk for groups.`);
return;
}
const histogram: SignificantItemHistogramItem[] =
overallTimeSeries.data.map((o) => {
const current = cpgTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_significant_item: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
responseStream.push(
addSignificantItemsGroupHistogram([
{
id: cpg.id,
histogram,
},
])
);
stateHandler.loaded(loadingStepSize, false);
pushHistogramDataLoadingState();
responseStream.push(addSignificantItemsGroupHistogram(histograms));
}
}, MAX_CONCURRENT_QUERIES);
await groupHistogramQueue.push(significantItemGroups);
await groupHistogramQueue.push(groupHistogramQueueChunks);
await groupHistogramQueue.drain();
}
} catch (e) {

View file

@ -5,26 +5,22 @@
* 2.0.
*/
import { chunk } from 'lodash';
import { queue } from 'async';
import { i18n } from '@kbn/i18n';
import { KBN_FIELD_TYPES } from '@kbn/field-types';
import type {
SignificantItem,
SignificantItemHistogramItem,
SignificantItemHistogram,
NumericChartData,
} from '@kbn/ml-agg-utils';
import { fetchHistogramsForFields } from '@kbn/ml-agg-utils';
import { RANDOM_SAMPLER_SEED } from '@kbn/aiops-log-rate-analysis/constants';
import { QUEUE_CHUNKING_SIZE } from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import {
addSignificantItemsHistogram,
updateLoadingState,
} from '@kbn/aiops-log-rate-analysis/api/stream_reducer';
import { fetchMiniHistogramsForSignificantItems } from '@kbn/aiops-log-rate-analysis/queries/fetch_mini_histograms_for_significant_items';
import type { AiopsLogRateAnalysisApiVersion as ApiVersion } from '@kbn/aiops-log-rate-analysis/api/schema';
import { getCategoryQuery } from '@kbn/aiops-log-pattern-analysis/get_category_query';
import { getHistogramQuery } from '@kbn/aiops-log-rate-analysis/queries/get_histogram_query';
import {
MAX_CONCURRENT_QUERIES,
@ -46,7 +42,7 @@ export const histogramHandlerFactory =
fieldValuePairsCount: number,
significantCategories: SignificantItem[],
significantTerms: SignificantItem[],
overallTimeSeries?: NumericChartData
overallTimeSeries?: NumericChartData['data']
) => {
function pushHistogramDataLoadingState() {
responseStream.push(
@ -67,11 +63,18 @@ export const histogramHandlerFactory =
// time series filtered by fields
if (
significantTerms.length > 0 &&
(significantTerms.length > 0 || significantCategories.length > 0) &&
overallTimeSeries !== undefined &&
!requestBody.overrides?.regroupOnly
) {
const fieldValueHistogramQueue = queue(async function (cp: SignificantItem) {
const fieldValueHistogramQueueChunks = [
...chunk(significantTerms, QUEUE_CHUNKING_SIZE),
...chunk(significantCategories, QUEUE_CHUNKING_SIZE),
];
const loadingStepSize =
(1 / fieldValueHistogramQueueChunks.length) * PROGRESS_STEP_HISTOGRAMS;
const fieldValueHistogramQueue = queue(async function (payload: SignificantItem[]) {
if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop abort fetching field/value histograms.');
fieldValueHistogramQueue.kill();
@ -80,170 +83,32 @@ export const histogramHandlerFactory =
}
if (overallTimeSeries !== undefined) {
const histogramQuery = getHistogramQuery(requestBody, [
{
term: { [cp.fieldName]: cp.fieldValue },
},
]);
let cpTimeSeries: NumericChartData;
let histograms: SignificantItemHistogram[];
try {
cpTimeSeries = (
(await fetchHistogramsForFields({
esClient,
abortSignal,
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
histograms = await fetchMiniHistogramsForSignificantItems(
esClient,
requestBody,
payload,
overallTimeSeries,
logger,
stateHandler.sampleProbability(),
() => {},
abortSignal
);
} catch (e) {
logger.error(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
cp.fieldValue
}", got: \n${e.toString()}`
);
responseStream.pushError(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
);
logger.error(`Failed to fetch the histogram data chunk, got: \n${e.toString()}`);
responseStream.pushError(`Failed to fetch the histogram data chunk.`);
return;
}
const histogram: SignificantItemHistogramItem[] =
overallTimeSeries.data.map((o) => {
const current = cpTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_significant_item: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
const { fieldName, fieldValue } = cp;
stateHandler.loaded((1 / fieldValuePairsCount) * PROGRESS_STEP_HISTOGRAMS, false);
stateHandler.loaded(loadingStepSize, false);
pushHistogramDataLoadingState();
responseStream.push(
addSignificantItemsHistogram([
{
fieldName,
fieldValue,
histogram,
},
])
);
responseStream.push(addSignificantItemsHistogram(histograms));
}
}, MAX_CONCURRENT_QUERIES);
await fieldValueHistogramQueue.push(significantTerms);
await fieldValueHistogramQueue.push(fieldValueHistogramQueueChunks);
await fieldValueHistogramQueue.drain();
}
// histograms for text field patterns
if (
overallTimeSeries !== undefined &&
significantCategories.length > 0 &&
!requestBody.overrides?.regroupOnly
) {
const significantCategoriesHistogramQueries = significantCategories.map((d) => {
const histogramQuery = getHistogramQuery(requestBody);
const categoryQuery = getCategoryQuery(d.fieldName, [
{ key: `${d.key}`, count: d.doc_count, examples: [], regex: '' },
]);
if (Array.isArray(histogramQuery.bool?.filter)) {
histogramQuery.bool?.filter?.push(categoryQuery);
}
return histogramQuery;
});
for (const [i, histogramQuery] of significantCategoriesHistogramQueries.entries()) {
const cp = significantCategories[i];
let catTimeSeries: NumericChartData;
try {
catTimeSeries = (
(await fetchHistogramsForFields({
esClient,
abortSignal,
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
} catch (e) {
logger.error(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
cp.fieldValue
}", got: \n${e.toString()}`
);
responseStream.pushError(
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
);
return;
}
const histogram: SignificantItemHistogramItem[] =
overallTimeSeries.data.map((o) => {
const current = catTimeSeries.data.find(
(d1) => d1.key_as_string === o.key_as_string
) ?? {
doc_count: 0,
};
return {
key: o.key,
key_as_string: o.key_as_string ?? '',
doc_count_significant_item: current.doc_count,
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
};
}) ?? [];
const { fieldName, fieldValue } = cp;
stateHandler.loaded((1 / fieldValuePairsCount) * PROGRESS_STEP_HISTOGRAMS, false);
pushHistogramDataLoadingState();
responseStream.push(
addSignificantItemsHistogram([
{
fieldName,
fieldValue,
histogram,
},
])
);
}
}
};

View file

@ -6,7 +6,6 @@
*/
import { i18n } from '@kbn/i18n';
import {
updateLoadingState,
setZeroDocsFallback,

View file

@ -5,17 +5,16 @@
* 2.0.
*/
import { KBN_FIELD_TYPES } from '@kbn/field-types';
import {
fetchHistogramsForFields,
type NumericChartData,
type NumericHistogramField,
} from '@kbn/ml-agg-utils';
import { type NumericChartData } from '@kbn/ml-agg-utils';
import { RANDOM_SAMPLER_SEED } from '@kbn/aiops-log-rate-analysis/constants';
import type { AiopsLogRateAnalysisApiVersion as ApiVersion } from '@kbn/aiops-log-rate-analysis/api/schema';
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { getHistogramQuery } from '@kbn/aiops-log-rate-analysis/queries/get_histogram_query';
import {
getMiniHistogramAgg,
type MiniHistogramAgg,
} from '@kbn/aiops-log-rate-analysis/queries/mini_histogram_utils';
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
import type { ResponseStreamFetchOptions } from '../response_stream_factory';
@ -29,32 +28,32 @@ export const overallHistogramHandlerFactory =
responseStream,
stateHandler,
}: ResponseStreamFetchOptions<T>) =>
async () => {
const histogramFields: [NumericHistogramField] = [
{ fieldName: requestBody.timeFieldName, type: KBN_FIELD_TYPES.DATE },
];
async (): Promise<NumericChartData['data']> => {
logDebugMessage('Fetch overall histogram.');
let overallTimeSeries: NumericChartData | undefined;
const overallHistogramQuery = getHistogramQuery(requestBody);
const miniHistogramAgg = getMiniHistogramAgg(requestBody);
const { wrap, unwrap } = createRandomSamplerWrapper({
probability: stateHandler.sampleProbability() ?? 1,
seed: RANDOM_SAMPLER_SEED,
});
let resp;
try {
overallTimeSeries = (
(await fetchHistogramsForFields({
esClient,
abortSignal,
arguments: {
indexPattern: requestBody.index,
resp = await esClient.search(
{
index: requestBody.index,
size: 0,
body: {
query: overallHistogramQuery,
fields: histogramFields,
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
aggs: wrap(miniHistogramAgg),
size: 0,
},
})) as [NumericChartData]
)[0];
},
{ signal: abortSignal, maxRetries: 0 }
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
@ -66,8 +65,22 @@ export const overallHistogramHandlerFactory =
if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop after fetching overall histogram.');
responseStream.end();
return;
return [];
}
return overallTimeSeries;
if (resp?.aggregations === undefined) {
if (!isRequestAbortedError(resp)) {
if (logger) {
logger.error(
`Failed to fetch the histogram data chunk, got: \n${JSON.stringify(resp, null, 2)}`
);
}
responseStream.pushError(`Failed to fetch the histogram data chunk.`);
}
return [];
}
const unwrappedResp = unwrap(resp.aggregations) as MiniHistogramAgg;
return unwrappedResp.mini_histogram.buckets;
};

View file

@ -53,9 +53,6 @@ export const significantItemsHandlerFactory =
keywordFieldCandidates: string[];
textFieldCandidates: string[];
}) => {
let keywordFieldCandidatesCount = keywordFieldCandidates.length;
const textFieldCandidatesCount = textFieldCandidates.length;
// This will store the combined count of detected significant log patterns and keywords
let fieldValuePairsCount = 0;
@ -83,7 +80,7 @@ export const significantItemsHandlerFactory =
loadingStepSizePValues =
LOADED_FIELD_CANDIDATES + PROGRESS_STEP_P_VALUES - requestBody.overrides?.loaded;
} else {
loadingStepSizePValues = LOADED_FIELD_CANDIDATES;
loadingStepSizePValues = PROGRESS_STEP_P_VALUES;
}
if (version === '2') {
@ -93,7 +90,6 @@ export const significantItemsHandlerFactory =
if (Array.isArray(overridesRemainingFieldCandidates)) {
keywordFieldCandidates.push(...overridesRemainingFieldCandidates);
remainingKeywordFieldCandidates = overridesRemainingFieldCandidates;
keywordFieldCandidatesCount = keywordFieldCandidates.length;
} else {
remainingKeywordFieldCandidates = keywordFieldCandidates;
}
@ -107,7 +103,6 @@ export const significantItemsHandlerFactory =
if (Array.isArray(overridesRemainingKeywordFieldCandidates)) {
keywordFieldCandidates.push(...overridesRemainingKeywordFieldCandidates);
remainingKeywordFieldCandidates = overridesRemainingKeywordFieldCandidates;
keywordFieldCandidatesCount = keywordFieldCandidates.length;
} else {
remainingKeywordFieldCandidates = keywordFieldCandidates;
}
@ -125,15 +120,17 @@ export const significantItemsHandlerFactory =
logDebugMessage('Fetch p-values.');
const loadingStep =
(1 / (keywordFieldCandidatesCount + textFieldCandidatesCount)) * loadingStepSizePValues;
const pValuesQueueChunks = [
...chunk(textFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ textFieldCandidates: d })),
...chunk(keywordFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({
keywordFieldCandidates: d,
})),
];
const loadingStepSize = (1 / pValuesQueueChunks.length) * loadingStepSizePValues;
const pValuesQueue = queue(async function (payload: QueueFieldCandidate) {
let queueItemLoadingStep = 0;
if (isKeywordFieldCandidates(payload)) {
const { keywordFieldCandidates: fieldNames } = payload;
queueItemLoadingStep = loadingStep * fieldNames.length;
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>>;
try {
@ -169,7 +166,6 @@ export const significantItemsHandlerFactory =
}
} else if (isTextFieldCandidates(payload)) {
const { textFieldCandidates: fieldNames } = payload;
queueItemLoadingStep = loadingStep * fieldNames.length;
let significantCategoriesForField: Awaited<ReturnType<typeof fetchSignificantCategories>>;
@ -206,7 +202,7 @@ export const significantItemsHandlerFactory =
}
}
stateHandler.loaded(queueItemLoadingStep, false);
stateHandler.loaded(loadingStepSize, false);
responseStream.push(
updateLoadingState({
@ -232,26 +228,18 @@ export const significantItemsHandlerFactory =
// to the async queue for processing. Each chunk will be part of a single
// query using multiple aggs for each candidate. For many candidates,
// on top of that the async queue will process multiple queries concurrently.
pValuesQueue.push(
[
...chunk(textFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ textFieldCandidates: d })),
...chunk(keywordFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({
keywordFieldCandidates: d,
})),
],
(err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
responseStream.pushError(`Failed to fetch p-values.`);
pValuesQueue.kill();
responseStream.end();
} else if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop fetching p-values.');
pValuesQueue.kill();
responseStream.end();
}
pValuesQueue.push(pValuesQueueChunks, (err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
responseStream.pushError(`Failed to fetch p-values.`);
pValuesQueue.kill();
responseStream.end();
} else if (stateHandler.shouldStop()) {
logDebugMessage('shouldStop fetching p-values.');
pValuesQueue.kill();
responseStream.end();
}
);
});
await pValuesQueue.drain();
fieldValuePairsCount = significantCategories.length + significantTerms.length;

View file

@ -6,6 +6,7 @@
*/
import { queue } from 'async';
import { chunk } from 'lodash';
import { SIGNIFICANT_ITEM_TYPE, type SignificantItem } from '@kbn/ml-agg-utils';
import { i18n } from '@kbn/i18n';
@ -22,6 +23,12 @@ import type {
import { isRequestAbortedError } from '@kbn/aiops-common/is_request_aborted_error';
import { fetchTopCategories } from '@kbn/aiops-log-rate-analysis/queries/fetch_top_categories';
import { fetchTopTerms } from '@kbn/aiops-log-rate-analysis/queries/fetch_top_terms';
import type { QueueFieldCandidate } from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import {
isKeywordFieldCandidates,
isTextFieldCandidates,
QUEUE_CHUNKING_SIZE,
} from '@kbn/aiops-log-rate-analysis/queue_field_candidates';
import {
LOADED_FIELD_CANDIDATES,
@ -48,8 +55,6 @@ export const topItemsHandlerFactory =
keywordFieldCandidates: string[];
textFieldCandidates: string[];
}) => {
let keywordFieldCandidatesCount = keywordFieldCandidates.length;
// This will store the combined count of detected log patterns and keywords
let fieldValuePairsCount = 0;
@ -70,25 +75,6 @@ export const topItemsHandlerFactory =
) ?? [])
);
// Get categories of text fields
if (textFieldCandidates.length > 0) {
topCategories.push(
...(await fetchTopCategories(
esClient,
requestBody,
textFieldCandidates,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
))
);
if (topCategories.length > 0) {
responseStream.push(addSignificantItems(topCategories));
}
}
const topTerms: SignificantItem[] = [];
topTerms.push(
@ -107,7 +93,6 @@ export const topItemsHandlerFactory =
if (Array.isArray(overridesRemainingFieldCandidates)) {
keywordFieldCandidates.push(...overridesRemainingFieldCandidates);
remainingKeywordFieldCandidates = overridesRemainingFieldCandidates;
keywordFieldCandidatesCount = keywordFieldCandidates.length;
loadingStepSizeTopTerms =
LOADED_FIELD_CANDIDATES +
PROGRESS_STEP_P_VALUES -
@ -123,7 +108,6 @@ export const topItemsHandlerFactory =
if (Array.isArray(overridesRemainingKeywordFieldCandidates)) {
keywordFieldCandidates.push(...overridesRemainingKeywordFieldCandidates);
remainingKeywordFieldCandidates = overridesRemainingKeywordFieldCandidates;
keywordFieldCandidatesCount = keywordFieldCandidates.length;
loadingStepSizeTopTerms =
LOADED_FIELD_CANDIDATES +
PROGRESS_STEP_P_VALUES -
@ -135,58 +119,91 @@ export const topItemsHandlerFactory =
logDebugMessage('Fetch top items.');
const topTermsQueue = queue(async function (fieldCandidate: string) {
stateHandler.loaded((1 / keywordFieldCandidatesCount) * loadingStepSizeTopTerms, false);
const topTermsQueueChunks = [
...chunk(keywordFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({
keywordFieldCandidates: d,
})),
...chunk(textFieldCandidates, QUEUE_CHUNKING_SIZE).map((d) => ({ textFieldCandidates: d })),
];
const loadingStepSize = (1 / topTermsQueueChunks.length) * loadingStepSizeTopTerms;
let fetchedTopTerms: Awaited<ReturnType<typeof fetchTopTerms>>;
const topTermsQueue = queue(async function (payload: QueueFieldCandidate) {
if (isKeywordFieldCandidates(payload)) {
const { keywordFieldCandidates: fieldNames } = payload;
let fetchedTopTerms: Awaited<ReturnType<typeof fetchTopTerms>>;
try {
fetchedTopTerms = await fetchTopTerms(
try {
fetchedTopTerms = await fetchTopTerms(
esClient,
requestBody,
fieldNames,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
`Failed to fetch top items for ${fieldNames.join()}, got: \n${e.toString()}`
);
responseStream.pushError(`Failed to fetch top items for ${fieldNames.join()}.`);
}
return;
}
remainingKeywordFieldCandidates = remainingKeywordFieldCandidates.filter(
(d) => !fieldNames.includes(d)
);
if (fetchedTopTerms.length > 0) {
topTerms.push(...fetchedTopTerms);
responseStream.push(addSignificantItems(fetchedTopTerms));
}
stateHandler.loaded(loadingStepSize, false);
responseStream.push(
updateLoadingState({
ccsWarning: false,
loaded: stateHandler.loaded(),
loadingState: i18n.translate(
'xpack.aiops.logRateAnalysis.loadingState.identifiedFieldValuePairs',
{
defaultMessage:
'Identified {fieldValuePairsCount, plural, one {# significant field/value pair} other {# significant field/value pairs}}.',
values: {
fieldValuePairsCount,
},
}
),
remainingKeywordFieldCandidates,
})
);
} else if (isTextFieldCandidates(payload)) {
const { textFieldCandidates: fieldNames } = payload;
const topCategoriesForField = await fetchTopCategories(
esClient,
requestBody,
[fieldCandidate],
fieldNames,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}`);
responseStream.pushError(`Failed to fetch p-values for '${fieldCandidate}'.`);
if (topCategoriesForField.length > 0) {
topCategories.push(...topCategoriesForField);
responseStream.push(addSignificantItems(topCategoriesForField));
fieldValuePairsCount += topCategoriesForField.length;
}
return;
stateHandler.loaded(loadingStepSize, false);
}
remainingKeywordFieldCandidates = remainingKeywordFieldCandidates.filter(
(d) => d !== fieldCandidate
);
if (fetchedTopTerms.length > 0) {
topTerms.push(...fetchedTopTerms);
responseStream.push(addSignificantItems(fetchedTopTerms));
}
responseStream.push(
updateLoadingState({
ccsWarning: false,
loaded: stateHandler.loaded(),
loadingState: i18n.translate(
'xpack.aiops.logRateAnalysis.loadingState.identifiedFieldValuePairs',
{
defaultMessage:
'Identified {fieldValuePairsCount, plural, one {# significant field/value pair} other {# significant field/value pairs}}.',
values: {
fieldValuePairsCount,
},
}
),
remainingKeywordFieldCandidates,
})
);
}, MAX_CONCURRENT_QUERIES);
topTermsQueue.push(keywordFieldCandidates, (err) => {
topTermsQueue.push(topTermsQueueChunks, (err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
responseStream.pushError(`Failed to fetch p-values.`);

View file

@ -43,9 +43,9 @@ export const stateHandlerFactory = (overrides: Partial<StreamState>) => {
function loaded(d?: number, replace = true) {
if (typeof d === 'number') {
if (replace) {
state.loaded = d;
state.loaded = Math.round(d * 100) / 100;
} else {
state.loaded += d;
state.loaded += Math.round(d * 100) / 100;
}
} else {
return state.loaded;

View file

@ -12,6 +12,7 @@ import type {
RequestHandler,
KibanaResponseFactory,
} from '@kbn/core/server';
import { withSpan } from '@kbn/apm-utils';
import type { Logger } from '@kbn/logging';
import { createExecutionContext } from '@kbn/ml-route-utils';
import type { UsageCounter } from '@kbn/usage-collection-plugin/server';
@ -27,7 +28,6 @@ import { trackAIOpsRouteUsage } from '../../lib/track_route_usage';
import type { AiopsLicense } from '../../types';
import { responseStreamFactory } from './response_stream_factory';
import { PROGRESS_STEP_HISTOGRAMS_GROUPS } from './response_stream_utils/constants';
/**
* The log rate analysis route handler sets up `responseStreamFactory`
@ -82,7 +82,10 @@ export function routeHandlerFactory<T extends ApiVersion>(
responseStream.pushPingWithTimeout();
// Step 1: Index Info: Field candidates and zero docs fallback flag
const indexInfo = await analysis.indexInfoHandler();
const indexInfo = await withSpan(
{ name: 'fetch_index_info', type: 'aiops-log-rate-analysis' },
() => analysis.indexInfoHandler()
);
if (!indexInfo) {
return;
@ -90,8 +93,13 @@ export function routeHandlerFactory<T extends ApiVersion>(
// Step 2: Significant categories and terms
const significantItemsObj = indexInfo.zeroDocsFallback
? await analysis.topItemsHandler(indexInfo)
: await analysis.significantItemsHandler(indexInfo);
? await withSpan({ name: 'fetch_top_items', type: 'aiops-log-rate-analysis' }, () =>
analysis.topItemsHandler(indexInfo)
)
: await withSpan(
{ name: 'fetch_significant_items', type: 'aiops-log-rate-analysis' },
() => analysis.significantItemsHandler(indexInfo)
);
if (!significantItemsObj) {
return;
@ -101,27 +109,32 @@ export function routeHandlerFactory<T extends ApiVersion>(
significantItemsObj;
// Step 3: Fetch overall histogram
const overallTimeSeries = await analysis.overallHistogramHandler();
const overallTimeSeries = await withSpan(
{ name: 'fetch_overall_timeseries', type: 'aiops-log-rate-analysis' },
() => analysis.overallHistogramHandler()
);
// Step 4: Smart gropuing
// Step 4: Histograms
await withSpan(
{ name: 'significant-item-histograms', type: 'aiops-log-rate-analysis' },
() =>
analysis.histogramHandler(
fieldValuePairsCount,
significantCategories,
significantTerms,
overallTimeSeries
)
);
// Step 5: Smart grouping
if (stateHandler.groupingEnabled()) {
await analysis.groupingHandler(
significantCategories,
significantTerms,
overallTimeSeries
await withSpan(
{ name: 'grouping-with-histograms', type: 'aiops-log-rate-analysis' },
() =>
analysis.groupingHandler(significantCategories, significantTerms, overallTimeSeries)
);
}
stateHandler.loaded(PROGRESS_STEP_HISTOGRAMS_GROUPS, false);
// Step 5: Histograms
await analysis.histogramHandler(
fieldValuePairsCount,
significantCategories,
significantTerms,
overallTimeSeries
);
responseStream.endWithUpdatedLoadingState();
} catch (e) {
if (!isRequestAbortedError(e)) {

View file

@ -78,6 +78,7 @@
"@kbn/usage-collection-plugin",
"@kbn/utility-types",
"@kbn/observability-ai-assistant-plugin",
"@kbn/apm-utils",
],
"exclude": [
"target/**/*",

View file

@ -76,7 +76,10 @@ export default ({ getService }: FtrProviderContext) => {
const histogramActions = getHistogramActions(data);
const histograms = histogramActions.flatMap((d) => d.payload);
// for each significant term we should get a histogram
expect(histogramActions.length).to.be(significantItems.length);
expect(histogramActions.length).to.eql(
testData.expected.histogramActionsLength,
`Expected histogram actions length to be ${testData.expected.histogramActionsLength}, got ${histogramActions.length}`
);
// each histogram should have a length of 20 items.
histograms.forEach((h, index) => {
expect(h.histogram.length).to.eql(

View file

@ -117,6 +117,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
},
],
groups: [],
histogramActionsLength: 1,
histogramLength: 20,
fieldCandidates: {
isECS: false,
@ -147,6 +148,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: artificialLogSignificantTerms,
groups: artificialLogsSignificantItemGroups,
histogramActionsLength: 1,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidates,
},
@ -171,6 +173,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: topTerms,
groups: topTermsGroups,
histogramActionsLength: 1,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidates,
},
@ -195,6 +198,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: topTerms,
groups: topTermsGroups,
histogramActionsLength: 1,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidates,
},
@ -219,6 +223,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: [...artificialLogSignificantTerms, ...artificialLogSignificantLogPatterns],
groups: artificialLogsSignificantItemGroupsTextfield,
histogramActionsLength: 2,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidatesWithTextfield,
},
@ -243,6 +248,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: artificialLogSignificantTerms,
groups: artificialLogsSignificantItemGroups,
histogramActionsLength: 1,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidates,
},
@ -267,6 +273,7 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
significantItems: [...artificialLogSignificantTerms, ...artificialLogSignificantLogPatterns],
groups: artificialLogsSignificantItemGroupsTextfield,
histogramActionsLength: 2,
histogramLength: 20,
fieldCandidates: expectedArtificialLogsFieldCandidatesWithTextfield,
},
@ -291,7 +298,8 @@ export const getLogRateAnalysisTestData = <T extends ApiVersion>(): Array<TestDa
noIndexActionsLength: 3,
groups: frequentItemSetsLargeArraysGroups,
significantItems: frequentItemSetsLargeArraysSignificantItems,
histogramLength: 1,
histogramActionsLength: 2,
histogramLength: 20,
fieldCandidates: {
isECS: false,
keywordFieldCandidates: ['items'],

View file

@ -24,6 +24,7 @@ export interface TestData<T extends ApiVersion> {
noIndexActionsLength: number;
significantItems: SignificantItem[];
groups: SignificantItemGroup[];
histogramActionsLength: number;
histogramLength: number;
fieldCandidates: FetchFieldCandidatesResponse;
};