[ML] AIOps: Refactors function argument structure for Log Rate Analysis. (#187669)

## Summary

Refactors the function argument structure of the Log Rate Analysis code used on
the Kibana server from individual positional arguments to a single object that
contains all options. The options structure looks like this:

```
{
  // "meta" args like dependencies, general callbacks etc. on the outer most level
  esClient,
  abortSignal,
  ...
  // within "arguments" we pass in actual options that necessary for the logic of the function
  arguments: {
    start,
    end,
    query,
    fields,
    ...
  }
}
```
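
For illustration, here's a minimal, self-contained TypeScript sketch of the pattern. The names `FetchExampleParams` and `fetchExample` are hypothetical; they just mirror the shape the refactored functions now share:

```
import type { ElasticsearchClient } from '@kbn/core/server';
import type { estypes } from '@elastic/elasticsearch';

// Hypothetical params interface mirroring the structure above.
interface FetchExampleParams {
  // "meta" args like dependencies and general callbacks on the outermost level
  esClient: ElasticsearchClient;
  abortSignal?: AbortSignal;
  // actual options for the function's logic within `arguments`
  arguments: {
    indexPattern: string;
    query: estypes.QueryDslQueryContainer;
    samplerShardSize: number;
    randomSamplerProbability?: number;
    randomSamplerSeed?: number;
  };
}

export const fetchExample = async (params: FetchExampleParams) => {
  const { esClient, abortSignal, arguments: args } = params;
  const { indexPattern, query } = args;
  // The request itself uses the named options; `signal` wires up cancellation.
  return esClient.search({ index: indexPattern, size: 0, query }, { signal: abortSignal });
};
```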

The main benefit is that code where these functions are used becomes easier to
read. Instead of a strict order of arguments that sometimes included
`undefined` or a bare value where it's hard to guess which argument it belongs
to, this enforces that the option names show up in the consuming code. Here's
an example:

Before:

```
await fetchHistogramsForFields(
                client,
                requestBody.index,
                histogramQuery,
                [
                  {
                    fieldName: requestBody.timeFieldName,
                    type: KBN_FIELD_TYPES.DATE,
                    interval: overallTimeSeries.interval,
                    min: overallTimeSeries.stats[0],
                    max: overallTimeSeries.stats[1],
                  },
                ],
                -1,
                undefined,
                abortSignal,
                stateHandler.sampleProbability(),
                RANDOM_SAMPLER_SEED
              )
```

After:

```
                (await fetchHistogramsForFields({
                  esClient,
                  abortSignal,
                  arguments: {
                    indexPattern: requestBody.index,
                    query: histogramQuery,
                    fields: [
                      {
                        fieldName: requestBody.timeFieldName,
                        type: KBN_FIELD_TYPES.DATE,
                        interval: overallTimeSeries.interval,
                        min: overallTimeSeries.stats[0],
                        max: overallTimeSeries.stats[1],
                      },
                    ],
                    samplerShardSize: -1,
                    randomSamplerProbability: stateHandler.sampleProbability(),
                    randomSamplerSeed: RANDOM_SAMPLER_SEED,
                  },
                })) as [NumericChartData]
```
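
A side benefit of the named options: TypeScript's excess-property checks on object literals catch a misspelled or misplaced option at compile time, whereas positional arguments of the same type could be swapped silently. A hedged sketch reusing the hypothetical `fetchExample` from above (assuming `esClient`, `abortSignal`, and `RANDOM_SAMPLER_SEED` are in scope):

```
await fetchExample({
  esClient,
  abortSignal,
  arguments: {
    indexPattern: 'my-index',
    query: { match_all: {} },
    samplerShardSize: -1,
    // @ts-expect-error 'randomSamplerSead' (typo) is not a known option
    randomSamplerSead: RANDOM_SAMPLER_SEED,
  },
});
```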


### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

View file

@@ -6,7 +6,7 @@
*/
export { buildSamplerAggregation } from './src/build_sampler_aggregation';
export { fetchAggIntervals } from './src/fetch_agg_intervals';
export { fetchAggIntervals, type FetchAggIntervalsParams } from './src/fetch_agg_intervals';
export { fetchHistogramsForFields } from './src/fetch_histograms_for_fields';
export { DEFAULT_SAMPLER_SHARD_SIZE } from './src/field_histograms';
export { getSamplerAggregationsResponsePath } from './src/get_sampler_aggregations_response_path';

View file

@@ -22,30 +22,51 @@ import type { HistogramField, NumericColumnStatsMap } from './types';
const MAX_CHART_COLUMNS = 20;
/**
* Returns aggregation intervals for the supplied document fields.
* Interface for the parameters required to fetch aggregation intervals.
*/
export interface FetchAggIntervalsParams {
/** The Elasticsearch client to use for the query. */
esClient: ElasticsearchClient;
/** An optional abort signal to cancel the request. */
abortSignal?: AbortSignal;
/** The arguments for the aggregation query. */
arguments: {
/** The index pattern to query against. */
indexPattern: string;
/** The query to filter documents. */
query: estypes.QueryDslQueryContainer;
/** The fields to aggregate on. */
fields: HistogramField[];
/** The size of the sampler shard. */
samplerShardSize: number;
/** Optional runtime mappings for the query. */
runtimeMappings?: estypes.MappingRuntimeFields;
/** Optional probability for random sampling. */
randomSamplerProbability?: number;
/** Optional seed for random sampling. */
randomSamplerSeed?: number;
};
}
/**
* Asynchronously fetches aggregation intervals from an Elasticsearch client.
*
* @param client - The Elasticsearch client.
* @param indexPattern - The index pattern to search.
* @param query - The query to filter documents.
* @param fields - An array of field definitions for which aggregation intervals are requested.
* @param samplerShardSize - The shard size parameter for sampling aggregations. A value less than 1 indicates no sampling.
* @param runtimeMappings - Optional runtime mappings to apply.
* @param abortSignal - Optional AbortSignal for canceling the request.
* @param randomSamplerProbability - Optional probability value for random sampling.
* @param randomSamplerSeed - Optional seed value for random sampling.
* @returns A promise that resolves to a map of aggregation intervals for the specified fields.
* @param params - The parameters for fetching aggregation intervals.
* @returns A promise that resolves to a map of numeric column statistics.
*/
export const fetchAggIntervals = async (
client: ElasticsearchClient,
indexPattern: string,
query: estypes.QueryDslQueryContainer,
fields: HistogramField[],
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal,
randomSamplerProbability?: number,
randomSamplerSeed?: number
params: FetchAggIntervalsParams
): Promise<NumericColumnStatsMap> => {
const { esClient, abortSignal, arguments: args } = params;
const {
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
} = args;
if (
samplerShardSize >= 1 &&
randomSamplerProbability !== undefined &&
@@ -77,7 +98,7 @@ export const fetchAggIntervals = async (
seed: randomSamplerSeed,
});
const body = await client.search(
const body = await esClient.search(
{
index: indexPattern,
size: 0,

View file

@@ -167,32 +167,48 @@ export type FieldsForHistograms = Array<
| UnsupportedHistogramField
>;
interface FetchHistogramsForFieldsParams {
/** The Elasticsearch client to use for the query. */
esClient: ElasticsearchClient;
/** An optional abort signal to cancel the request. */
abortSignal?: AbortSignal;
/** The arguments for the aggregation query. */
arguments: {
/** The index pattern to query against. */
indexPattern: string;
/** The query to filter documents. */
query: any;
/** The fields for which histograms are to be fetched. */
fields: FieldsForHistograms;
/** The size of the sampler shard. */
samplerShardSize: number;
/** Optional runtime mappings for the query. */
runtimeMappings?: estypes.MappingRuntimeFields;
/** Optional probability for random sampling. */
randomSamplerProbability?: number;
/** Optional seed for random sampling. */
randomSamplerSeed?: number;
};
}
/**
* Fetches data to be used in mini histogram charts. Supports auto-identifying
* the histogram interval and min/max values.
* Asynchronously fetches histograms for specified fields from an Elasticsearch client.
*
* @param client Elasticsearch Client
* @param indexPattern index pattern to be queried
* @param query Elasticsearch query
* @param fields the fields the histograms should be generated for
* @param samplerShardSize shard_size parameter of the sampler aggregation
* @param runtimeMappings optional runtime mappings
* @param abortSignal optional abort signal
* @param randomSamplerProbability optional random sampler probability
* @param randomSamplerSeed optional random sampler seed
* @returns an array of histogram data for each supplied field
* @param params The parameters for fetching histograms.
* @returns A promise that resolves with the fetched histograms.
*/
export const fetchHistogramsForFields = async (
client: ElasticsearchClient,
indexPattern: string,
query: any,
fields: FieldsForHistograms,
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal,
randomSamplerProbability?: number,
randomSamplerSeed?: number
) => {
export const fetchHistogramsForFields = async (params: FetchHistogramsForFieldsParams) => {
const { esClient, abortSignal, arguments: args } = params;
const {
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
} = args;
if (
samplerShardSize >= 1 &&
randomSamplerProbability !== undefined &&
@@ -202,17 +218,19 @@ export const fetchHistogramsForFields = async (
}
const aggIntervals = {
...(await fetchAggIntervals(
client,
indexPattern,
query,
fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings,
...(await fetchAggIntervals({
esClient,
abortSignal,
randomSamplerProbability,
randomSamplerSeed
)),
arguments: {
indexPattern,
query,
fields: fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
},
})),
...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
const { interval, min, max, fieldName } = field;
p[stringHash(fieldName)] = { interval, min, max };
@@ -259,7 +277,7 @@ export const fetchHistogramsForFields = async (
seed: randomSamplerSeed,
});
const body = await client.search(
const body = await esClient.search(
{
index: indexPattern,
size: 0,

View file

@@ -97,10 +97,10 @@ export const fetchCategories = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
logger?: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse[]> => {
const randomSamplerWrapper = createRandomSamplerWrapper({
@@ -122,14 +122,19 @@
function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}
if (emitError) {
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
}
}
}

View file

@@ -75,8 +75,8 @@ export const fetchCategoryCounts = async (
categories: FetchCategoriesResponse,
from: number | undefined,
to: number | undefined,
logger: Logger,
emitError: (m: string) => void,
logger?: Logger,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse> => {
const updatedCategories = cloneDeep(categories);
@@ -101,14 +101,19 @@
);
} catch (error) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}
if (emitError) {
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
}
}
return updatedCategories;
}
@@ -118,14 +123,19 @@
updatedCategories.categories[index].count =
(resp.hits.total as estypes.SearchTotalHits).value ?? 0;
} else {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
if (logger) {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
}
if (emitError) {
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
}
}
}

View file

@@ -80,20 +80,38 @@ export function getFrequentItemSetsAggFields(significantItems: SignificantItem[]
);
}
export async function fetchFrequentItemSets(
client: ElasticsearchClient,
index: string,
searchQuery: estypes.QueryDslQueryContainer,
significantItems: SignificantItem[],
timeFieldName: string,
deviationMin: number,
deviationMax: number,
logger: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchFrequentItemSetsResponse> {
export async function fetchFrequentItemSets({
esClient,
abortSignal,
emitError,
logger,
arguments: args,
}: {
esClient: ElasticsearchClient;
emitError: (m: string) => void;
abortSignal?: AbortSignal;
logger: Logger;
arguments: {
index: string;
searchQuery: estypes.QueryDslQueryContainer;
significantItems: SignificantItem[];
timeFieldName: string;
deviationMin: number;
deviationMax: number;
sampleProbability?: number;
};
}): Promise<FetchFrequentItemSetsResponse> {
const {
index,
searchQuery,
significantItems,
timeFieldName,
deviationMin,
deviationMax,
// The default value of 1 means no sampling will be used
sampleProbability = 1,
} = args;
// Sort significant terms by ascending p-value, necessary to apply the field limit correctly.
const sortedSignificantItems = significantItems.slice().sort((a, b) => {
return (a.pValue ?? 0) - (b.pValue ?? 0);
@@ -140,7 +158,7 @@ export async function fetchFrequentItemSets(
track_total_hits: true,
};
const body = await client.search<
const body = await esClient.search<
unknown,
{ sample: FrequentItemSetsAggregation } | FrequentItemSetsAggregation
>(

View file

@@ -46,7 +46,7 @@ describe('fetch_index_info', () => {
} as unknown as ElasticsearchClient;
const { baselineTotalDocCount, deviationTotalDocCount, fieldCandidates } =
await fetchIndexInfo(esClientMock, paramsSearchQueryMock);
await fetchIndexInfo({ esClient: esClientMock, arguments: paramsSearchQueryMock });
expect(fieldCandidates).toEqual(['myIpFieldName', 'myKeywordFieldName']);
expect(baselineTotalDocCount).toEqual(5000000);
@@ -76,7 +76,7 @@ describe('fetch_index_info', () => {
deviationTotalDocCount,
fieldCandidates,
textFieldCandidates,
} = await fetchIndexInfo(esClientMock, paramsSearchQueryMock);
} = await fetchIndexInfo({ esClient: esClientMock, arguments: paramsSearchQueryMock });
expect(fieldCandidates).toEqual([
'_metadata.elastic_apm_trace_id',
@@ -258,7 +258,7 @@ describe('fetch_index_info', () => {
deviationTotalDocCount,
fieldCandidates,
textFieldCandidates,
} = await fetchIndexInfo(esClientMock, paramsSearchQueryMock);
} = await fetchIndexInfo({ esClient: esClientMock, arguments: paramsSearchQueryMock });
expect(fieldCandidates).toEqual([
'category.keyword',
@@ -315,7 +315,7 @@ describe('fetch_index_info', () => {
deviationTotalDocCount,
fieldCandidates,
textFieldCandidates,
} = await fetchIndexInfo(esClientMock, paramsSearchQueryMock);
} = await fetchIndexInfo({ esClient: esClientMock, arguments: paramsSearchQueryMock });
expect(fieldCandidates).toEqual(['items']);
expect(textFieldCandidates).toEqual([]);

View file

@@ -39,12 +39,18 @@ interface IndexInfo {
zeroDocsFallback: boolean;
}
export const fetchIndexInfo = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
textFieldCandidatesOverrides: string[] = [],
abortSignal?: AbortSignal
): Promise<IndexInfo> => {
export const fetchIndexInfo = async ({
esClient,
abortSignal,
arguments: args,
}: {
esClient: ElasticsearchClient;
abortSignal?: AbortSignal;
arguments: AiopsLogRateAnalysisSchema & {
textFieldCandidatesOverrides?: string[];
};
}): Promise<IndexInfo> => {
const { textFieldCandidatesOverrides = [], ...params } = args;
const { index } = params;
// Get all supported fields
const respMapping = await esClient.fieldCaps(

View file

@@ -32,16 +32,22 @@ const getCategoriesTestData = (categories: Category[]): Histogram[] => {
const getCategoriesTotalCount = (categories: Category[]): number =>
categories.reduce((p, c) => p + c.count, 0);
export const fetchSignificantCategories = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
export const fetchSignificantCategories = async ({
esClient,
abortSignal,
emitError,
logger,
arguments: args,
}: {
esClient: ElasticsearchClient;
abortSignal?: AbortSignal;
emitError?: (m: string) => void;
logger?: Logger;
arguments: AiopsLogRateAnalysisSchema & { fieldNames: string[]; sampleProbability?: number };
}) => {
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
) => {
const { fieldNames, sampleProbability = 1, ...params } = args;
const categoriesOverall = await fetchCategories(
esClient,
params,

View file

@@ -111,16 +111,25 @@ interface Aggs extends estypes.AggregationsSignificantLongTermsAggregate {
buckets: estypes.AggregationsSignificantLongTermsBucket[];
}
export const fetchSignificantTermPValues = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
export const fetchSignificantTermPValues = async ({
esClient,
abortSignal,
logger,
emitError,
arguments: args,
}: {
esClient: ElasticsearchClient;
abortSignal?: AbortSignal;
logger?: Logger;
emitError?: (m: string) => void;
arguments: AiopsLogRateAnalysisSchema & {
fieldNames: string[];
sampleProbability?: number;
};
}): Promise<SignificantItem[]> => {
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<SignificantItem[]> => {
const { fieldNames, sampleProbability = 1, ...params } = args;
const randomSamplerWrapper = createRandomSamplerWrapper({
probability: sampleProbability,
seed: RANDOM_SAMPLER_SEED,
@@ -139,14 +148,19 @@
function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}
if (emitError) {
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
}
}
}

View file

@@ -40,7 +40,7 @@ import type { ResponseStreamFetchOptions } from '../response_stream_factory';
export const groupingHandlerFactory =
<T extends ApiVersion>({
abortSignal,
client,
esClient,
requestBody,
responseStream,
logDebugMessage,
@@ -81,24 +81,26 @@
);
try {
const { fields, itemSets } = await fetchFrequentItemSets(
client,
requestBody.index,
JSON.parse(requestBody.searchQuery) as estypes.QueryDslQueryContainer,
significantTerms,
requestBody.timeFieldName,
requestBody.deviationMin,
requestBody.deviationMax,
const { fields, itemSets } = await fetchFrequentItemSets({
esClient,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
emitError: responseStream.pushError,
abortSignal,
arguments: {
index: requestBody.index,
searchQuery: JSON.parse(requestBody.searchQuery) as estypes.QueryDslQueryContainer,
significantItems: significantTerms,
timeFieldName: requestBody.timeFieldName,
deviationMin: requestBody.deviationMin,
deviationMax: requestBody.deviationMax,
sampleProbability: stateHandler.sampleProbability(),
},
});
if (significantCategories.length > 0 && significantTerms.length > 0) {
const { fields: significantCategoriesFields, itemSets: significantCategoriesItemSets } =
await fetchTerms2CategoriesCounts(
client,
esClient,
requestBody,
JSON.parse(requestBody.searchQuery) as estypes.QueryDslQueryContainer,
significantTerms,
@@ -161,27 +163,26 @@
let cpgTimeSeries: NumericChartData;
try {
cpgTimeSeries = (
(await fetchHistogramsForFields(
client,
requestBody.index,
histogramQuery,
// fields
[
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined,
(await fetchHistogramsForFields({
esClient,
abortSignal,
stateHandler.sampleProbability(),
RANDOM_SAMPLER_SEED
)) as [NumericChartData]
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
} catch (e) {
if (!isRequestAbortedError(e)) {

View file

@@ -35,7 +35,7 @@ import type { ResponseStreamFetchOptions } from '../response_stream_factory';
export const histogramHandlerFactory =
<T extends ApiVersion>({
abortSignal,
client,
esClient,
logDebugMessage,
logger,
requestBody,
@@ -90,27 +90,26 @@
try {
cpTimeSeries = (
(await fetchHistogramsForFields(
client,
requestBody.index,
histogramQuery,
// fields
[
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined,
(await fetchHistogramsForFields({
esClient,
abortSignal,
stateHandler.sampleProbability(),
RANDOM_SAMPLER_SEED
)) as [NumericChartData]
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
} catch (e) {
logger.error(
@@ -183,27 +182,26 @@
try {
catTimeSeries = (
(await fetchHistogramsForFields(
client,
requestBody.index,
histogramQuery,
// fields
[
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
// samplerShardSize
-1,
undefined,
(await fetchHistogramsForFields({
esClient,
abortSignal,
stateHandler.sampleProbability(),
RANDOM_SAMPLER_SEED
)) as [NumericChartData]
arguments: {
indexPattern: requestBody.index,
query: histogramQuery,
fields: [
{
fieldName: requestBody.timeFieldName,
type: KBN_FIELD_TYPES.DATE,
interval: overallTimeSeries.interval,
min: overallTimeSeries.stats[0],
max: overallTimeSeries.stats[1],
},
],
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
} catch (e) {
logger.error(

View file

@@ -24,7 +24,7 @@ export const indexInfoHandlerFactory =
async () => {
const {
abortSignal,
client,
esClient,
logDebugMessage,
logger,
requestBody,
@@ -56,12 +56,14 @@
);
try {
const indexInfo = await fetchIndexInfo(
client,
requestBody,
['message', 'error.message'],
abortSignal
);
const indexInfo = await fetchIndexInfo({
esClient,
abortSignal,
arguments: {
...requestBody,
textFieldCandidatesOverrides: ['message', 'error.message'],
},
});
logDebugMessage(`Baseline document count: ${indexInfo.baselineTotalDocCount}`);
logDebugMessage(`Deviation document count: ${indexInfo.deviationTotalDocCount}`);

View file

@@ -22,7 +22,7 @@ import type { ResponseStreamFetchOptions } from '../response_stream_factory';
export const overallHistogramHandlerFactory =
<T extends ApiVersion>({
abortSignal,
client,
esClient,
requestBody,
logDebugMessage,
logger,
@@ -42,19 +42,18 @@
try {
overallTimeSeries = (
(await fetchHistogramsForFields(
client,
requestBody.index,
overallHistogramQuery,
// fields
histogramFields,
// samplerShardSize
-1,
undefined,
(await fetchHistogramsForFields({
esClient,
abortSignal,
stateHandler.sampleProbability(),
RANDOM_SAMPLER_SEED
)) as [NumericChartData]
arguments: {
indexPattern: requestBody.index,
query: overallHistogramQuery,
fields: histogramFields,
samplerShardSize: -1,
randomSamplerProbability: stateHandler.sampleProbability(),
randomSamplerSeed: RANDOM_SAMPLER_SEED,
},
})) as [NumericChartData]
)[0];
} catch (e) {
if (!isRequestAbortedError(e)) {

View file

@@ -46,7 +46,7 @@ type Candidate = FieldCandidate | TextFieldCandidate;
export const significantItemsHandlerFactory =
<T extends ApiVersion>({
abortSignal,
client,
esClient,
logDebugMessage,
logger,
requestBody,
@@ -110,15 +110,17 @@
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>>;
try {
pValues = await fetchSignificantTermPValues(
client,
requestBody,
[fieldCandidate],
pValues = await fetchSignificantTermPValues({
esClient,
abortSignal,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
emitError: responseStream.pushError,
arguments: {
...requestBody,
fieldNames: [fieldCandidate],
sampleProbability: stateHandler.sampleProbability(),
},
});
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(
@@ -144,15 +146,17 @@
} else if (isTextFieldCandidate(payload)) {
const { textFieldCandidate } = payload;
const significantCategoriesForField = await fetchSignificantCategories(
client,
requestBody,
[textFieldCandidate],
const significantCategoriesForField = await fetchSignificantCategories({
esClient,
logger,
stateHandler.sampleProbability(),
responseStream.pushError,
abortSignal
);
emitError: responseStream.pushError,
abortSignal,
arguments: {
...requestBody,
fieldNames: [textFieldCandidate],
sampleProbability: stateHandler.sampleProbability(),
},
});
if (significantCategoriesForField.length > 0) {
significantCategories.push(...significantCategoriesForField);

View file

@@ -33,7 +33,7 @@ import type { ResponseStreamFetchOptions } from '../response_stream_factory';
export const topItemsHandlerFactory =
<T extends ApiVersion>({
abortSignal,
client,
esClient,
logDebugMessage,
logger,
requestBody,
@@ -64,7 +64,7 @@
if (textFieldCandidates.length > 0) {
topCategories.push(
...(await fetchTopCategories(
client,
esClient,
requestBody,
textFieldCandidates,
logger,
@@ -113,7 +113,7 @@
try {
fetchedTopTerms = await fetchTopTerms(
client,
esClient,
requestBody,
[fieldCandidate],
logger,

View file

@@ -39,7 +39,7 @@ import { streamPushPingWithTimeoutFactory } from './response_stream_utils/stream
*/
export interface ResponseStreamOptions<T extends ApiVersion> {
version: T;
client: ElasticsearchClient;
esClient: ElasticsearchClient;
requestBody: AiopsLogRateAnalysisSchema<T>;
events: KibanaRequestEvents;
headers: Headers;

View file

@@ -58,14 +58,14 @@ export function routeHandlerFactory<T extends ApiVersion>(
return response.forbidden();
}
const client = (await context.core).elasticsearch.client.asCurrentUser;
const esClient = (await context.core).elasticsearch.client.asCurrentUser;
const executionContext = createExecutionContext(coreStart, AIOPS_PLUGIN_ID, request.route.path);
return await coreStart.executionContext.withContext(executionContext, () => {
const { analysis, logDebugMessage, stateHandler, responseStream, responseWithHeaders } =
responseStreamFactory<T>({
version,
client,
esClient,
requestBody: request.body,
events: request.events,
headers: request.headers,

View file

@@ -217,14 +217,16 @@ export class DataVisualizer {
samplerShardSize: number,
runtimeMappings?: RuntimeMappings
): Promise<any> {
return await fetchHistogramsForFields(
this._asCurrentUser,
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings
);
return await fetchHistogramsForFields({
esClient: this._asCurrentUser,
arguments: {
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings,
},
});
}
// Obtains statistics for supplied list of fields. The statistics for each field in the

View file

@@ -23,14 +23,16 @@ export const routeHandler: RequestHandler<
try {
const esClient = (await ctx.core).elasticsearch.client;
const resp = await fetchHistogramsForFields(
esClient.asCurrentUser,
dataViewTitle,
query,
fields,
samplerShardSize,
runtimeMappings
);
const resp = await fetchHistogramsForFields({
esClient: esClient.asCurrentUser,
arguments: {
indexPattern: dataViewTitle,
query,
fields,
samplerShardSize,
runtimeMappings,
},
});
return res.ok({ body: resp });
} catch (e) {