mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[ML] Random sampler utils package (#154520)
- Refactors individual helpers to manage the `random_sampler` aggregation into a single reusable wrapper. The helper's factory can be supplied with either a sample probability right away or a document count to generate a dynamic sample probability based on it. - Applies random sampling to the main date histogram chart.
This commit is contained in:
parent
7daa791c32
commit
239a981a10
38 changed files with 423 additions and 271 deletions
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
|
@ -459,6 +459,7 @@ x-pack/packages/ml/local_storage @elastic/ml-ui
|
|||
x-pack/packages/ml/nested_property @elastic/ml-ui
|
||||
x-pack/plugins/ml @elastic/ml-ui
|
||||
x-pack/packages/ml/query_utils @elastic/ml-ui
|
||||
x-pack/packages/ml/random_sampler_utils @elastic/ml-ui
|
||||
x-pack/packages/ml/route_utils @elastic/ml-ui
|
||||
x-pack/packages/ml/string_hash @elastic/ml-ui
|
||||
x-pack/packages/ml/trained_models_utils @elastic/ml-ui
|
||||
|
|
|
@ -474,6 +474,7 @@
|
|||
"@kbn/ml-nested-property": "link:x-pack/packages/ml/nested_property",
|
||||
"@kbn/ml-plugin": "link:x-pack/plugins/ml",
|
||||
"@kbn/ml-query-utils": "link:x-pack/packages/ml/query_utils",
|
||||
"@kbn/ml-random-sampler-utils": "link:x-pack/packages/ml/random_sampler_utils",
|
||||
"@kbn/ml-route-utils": "link:x-pack/packages/ml/route_utils",
|
||||
"@kbn/ml-string-hash": "link:x-pack/packages/ml/string_hash",
|
||||
"@kbn/ml-trained-models-utils": "link:x-pack/packages/ml/trained_models_utils",
|
||||
|
|
|
@ -912,6 +912,8 @@
|
|||
"@kbn/ml-plugin/*": ["x-pack/plugins/ml/*"],
|
||||
"@kbn/ml-query-utils": ["x-pack/packages/ml/query_utils"],
|
||||
"@kbn/ml-query-utils/*": ["x-pack/packages/ml/query_utils/*"],
|
||||
"@kbn/ml-random-sampler-utils": ["x-pack/packages/ml/random_sampler_utils"],
|
||||
"@kbn/ml-random-sampler-utils/*": ["x-pack/packages/ml/random_sampler_utils/*"],
|
||||
"@kbn/ml-route-utils": ["x-pack/packages/ml/route_utils"],
|
||||
"@kbn/ml-route-utils/*": ["x-pack/packages/ml/route_utils/*"],
|
||||
"@kbn/ml-string-hash": ["x-pack/packages/ml/string_hash"],
|
||||
|
|
|
@ -5,11 +5,9 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
export { RANDOM_SAMPLER_SEED } from './src/constants';
|
||||
export { buildSamplerAggregation } from './src/build_sampler_aggregation';
|
||||
export { fetchAggIntervals } from './src/fetch_agg_intervals';
|
||||
export { fetchHistogramsForFields } from './src/fetch_histograms_for_fields';
|
||||
export { getSampleProbability } from './src/get_sample_probability';
|
||||
export { getSamplerAggregationsResponsePath } from './src/get_sampler_aggregations_response_path';
|
||||
export { numberValidator } from './src/validate_number';
|
||||
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { buildRandomSamplerAggregation } from './build_random_sampler_aggregation';
|
||||
|
||||
describe('buildRandomSamplerAggregation', () => {
|
||||
const testAggs = {
|
||||
bytes_stats: {
|
||||
stats: { field: 'bytes' },
|
||||
},
|
||||
};
|
||||
|
||||
test('returns wrapped random sampler aggregation for probability of 0.01', () => {
|
||||
expect(buildRandomSamplerAggregation(testAggs, 0.01)).toEqual({
|
||||
sample: {
|
||||
random_sampler: {
|
||||
probability: 0.01,
|
||||
seed: 3867412,
|
||||
},
|
||||
aggs: testAggs,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('returns un-sampled aggregation as-is for probability of 1', () => {
|
||||
expect(buildRandomSamplerAggregation(testAggs, 1)).toEqual(testAggs);
|
||||
});
|
||||
});
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
|
||||
import { RANDOM_SAMPLER_SEED } from './constants';
|
||||
|
||||
/**
|
||||
* Wraps the supplied aggregations in a random sampler aggregation.
|
||||
* A supplied sample probability of 1 indicates no sampling, and the aggs are returned as-is.
|
||||
*/
|
||||
export function buildRandomSamplerAggregation(
|
||||
aggs: any,
|
||||
sampleProbability: number
|
||||
): Record<string, estypes.AggregationsAggregationContainer> {
|
||||
if (sampleProbability === 1) {
|
||||
return aggs;
|
||||
}
|
||||
|
||||
return {
|
||||
sample: {
|
||||
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
|
||||
random_sampler: {
|
||||
probability: sampleProbability,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
},
|
||||
aggs,
|
||||
},
|
||||
};
|
||||
}
|
|
@ -14,16 +14,16 @@ import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
*/
|
||||
export function buildSamplerAggregation(
|
||||
aggs: any,
|
||||
samplerShardSize: number
|
||||
shardSize: number
|
||||
): Record<string, estypes.AggregationsAggregationContainer> {
|
||||
if (samplerShardSize < 1) {
|
||||
if (shardSize <= 0) {
|
||||
return aggs;
|
||||
}
|
||||
|
||||
return {
|
||||
sample: {
|
||||
sampler: {
|
||||
shard_size: samplerShardSize,
|
||||
shard_size: shardSize,
|
||||
},
|
||||
aggs,
|
||||
},
|
||||
|
|
|
@ -13,10 +13,9 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
|||
import { KBN_FIELD_TYPES } from '@kbn/field-types';
|
||||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import { stringHash } from '@kbn/ml-string-hash';
|
||||
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { buildRandomSamplerAggregation } from './build_random_sampler_aggregation';
|
||||
import { buildSamplerAggregation } from './build_sampler_aggregation';
|
||||
import { getRandomSamplerAggregationsResponsePath } from './get_random_sampler_aggregations_response_path';
|
||||
import { getSamplerAggregationsResponsePath } from './get_sampler_aggregations_response_path';
|
||||
import type { HistogramField, NumericColumnStatsMap } from './types';
|
||||
|
||||
|
@ -33,7 +32,8 @@ export const fetchAggIntervals = async (
|
|||
samplerShardSize: number,
|
||||
runtimeMappings?: estypes.MappingRuntimeFields,
|
||||
abortSignal?: AbortSignal,
|
||||
randomSamplerProbability?: number
|
||||
randomSamplerProbability?: number,
|
||||
randomSamplerSeed?: number
|
||||
): Promise<NumericColumnStatsMap> => {
|
||||
if (
|
||||
samplerShardSize >= 1 &&
|
||||
|
@ -61,6 +61,11 @@ export const fetchAggIntervals = async (
|
|||
return aggs;
|
||||
}, {} as Record<string, object>);
|
||||
|
||||
const { wrap, unwrap } = createRandomSamplerWrapper({
|
||||
probability: randomSamplerProbability ?? 1,
|
||||
seed: randomSamplerSeed,
|
||||
});
|
||||
|
||||
const body = await client.search(
|
||||
{
|
||||
index: indexPattern,
|
||||
|
@ -70,7 +75,7 @@ export const fetchAggIntervals = async (
|
|||
aggs:
|
||||
randomSamplerProbability === undefined
|
||||
? buildSamplerAggregation(minMaxAggs, samplerShardSize)
|
||||
: buildRandomSamplerAggregation(minMaxAggs, randomSamplerProbability),
|
||||
: wrap(minMaxAggs),
|
||||
size: 0,
|
||||
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
|
||||
},
|
||||
|
@ -81,10 +86,19 @@ export const fetchAggIntervals = async (
|
|||
const aggsPath =
|
||||
randomSamplerProbability === undefined
|
||||
? getSamplerAggregationsResponsePath(samplerShardSize)
|
||||
: getRandomSamplerAggregationsResponsePath(randomSamplerProbability);
|
||||
const aggregations = aggsPath.length > 0 ? get(body.aggregations, aggsPath) : body.aggregations;
|
||||
: [];
|
||||
const aggregations =
|
||||
aggsPath.length > 0
|
||||
? get(body.aggregations, aggsPath)
|
||||
: randomSamplerProbability !== undefined && body.aggregations !== undefined
|
||||
? unwrap(body.aggregations)
|
||||
: body.aggregations;
|
||||
|
||||
return Object.keys(aggregations).reduce((p, aggName) => {
|
||||
if (aggregations === undefined) {
|
||||
return p;
|
||||
}
|
||||
|
||||
const stats = [aggregations[aggName].min, aggregations[aggName].max];
|
||||
if (!stats.includes(null)) {
|
||||
const delta = aggregations[aggName].max - aggregations[aggName].min;
|
||||
|
|
|
@ -13,11 +13,10 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
|||
import { KBN_FIELD_TYPES } from '@kbn/field-types';
|
||||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import { stringHash } from '@kbn/ml-string-hash';
|
||||
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { buildRandomSamplerAggregation } from './build_random_sampler_aggregation';
|
||||
import { buildSamplerAggregation } from './build_sampler_aggregation';
|
||||
import { fetchAggIntervals } from './fetch_agg_intervals';
|
||||
import { getRandomSamplerAggregationsResponsePath } from './get_random_sampler_aggregations_response_path';
|
||||
import { getSamplerAggregationsResponsePath } from './get_sampler_aggregations_response_path';
|
||||
import type {
|
||||
AggCardinality,
|
||||
|
@ -141,6 +140,7 @@ export type FieldsForHistograms = Array<
|
|||
* @param samplerShardSize shard_size parameter of the sampler aggregation
|
||||
* @param runtimeMappings optional runtime mappings
|
||||
* @param randomSamplerProbability optional random sampler probability
|
||||
* @param randomSamplerSeed optional random sampler seed
|
||||
* @returns an array of histogram data for each supplied field
|
||||
*/
|
||||
export const fetchHistogramsForFields = async (
|
||||
|
@ -151,7 +151,8 @@ export const fetchHistogramsForFields = async (
|
|||
samplerShardSize: number,
|
||||
runtimeMappings?: estypes.MappingRuntimeFields,
|
||||
abortSignal?: AbortSignal,
|
||||
randomSamplerProbability?: number
|
||||
randomSamplerProbability?: number,
|
||||
randomSamplerSeed?: number
|
||||
) => {
|
||||
if (
|
||||
samplerShardSize >= 1 &&
|
||||
|
@ -170,7 +171,8 @@ export const fetchHistogramsForFields = async (
|
|||
samplerShardSize,
|
||||
runtimeMappings,
|
||||
abortSignal,
|
||||
randomSamplerProbability
|
||||
randomSamplerProbability,
|
||||
randomSamplerSeed
|
||||
)),
|
||||
...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
|
||||
const { interval, min, max, fieldName } = field;
|
||||
|
@ -213,6 +215,11 @@ export const fetchHistogramsForFields = async (
|
|||
return [];
|
||||
}
|
||||
|
||||
const { wrap, unwrap } = createRandomSamplerWrapper({
|
||||
probability: randomSamplerProbability ?? 1,
|
||||
seed: randomSamplerSeed,
|
||||
});
|
||||
|
||||
const body = await client.search(
|
||||
{
|
||||
index: indexPattern,
|
||||
|
@ -222,7 +229,7 @@ export const fetchHistogramsForFields = async (
|
|||
aggs:
|
||||
randomSamplerProbability === undefined
|
||||
? buildSamplerAggregation(chartDataAggs, samplerShardSize)
|
||||
: buildRandomSamplerAggregation(chartDataAggs, randomSamplerProbability),
|
||||
: wrap(chartDataAggs),
|
||||
size: 0,
|
||||
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
|
||||
},
|
||||
|
@ -233,8 +240,13 @@ export const fetchHistogramsForFields = async (
|
|||
const aggsPath =
|
||||
randomSamplerProbability === undefined
|
||||
? getSamplerAggregationsResponsePath(samplerShardSize)
|
||||
: getRandomSamplerAggregationsResponsePath(randomSamplerProbability);
|
||||
const aggregations = aggsPath.length > 0 ? get(body.aggregations, aggsPath) : body.aggregations;
|
||||
: [];
|
||||
const aggregations =
|
||||
aggsPath.length > 0
|
||||
? get(body.aggregations, aggsPath)
|
||||
: randomSamplerProbability !== undefined && body.aggregations !== undefined
|
||||
? unwrap(body.aggregations)
|
||||
: body.aggregations;
|
||||
|
||||
return fields.map((field) => {
|
||||
const id = stringHash(field.fieldName);
|
||||
|
|
|
@ -1,18 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { getRandomSamplerAggregationsResponsePath } from './get_random_sampler_aggregations_response_path';
|
||||
|
||||
describe('getRandomSamplerAggregationsResponsePath', () => {
|
||||
test('returns correct path for random sampler probability of 0.01', () => {
|
||||
expect(getRandomSamplerAggregationsResponsePath(0.01)).toEqual(['sample']);
|
||||
});
|
||||
|
||||
test('returns correct path for random sampler probability of 1', () => {
|
||||
expect(getRandomSamplerAggregationsResponsePath(1)).toEqual([]);
|
||||
});
|
||||
});
|
|
@ -1,17 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
// Returns the path of aggregations in the elasticsearch response, as an array,
|
||||
// depending on whether random sampling is being used.
|
||||
// A supplied randomSamplerProbability
|
||||
// (the probability parameter of the random sampler aggregation)
|
||||
// of 1 indicates no random sampling, and an empty array is returned.
|
||||
export function getRandomSamplerAggregationsResponsePath(
|
||||
randomSamplerProbability: number
|
||||
): string[] {
|
||||
return randomSamplerProbability < 1 ? ['sample'] : [];
|
||||
}
|
|
@ -16,7 +16,8 @@
|
|||
"@kbn/field-types",
|
||||
"@kbn/ml-is-populated-object",
|
||||
"@kbn/ml-string-hash",
|
||||
"@kbn/data-views-plugin"
|
||||
"@kbn/data-views-plugin",
|
||||
"@kbn/ml-random-sampler-utils"
|
||||
],
|
||||
"exclude": [
|
||||
"target/**/*",
|
||||
|
|
5
x-pack/packages/ml/random_sampler_utils/README.md
Normal file
5
x-pack/packages/ml/random_sampler_utils/README.md
Normal file
|
@ -0,0 +1,5 @@
|
|||
# @kbn/ml-random-sampler-utils
|
||||
|
||||
This package includes utility functions related to creating elasticsearch aggregations with random sampling.
|
||||
|
||||
https://docs.elastic.dev/kibana-dev-docs/api/kbn-ml-random-sampling-utils
|
|
@ -5,6 +5,8 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
// For the technical preview of Explain Log Rate Spikes we use a hard coded seed.
|
||||
// In future versions we might use a user specific seed or let the user costumise it.
|
||||
export const RANDOM_SAMPLER_SEED = 3867412;
|
||||
export { getSampleProbability } from './src/get_sample_probability';
|
||||
export {
|
||||
createRandomSamplerWrapper,
|
||||
type RandomSamplerWrapper,
|
||||
} from './src/random_sampler_wrapper';
|
12
x-pack/packages/ml/random_sampler_utils/jest.config.js
Normal file
12
x-pack/packages/ml/random_sampler_utils/jest.config.js
Normal file
|
@ -0,0 +1,12 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
module.exports = {
|
||||
preset: '@kbn/test',
|
||||
rootDir: '../../../..',
|
||||
roots: ['<rootDir>/x-pack/packages/ml/random_sampler_utils'],
|
||||
};
|
5
x-pack/packages/ml/random_sampler_utils/kibana.jsonc
Normal file
5
x-pack/packages/ml/random_sampler_utils/kibana.jsonc
Normal file
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"type": "shared-common",
|
||||
"id": "@kbn/ml-random-sampler-utils",
|
||||
"owner": "@elastic/ml-ui"
|
||||
}
|
6
x-pack/packages/ml/random_sampler_utils/package.json
Normal file
6
x-pack/packages/ml/random_sampler_utils/package.json
Normal file
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"name": "@kbn/ml-random-sampler-utils",
|
||||
"private": true,
|
||||
"version": "1.0.0",
|
||||
"license": "Elastic License 2.0"
|
||||
}
|
|
@ -22,10 +22,10 @@ describe('getSampleProbability', () => {
|
|||
expect(getSampleProbability(100000)).toEqual(0.5);
|
||||
});
|
||||
test('returns sample probability based on total docs ratio', () => {
|
||||
expect(getSampleProbability(100001)).toEqual(0.4999950000499995);
|
||||
expect(getSampleProbability(100001)).toEqual(0.5);
|
||||
expect(getSampleProbability(1000000)).toEqual(0.05);
|
||||
expect(getSampleProbability(1000001)).toEqual(0.04999995000005);
|
||||
expect(getSampleProbability(2000000)).toEqual(0.025);
|
||||
expect(getSampleProbability(1000001)).toEqual(0.05);
|
||||
expect(getSampleProbability(2000000)).toEqual(0.03);
|
||||
expect(getSampleProbability(5000000)).toEqual(0.01);
|
||||
expect(getSampleProbability(10000000)).toEqual(0.005);
|
||||
expect(getSampleProbability(100000000)).toEqual(0.0005);
|
|
@ -7,6 +7,16 @@
|
|||
|
||||
const SAMPLE_PROBABILITY_MIN_DOC_COUNT = 50000;
|
||||
|
||||
// Trims the sample probability to the first non-zero digit.
|
||||
function trimSampleProbability(d: number): number {
|
||||
return +d.toFixed(Math.max(-Math.log10(d) + 1, 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a dynamic sample probability to be used with the `random_sampler` aggregation.
|
||||
* @param {number} totalDocCount The total document count to derive the sample probability from.
|
||||
* @returns {number} sample probability
|
||||
*/
|
||||
export function getSampleProbability(totalDocCount: number) {
|
||||
let sampleProbability = 1;
|
||||
|
||||
|
@ -14,5 +24,5 @@ export function getSampleProbability(totalDocCount: number) {
|
|||
sampleProbability = Math.min(0.5, SAMPLE_PROBABILITY_MIN_DOC_COUNT / totalDocCount);
|
||||
}
|
||||
|
||||
return sampleProbability;
|
||||
return trimSampleProbability(sampleProbability);
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { createRandomSamplerWrapper } from './random_sampler_wrapper';
|
||||
|
||||
describe('createRandomSamplerWrapper', () => {
|
||||
const testAggs = {
|
||||
bytes_stats: {
|
||||
stats: { field: 'bytes' },
|
||||
},
|
||||
};
|
||||
|
||||
const wrappedTestAggs = {
|
||||
sample: {
|
||||
random_sampler: {
|
||||
probability: 0.01,
|
||||
},
|
||||
aggs: testAggs,
|
||||
},
|
||||
};
|
||||
|
||||
it('returns the un-sampled aggregation as-is for a probability of 1', () => {
|
||||
expect(createRandomSamplerWrapper({ probability: 1 }).wrap(testAggs)).toEqual(testAggs);
|
||||
});
|
||||
|
||||
it('returns wrapped random sampler aggregation for probability of 0.01', () => {
|
||||
expect(createRandomSamplerWrapper({ probability: 0.01 }).wrap(testAggs)).toEqual(
|
||||
wrappedTestAggs
|
||||
);
|
||||
});
|
||||
|
||||
it('returns probability of 1 and does not wrap when used for 10 docs', () => {
|
||||
const randomSamplerWrapper = createRandomSamplerWrapper({ totalNumDocs: 10 });
|
||||
expect(randomSamplerWrapper.probability).toBe(1);
|
||||
expect(randomSamplerWrapper.wrap(testAggs)).toEqual(testAggs);
|
||||
});
|
||||
|
||||
it('returns probability of 0.01 and does not wrap when used for 5000000 docs', () => {
|
||||
const randomSamplerWrapper = createRandomSamplerWrapper({ totalNumDocs: 5000000 });
|
||||
expect(randomSamplerWrapper.probability).toBe(0.01);
|
||||
expect(randomSamplerWrapper.wrap(testAggs)).toEqual(wrappedTestAggs);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { get } from 'lodash';
|
||||
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import { getSampleProbability } from './get_sample_probability';
|
||||
|
||||
const DEFAULT_AGG_NAME = 'sample';
|
||||
|
||||
interface RandomSamplerOptionsBase {
|
||||
aggName?: string;
|
||||
seed?: number;
|
||||
}
|
||||
|
||||
interface RandomSamplerOptionProbability extends RandomSamplerOptionsBase {
|
||||
probability: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Type guard for RandomSamplerOptionProbability
|
||||
*/
|
||||
function isRandomSamplerOptionProbability(arg: unknown): arg is RandomSamplerOptionProbability {
|
||||
return isPopulatedObject(arg, ['probability']);
|
||||
}
|
||||
|
||||
interface RandomSamplerOptionTotalNumDocs extends RandomSamplerOptionsBase {
|
||||
totalNumDocs: number;
|
||||
}
|
||||
|
||||
type RandomSamplerOptions = RandomSamplerOptionProbability | RandomSamplerOptionTotalNumDocs;
|
||||
|
||||
/**
|
||||
* Check if a given probability is suitable for the `random_sampler` aggregation.
|
||||
* @param {unknown} p The probability to be tested.
|
||||
* @returns {boolean}
|
||||
*/
|
||||
export function isValidProbability(p: unknown): p is number {
|
||||
return typeof p === 'number' && p > 0 && p < 0.5;
|
||||
}
|
||||
|
||||
/**
|
||||
* The return type of the `createRandomSamplerWrapper` factory.
|
||||
*/
|
||||
export type RandomSamplerWrapper = ReturnType<typeof createRandomSamplerWrapper>;
|
||||
|
||||
/**
|
||||
* Factory to create the random sampler wrapper utility.
|
||||
* @param {RandomSamplerOptions} options RandomSamplerOptions
|
||||
* @returns {RandomSamplerWrapper} random sampler wrapper utility
|
||||
*/
|
||||
export const createRandomSamplerWrapper = (options: RandomSamplerOptions) => {
|
||||
const probability = isRandomSamplerOptionProbability(options)
|
||||
? options.probability
|
||||
: getSampleProbability(options.totalNumDocs);
|
||||
|
||||
const aggName = options.aggName ?? DEFAULT_AGG_NAME;
|
||||
|
||||
const wrap = <T extends Record<string, estypes.AggregationsAggregationContainer>>(
|
||||
aggs: T
|
||||
): T | Record<string, estypes.AggregationsAggregationContainer> => {
|
||||
if (!isValidProbability(probability)) {
|
||||
return aggs;
|
||||
}
|
||||
|
||||
return {
|
||||
[aggName]: {
|
||||
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
|
||||
random_sampler: {
|
||||
probability,
|
||||
...(options.seed ? { seed: options.seed } : {}),
|
||||
},
|
||||
aggs,
|
||||
},
|
||||
} as Record<string, estypes.AggregationsAggregationContainer>;
|
||||
};
|
||||
|
||||
const unwrap = <T extends Exclude<estypes.SearchResponse['aggregations'], undefined>>(
|
||||
responseAggs: T
|
||||
) => (!isValidProbability(probability) ? responseAggs : get(responseAggs, [aggName]));
|
||||
|
||||
return { wrap, unwrap, probability };
|
||||
};
|
21
x-pack/packages/ml/random_sampler_utils/tsconfig.json
Normal file
21
x-pack/packages/ml/random_sampler_utils/tsconfig.json
Normal file
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"extends": "../../../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "target/types",
|
||||
"types": [
|
||||
"jest",
|
||||
"node",
|
||||
"react"
|
||||
]
|
||||
},
|
||||
"include": [
|
||||
"**/*.ts",
|
||||
"**/*.tsx",
|
||||
],
|
||||
"exclude": [
|
||||
"target/**/*"
|
||||
],
|
||||
"kbn_references": [
|
||||
"@kbn/ml-is-populated-object",
|
||||
]
|
||||
}
|
|
@ -33,6 +33,8 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
|
|||
significantTerms: schema.maybe(schema.arrayOf(schema.any())),
|
||||
})
|
||||
),
|
||||
/** Probability used for the random sampler aggregations */
|
||||
sampleProbability: schema.maybe(schema.number()),
|
||||
});
|
||||
|
||||
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
|
||||
|
|
|
@ -6,3 +6,7 @@
|
|||
*/
|
||||
|
||||
export const SPIKE_ANALYSIS_THRESHOLD = 0.02;
|
||||
|
||||
// For the technical preview of Explain Log Rate Spikes we use a hard coded seed.
|
||||
// In future versions we might use a user specific seed or let the user costumise it.
|
||||
export const RANDOM_SAMPLER_SEED = 3867412;
|
||||
|
|
|
@ -31,6 +31,7 @@ export interface DocumentCountContentProps {
|
|||
documentCountStatsSplit?: DocumentCountStats;
|
||||
documentCountStatsSplitLabel?: string;
|
||||
totalCount: number;
|
||||
sampleProbability: number;
|
||||
windowParameters?: WindowParameters;
|
||||
}
|
||||
|
||||
|
@ -41,6 +42,7 @@ export const DocumentCountContent: FC<DocumentCountContentProps> = ({
|
|||
documentCountStatsSplit,
|
||||
documentCountStatsSplitLabel = '',
|
||||
totalCount,
|
||||
sampleProbability,
|
||||
windowParameters,
|
||||
}) => {
|
||||
const [isBrushCleared, setIsBrushCleared] = useState(true);
|
||||
|
@ -62,7 +64,9 @@ export const DocumentCountContent: FC<DocumentCountContentProps> = ({
|
|||
timeRangeEarliest === undefined ||
|
||||
timeRangeLatest === undefined
|
||||
) {
|
||||
return totalCount !== undefined ? <TotalCountHeader totalCount={totalCount} /> : null;
|
||||
return totalCount !== undefined ? (
|
||||
<TotalCountHeader totalCount={totalCount} sampleProbability={sampleProbability} />
|
||||
) : null;
|
||||
}
|
||||
|
||||
const chartPoints: DocumentCountChartPoint[] = Object.entries(documentCountStats.buckets).map(
|
||||
|
@ -98,7 +102,7 @@ export const DocumentCountContent: FC<DocumentCountContentProps> = ({
|
|||
<>
|
||||
<EuiFlexGroup gutterSize="xs">
|
||||
<EuiFlexItem>
|
||||
<TotalCountHeader totalCount={totalCount} />
|
||||
<TotalCountHeader totalCount={totalCount} sampleProbability={sampleProbability} />
|
||||
</EuiFlexItem>
|
||||
{!isBrushCleared && (
|
||||
<EuiFlexItem grow={false}>
|
||||
|
|
|
@ -12,7 +12,13 @@ import { EuiFlexItem, EuiText } from '@elastic/eui';
|
|||
import { FormattedMessage } from '@kbn/i18n-react';
|
||||
import React from 'react';
|
||||
|
||||
export const TotalCountHeader = ({ totalCount }: { totalCount: number }) => (
|
||||
export const TotalCountHeader = ({
|
||||
sampleProbability,
|
||||
totalCount,
|
||||
}: {
|
||||
sampleProbability?: number;
|
||||
totalCount: number;
|
||||
}) => (
|
||||
<EuiFlexItem>
|
||||
<EuiText size="s" data-test-subj="aiopsTotalDocCountHeader">
|
||||
<FormattedMessage
|
||||
|
@ -30,6 +36,26 @@ export const TotalCountHeader = ({ totalCount }: { totalCount: number }) => (
|
|||
),
|
||||
}}
|
||||
/>
|
||||
{sampleProbability !== undefined && sampleProbability < 1 && (
|
||||
<>
|
||||
{' '}
|
||||
<FormattedMessage
|
||||
id="xpack.aiops.searchPanel.sampleProbabilityLabel"
|
||||
defaultMessage="Sampling probability: {strongSamplingProbability}"
|
||||
values={{
|
||||
strongSamplingProbability: (
|
||||
<strong data-test-subj="aiopsSamplingProbability">
|
||||
<FormattedMessage
|
||||
id="xpack.aiops.searchPanel.sampleProbabilityNumber"
|
||||
defaultMessage="{sampleProbability, plural, one {#} other {#}}"
|
||||
values={{ sampleProbability }}
|
||||
/>
|
||||
</strong>
|
||||
),
|
||||
}}
|
||||
/>
|
||||
</>
|
||||
)}
|
||||
</EuiText>
|
||||
</EuiFlexItem>
|
||||
);
|
||||
|
|
|
@ -65,6 +65,8 @@ interface ExplainLogRateSpikesAnalysisProps {
|
|||
windowParameters: WindowParameters;
|
||||
/** The search query to be applied to the analysis as a filter */
|
||||
searchQuery: Query['query'];
|
||||
/** Sample probability to be applied to random sampler aggregations */
|
||||
sampleProbability: number;
|
||||
}
|
||||
|
||||
export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps> = ({
|
||||
|
@ -73,6 +75,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
|
|||
latest,
|
||||
windowParameters,
|
||||
searchQuery,
|
||||
sampleProbability,
|
||||
}) => {
|
||||
const { http } = useAiopsAppContext();
|
||||
const basePath = http.basePath.get() ?? '';
|
||||
|
@ -114,6 +117,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
|
|||
flushFix: true,
|
||||
...windowParameters,
|
||||
overrides,
|
||||
sampleProbability,
|
||||
},
|
||||
{ reducer: streamReducer, initialState }
|
||||
);
|
||||
|
|
|
@ -118,7 +118,8 @@ export const ExplainLogRateSpikesPage: FC = () => {
|
|||
currentSelectedGroup
|
||||
);
|
||||
|
||||
const { totalCount, documentCountStats, documentCountStatsCompare } = documentStats;
|
||||
const { sampleProbability, totalCount, documentCountStats, documentCountStatsCompare } =
|
||||
documentStats;
|
||||
|
||||
useEffect(
|
||||
// TODO: Consolidate this hook/function with with Data visualizer's
|
||||
|
@ -198,6 +199,7 @@ export const ExplainLogRateSpikesPage: FC = () => {
|
|||
currentSelectedGroup
|
||||
)}
|
||||
totalCount={totalCount}
|
||||
sampleProbability={sampleProbability}
|
||||
windowParameters={windowParameters}
|
||||
/>
|
||||
</EuiPanel>
|
||||
|
@ -212,6 +214,7 @@ export const ExplainLogRateSpikesPage: FC = () => {
|
|||
latest={latest}
|
||||
windowParameters={windowParameters}
|
||||
searchQuery={searchQuery}
|
||||
sampleProbability={sampleProbability}
|
||||
/>
|
||||
)}
|
||||
{windowParameters === undefined && (
|
||||
|
|
|
@ -12,6 +12,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import type { SignificantTerm } from '@kbn/ml-agg-utils';
|
||||
import type { Query } from '@kbn/es-query';
|
||||
import type { RandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { buildExtendedBaseFilterCriteria } from './application/utils/build_extended_base_filter_criteria';
|
||||
import { GroupTableItem } from './components/spike_analysis_table/types';
|
||||
|
@ -36,9 +37,14 @@ export interface DocumentStatsSearchStrategyParams {
|
|||
selectedSignificantTerm?: SignificantTerm;
|
||||
includeSelectedSignificantTerm?: boolean;
|
||||
selectedGroup?: GroupTableItem | null;
|
||||
trackTotalHits?: boolean;
|
||||
}
|
||||
|
||||
export const getDocumentCountStatsRequest = (params: DocumentStatsSearchStrategyParams) => {
|
||||
export const getDocumentCountStatsRequest = (
|
||||
params: DocumentStatsSearchStrategyParams,
|
||||
randomSamplerWrapper?: RandomSamplerWrapper,
|
||||
skipAggs = false
|
||||
) => {
|
||||
const {
|
||||
index,
|
||||
timeFieldName,
|
||||
|
@ -51,6 +57,7 @@ export const getDocumentCountStatsRequest = (params: DocumentStatsSearchStrategy
|
|||
selectedSignificantTerm,
|
||||
includeSelectedSignificantTerm,
|
||||
selectedGroup,
|
||||
trackTotalHits,
|
||||
} = params;
|
||||
|
||||
const size = 0;
|
||||
|
@ -64,33 +71,41 @@ export const getDocumentCountStatsRequest = (params: DocumentStatsSearchStrategy
|
|||
selectedGroup
|
||||
);
|
||||
|
||||
// Don't use the sampler aggregation as this can lead to some potentially
|
||||
// confusing date histogram results depending on the date range of data amongst shards.
|
||||
const aggs = {
|
||||
const rawAggs: Record<string, estypes.AggregationsAggregationContainer> = {
|
||||
eventRate: {
|
||||
date_histogram: {
|
||||
field: timeFieldName,
|
||||
fixed_interval: `${intervalMs}ms`,
|
||||
min_doc_count: 0,
|
||||
extended_bounds: {
|
||||
min: earliestMs,
|
||||
max: latestMs,
|
||||
},
|
||||
...(earliestMs !== undefined && latestMs !== undefined
|
||||
? {
|
||||
extended_bounds: {
|
||||
min: earliestMs,
|
||||
max: latestMs,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const aggs = randomSamplerWrapper ? randomSamplerWrapper.wrap(rawAggs) : rawAggs;
|
||||
|
||||
const searchBody = {
|
||||
query: {
|
||||
bool: {
|
||||
filter: filterCriteria,
|
||||
},
|
||||
},
|
||||
...(!fieldsToFetch && timeFieldName !== undefined && intervalMs !== undefined && intervalMs > 0
|
||||
...(!fieldsToFetch &&
|
||||
!skipAggs &&
|
||||
timeFieldName !== undefined &&
|
||||
intervalMs !== undefined &&
|
||||
intervalMs > 0
|
||||
? { aggs }
|
||||
: {}),
|
||||
...(isPopulatedObject(runtimeFieldMap) ? { runtime_mappings: runtimeFieldMap } : {}),
|
||||
track_total_hits: true,
|
||||
track_total_hits: trackTotalHits === true,
|
||||
size,
|
||||
};
|
||||
return {
|
||||
|
@ -101,7 +116,8 @@ export const getDocumentCountStatsRequest = (params: DocumentStatsSearchStrategy
|
|||
|
||||
export const processDocumentCountStats = (
|
||||
body: estypes.SearchResponse | undefined,
|
||||
params: DocumentStatsSearchStrategyParams
|
||||
params: DocumentStatsSearchStrategyParams,
|
||||
randomSamplerWrapper?: RandomSamplerWrapper
|
||||
): DocumentCountStats | undefined => {
|
||||
if (!body) return undefined;
|
||||
|
||||
|
@ -118,8 +134,10 @@ export const processDocumentCountStats = (
|
|||
}
|
||||
const buckets: { [key: string]: number } = {};
|
||||
const dataByTimeBucket: Array<{ key: string; doc_count: number }> = get(
|
||||
body,
|
||||
['aggregations', 'eventRate', 'buckets'],
|
||||
randomSamplerWrapper && body.aggregations !== undefined
|
||||
? randomSamplerWrapper.unwrap(body.aggregations)
|
||||
: body.aggregations,
|
||||
['eventRate', 'buckets'],
|
||||
[]
|
||||
);
|
||||
each(dataByTimeBucket, (dataForTime) => {
|
||||
|
|
|
@ -11,6 +11,9 @@ import { lastValueFrom } from 'rxjs';
|
|||
import { i18n } from '@kbn/i18n';
|
||||
import type { ToastsStart } from '@kbn/core/public';
|
||||
import { stringHash } from '@kbn/ml-string-hash';
|
||||
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { RANDOM_SAMPLER_SEED } from '../../common/constants';
|
||||
|
||||
import { extractErrorProperties } from '../application/utils/error_utils';
|
||||
import {
|
||||
|
@ -23,6 +26,7 @@ import {
|
|||
import { useAiopsAppContext } from './use_aiops_app_context';
|
||||
|
||||
export interface DocumentStats {
|
||||
sampleProbability: number;
|
||||
totalCount: number;
|
||||
documentCountStats?: DocumentCountStats;
|
||||
documentCountStatsCompare?: DocumentCountStats;
|
||||
|
@ -67,6 +71,7 @@ export function useDocumentCountStats<TParams extends DocumentStatsSearchStrateg
|
|||
const abortCtrl = useRef(new AbortController());
|
||||
|
||||
const [documentStats, setDocumentStats] = useState<DocumentStats>({
|
||||
sampleProbability: 1,
|
||||
totalCount: 0,
|
||||
});
|
||||
|
||||
|
@ -87,19 +92,48 @@ export function useDocumentCountStats<TParams extends DocumentStatsSearchStrateg
|
|||
try {
|
||||
abortCtrl.current = new AbortController();
|
||||
|
||||
const totalHitsParams = {
|
||||
...searchParams,
|
||||
selectedSignificantTerm: undefined,
|
||||
trackTotalHits: true,
|
||||
};
|
||||
|
||||
const totalHitsResp = await lastValueFrom(
|
||||
data.search.search(
|
||||
{
|
||||
params: getDocumentCountStatsRequest(totalHitsParams, undefined, true),
|
||||
},
|
||||
{ abortSignal: abortCtrl.current.signal }
|
||||
)
|
||||
);
|
||||
const totalHitsStats = processDocumentCountStats(totalHitsResp?.rawResponse, searchParams);
|
||||
const totalCount = totalHitsStats?.totalCount ?? 0;
|
||||
|
||||
const randomSamplerWrapper = createRandomSamplerWrapper({
|
||||
totalNumDocs: totalCount,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
});
|
||||
|
||||
const resp = await lastValueFrom(
|
||||
data.search.search(
|
||||
{
|
||||
params: getDocumentCountStatsRequest(searchParams),
|
||||
params: getDocumentCountStatsRequest(
|
||||
{ ...searchParams, trackTotalHits: false },
|
||||
randomSamplerWrapper
|
||||
),
|
||||
},
|
||||
{ abortSignal: abortCtrl.current.signal }
|
||||
)
|
||||
);
|
||||
|
||||
const documentCountStats = processDocumentCountStats(resp?.rawResponse, searchParams);
|
||||
const totalCount = documentCountStats?.totalCount ?? 0;
|
||||
const documentCountStats = processDocumentCountStats(
|
||||
resp?.rawResponse,
|
||||
searchParams,
|
||||
randomSamplerWrapper
|
||||
);
|
||||
|
||||
const newStats: DocumentStats = {
|
||||
sampleProbability: randomSamplerWrapper.probability,
|
||||
documentCountStats,
|
||||
totalCount,
|
||||
};
|
||||
|
@ -108,7 +142,10 @@ export function useDocumentCountStats<TParams extends DocumentStatsSearchStrateg
|
|||
const respCompare = await lastValueFrom(
|
||||
data.search.search(
|
||||
{
|
||||
params: getDocumentCountStatsRequest(searchParamsCompare),
|
||||
params: getDocumentCountStatsRequest(
|
||||
{ ...searchParamsCompare, trackTotalHits: false },
|
||||
randomSamplerWrapper
|
||||
),
|
||||
},
|
||||
{ abortSignal: abortCtrl.current.signal }
|
||||
)
|
||||
|
@ -116,12 +153,12 @@ export function useDocumentCountStats<TParams extends DocumentStatsSearchStrateg
|
|||
|
||||
const documentCountStatsCompare = processDocumentCountStats(
|
||||
respCompare?.rawResponse,
|
||||
searchParamsCompare
|
||||
searchParamsCompare,
|
||||
randomSamplerWrapper
|
||||
);
|
||||
const totalCountCompare = documentCountStatsCompare?.totalCount ?? 0;
|
||||
|
||||
newStats.documentCountStatsCompare = documentCountStatsCompare;
|
||||
newStats.totalCount = totalCount + totalCountCompare;
|
||||
newStats.totalCount = totalCount;
|
||||
}
|
||||
|
||||
setDocumentStats(newStats);
|
||||
|
|
|
@ -24,6 +24,7 @@ import type {
|
|||
import { fetchHistogramsForFields } from '@kbn/ml-agg-utils';
|
||||
import { createExecutionContext } from '@kbn/ml-route-utils';
|
||||
|
||||
import { RANDOM_SAMPLER_SEED } from '../../common/constants';
|
||||
import {
|
||||
addSignificantTermsAction,
|
||||
addSignificantTermsGroupAction,
|
||||
|
@ -93,6 +94,7 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
logDebugMessage('Starting analysis.');
|
||||
|
||||
const groupingEnabled = !!request.body.grouping;
|
||||
const sampleProbability = request.body.sampleProbability ?? 1;
|
||||
|
||||
const controller = new AbortController();
|
||||
const abortSignal = controller.signal;
|
||||
|
@ -189,7 +191,6 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
[];
|
||||
let fieldCandidatesCount = fieldCandidates.length;
|
||||
|
||||
let sampleProbability = 1;
|
||||
let totalDocCount = 0;
|
||||
|
||||
if (!request.body.overrides?.remainingFieldCandidates) {
|
||||
|
@ -211,7 +212,6 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
const indexInfo = await fetchIndexInfo(client, request.body, abortSignal);
|
||||
fieldCandidates.push(...indexInfo.fieldCandidates);
|
||||
fieldCandidatesCount = fieldCandidates.length;
|
||||
sampleProbability = indexInfo.sampleProbability;
|
||||
totalDocCount = indexInfo.totalDocCount;
|
||||
} catch (e) {
|
||||
if (!isRequestAbortedError(e)) {
|
||||
|
@ -382,7 +382,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
-1,
|
||||
undefined,
|
||||
abortSignal,
|
||||
sampleProbability
|
||||
sampleProbability,
|
||||
RANDOM_SAMPLER_SEED
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
|
@ -511,7 +512,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
-1,
|
||||
undefined,
|
||||
abortSignal,
|
||||
sampleProbability
|
||||
sampleProbability,
|
||||
RANDOM_SAMPLER_SEED
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
|
@ -607,7 +609,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
-1,
|
||||
undefined,
|
||||
abortSignal,
|
||||
sampleProbability
|
||||
sampleProbability,
|
||||
RANDOM_SAMPLER_SEED
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
|
|
|
@ -12,9 +12,10 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import { type SignificantTerm, RANDOM_SAMPLER_SEED } from '@kbn/ml-agg-utils';
|
||||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import { type SignificantTerm } from '@kbn/ml-agg-utils';
|
||||
import { createRandomSamplerWrapper } from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { RANDOM_SAMPLER_SEED } from '../../../common/constants';
|
||||
import type { SignificantTermDuplicateGroup, ItemsetResult } from '../../../common/types';
|
||||
|
||||
interface FrequentItemSetsAggregation extends estypes.AggregationsSamplerAggregation {
|
||||
|
@ -23,14 +24,6 @@ interface FrequentItemSetsAggregation extends estypes.AggregationsSamplerAggrega
|
|||
};
|
||||
}
|
||||
|
||||
interface RandomSamplerAggregation {
|
||||
sample: FrequentItemSetsAggregation;
|
||||
}
|
||||
|
||||
function isRandomSamplerAggregation(arg: unknown): arg is RandomSamplerAggregation {
|
||||
return isPopulatedObject(arg, ['sample']);
|
||||
}
|
||||
|
||||
export function groupDuplicates(
|
||||
cps: SignificantTerm[],
|
||||
uniqueFields: Array<keyof SignificantTerm>
|
||||
|
@ -117,21 +110,14 @@ export async function fetchFrequentItemSets(
|
|||
},
|
||||
};
|
||||
|
||||
// frequent items can be slow, so sample and use 10% min_support
|
||||
const randomSamplerAgg: Record<string, estypes.AggregationsAggregationContainer> = {
|
||||
sample: {
|
||||
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
|
||||
random_sampler: {
|
||||
probability: sampleProbability,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
},
|
||||
aggs: frequentItemSetsAgg,
|
||||
},
|
||||
};
|
||||
const { wrap, unwrap } = createRandomSamplerWrapper({
|
||||
probability: sampleProbability,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
});
|
||||
|
||||
const esBody = {
|
||||
query,
|
||||
aggs: sampleProbability < 1 ? randomSamplerAgg : frequentItemSetsAgg,
|
||||
aggs: wrap(frequentItemSetsAgg),
|
||||
size: 0,
|
||||
track_total_hits: true,
|
||||
};
|
||||
|
@ -160,17 +146,17 @@ export async function fetchFrequentItemSets(
|
|||
|
||||
const totalDocCountFi = (body.hits.total as estypes.SearchTotalHits).value;
|
||||
|
||||
const frequentItemSets = isRandomSamplerAggregation(body.aggregations)
|
||||
? body.aggregations.sample.fi
|
||||
: body.aggregations.fi;
|
||||
const frequentItemSets = unwrap(
|
||||
body.aggregations as Record<string, estypes.AggregationsAggregate>
|
||||
) as FrequentItemSetsAggregation;
|
||||
|
||||
const shape = frequentItemSets.buckets.length;
|
||||
const shape = frequentItemSets.fi.buckets.length;
|
||||
let maximum = shape;
|
||||
if (maximum > 50000) {
|
||||
maximum = 50000;
|
||||
}
|
||||
|
||||
const fiss = frequentItemSets.buckets;
|
||||
const fiss = frequentItemSets.fi.buckets;
|
||||
fiss.length = maximum;
|
||||
|
||||
const results: ItemsetResult[] = [];
|
||||
|
|
|
@ -105,13 +105,9 @@ describe('fetch_index_info', () => {
|
|||
search: esClientSearchMock,
|
||||
} as unknown as ElasticsearchClient;
|
||||
|
||||
const { totalDocCount, sampleProbability, fieldCandidates } = await fetchIndexInfo(
|
||||
esClientMock,
|
||||
params
|
||||
);
|
||||
const { totalDocCount, fieldCandidates } = await fetchIndexInfo(esClientMock, params);
|
||||
|
||||
expect(fieldCandidates).toEqual(['myIpFieldName', 'myKeywordFieldName']);
|
||||
expect(sampleProbability).toEqual(0.01);
|
||||
expect(totalDocCount).toEqual(5000000);
|
||||
expect(esClientFieldCapsMock).toHaveBeenCalledTimes(1);
|
||||
expect(esClientSearchMock).toHaveBeenCalledTimes(1);
|
||||
|
|
|
@ -9,7 +9,6 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
|
||||
import { ES_FIELD_TYPES } from '@kbn/field-types';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { getSampleProbability } from '@kbn/ml-agg-utils';
|
||||
|
||||
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
|
||||
|
||||
|
@ -51,7 +50,7 @@ export const fetchIndexInfo = async (
|
|||
esClient: ElasticsearchClient,
|
||||
params: AiopsExplainLogRateSpikesSchema,
|
||||
abortSignal?: AbortSignal
|
||||
): Promise<{ fieldCandidates: string[]; sampleProbability: number; totalDocCount: number }> => {
|
||||
): Promise<{ fieldCandidates: string[]; totalDocCount: number }> => {
|
||||
const { index } = params;
|
||||
// Get all supported fields
|
||||
const respMapping = await esClient.fieldCaps(
|
||||
|
@ -96,7 +95,6 @@ export const fetchIndexInfo = async (
|
|||
});
|
||||
|
||||
const totalDocCount = (resp.hits.total as estypes.SearchTotalHits).value;
|
||||
const sampleProbability = getSampleProbability(totalDocCount);
|
||||
|
||||
return { fieldCandidates: [...finalFieldCandidates], sampleProbability, totalDocCount };
|
||||
return { fieldCandidates: [...finalFieldCandidates], totalDocCount };
|
||||
};
|
||||
|
|
|
@ -9,9 +9,13 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
import { ElasticsearchClient } from '@kbn/core/server';
|
||||
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import { type SignificantTerm, RANDOM_SAMPLER_SEED } from '@kbn/ml-agg-utils';
|
||||
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
||||
import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
|
||||
import { type SignificantTerm } from '@kbn/ml-agg-utils';
|
||||
import {
|
||||
createRandomSamplerWrapper,
|
||||
type RandomSamplerWrapper,
|
||||
} from '@kbn/ml-random-sampler-utils';
|
||||
|
||||
import { SPIKE_ANALYSIS_THRESHOLD, RANDOM_SAMPLER_SEED } from '../../../common/constants';
|
||||
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
|
||||
|
||||
import { isRequestAbortedError } from '../../lib/is_request_aborted_error';
|
||||
|
@ -25,8 +29,7 @@ import { getRequestBase } from './get_request_base';
|
|||
export const getSignificantTermRequest = (
|
||||
params: AiopsExplainLogRateSpikesSchema,
|
||||
fieldName: string,
|
||||
// The default value of 1 means no sampling will be used
|
||||
sampleProbability: number = 1
|
||||
{ wrap }: RandomSamplerWrapper
|
||||
): estypes.SearchRequest => {
|
||||
const query = getQueryWithParams({
|
||||
params,
|
||||
|
@ -53,7 +56,7 @@ export const getSignificantTermRequest = (
|
|||
];
|
||||
}
|
||||
|
||||
const pValueAgg: Record<string, estypes.AggregationsAggregationContainer> = {
|
||||
const pValueAgg: Record<'change_point_p_value', estypes.AggregationsAggregationContainer> = {
|
||||
change_point_p_value: {
|
||||
significant_terms: {
|
||||
field: fieldName,
|
||||
|
@ -80,21 +83,10 @@ export const getSignificantTermRequest = (
|
|||
},
|
||||
};
|
||||
|
||||
const randomSamplerAgg: Record<string, estypes.AggregationsAggregationContainer> = {
|
||||
sample: {
|
||||
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
|
||||
random_sampler: {
|
||||
probability: sampleProbability,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
},
|
||||
aggs: pValueAgg,
|
||||
},
|
||||
};
|
||||
|
||||
const body = {
|
||||
query,
|
||||
size: 0,
|
||||
aggs: sampleProbability < 1 ? randomSamplerAgg : pValueAgg,
|
||||
aggs: wrap(pValueAgg),
|
||||
};
|
||||
|
||||
return {
|
||||
|
@ -109,18 +101,6 @@ interface Aggs extends estypes.AggregationsSignificantLongTermsAggregate {
|
|||
buckets: estypes.AggregationsSignificantLongTermsBucket[];
|
||||
}
|
||||
|
||||
interface PValuesAggregation extends estypes.AggregationsSamplerAggregation {
|
||||
change_point_p_value: Aggs;
|
||||
}
|
||||
|
||||
interface RandomSamplerAggregation {
|
||||
sample: PValuesAggregation;
|
||||
}
|
||||
|
||||
function isRandomSamplerAggregation(arg: unknown): arg is RandomSamplerAggregation {
|
||||
return isPopulatedObject(arg, ['sample']);
|
||||
}
|
||||
|
||||
export const fetchSignificantTermPValues = async (
|
||||
esClient: ElasticsearchClient,
|
||||
params: AiopsExplainLogRateSpikesSchema,
|
||||
|
@ -131,14 +111,19 @@ export const fetchSignificantTermPValues = async (
|
|||
emitError: (m: string) => void,
|
||||
abortSignal?: AbortSignal
|
||||
): Promise<SignificantTerm[]> => {
|
||||
const randomSamplerWrapper = createRandomSamplerWrapper({
|
||||
probability: sampleProbability,
|
||||
seed: RANDOM_SAMPLER_SEED,
|
||||
});
|
||||
|
||||
const result: SignificantTerm[] = [];
|
||||
|
||||
const settledPromises = await Promise.allSettled(
|
||||
fieldNames.map((fieldName) =>
|
||||
esClient.search<unknown, { sample: PValuesAggregation } | { change_point_p_value: Aggs }>(
|
||||
getSignificantTermRequest(params, fieldName, sampleProbability),
|
||||
{ signal: abortSignal, maxRetries: 0 }
|
||||
)
|
||||
esClient.search(getSignificantTermRequest(params, fieldName, randomSamplerWrapper), {
|
||||
signal: abortSignal,
|
||||
maxRetries: 0,
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -172,9 +157,9 @@ export const fetchSignificantTermPValues = async (
|
|||
continue;
|
||||
}
|
||||
|
||||
const overallResult = isRandomSamplerAggregation(resp.aggregations)
|
||||
? resp.aggregations.sample.change_point_p_value
|
||||
: resp.aggregations.change_point_p_value;
|
||||
const overallResult = (
|
||||
randomSamplerWrapper.unwrap(resp.aggregations) as Record<'change_point_p_value', Aggs>
|
||||
).change_point_p_value;
|
||||
|
||||
for (const bucket of overallResult.buckets) {
|
||||
const pValue = Math.exp(-bucket.score);
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
"@kbn/ml-query-utils",
|
||||
"@kbn/ml-is-defined",
|
||||
"@kbn/ml-route-utils",
|
||||
"@kbn/ml-random-sampler-utils",
|
||||
],
|
||||
"exclude": [
|
||||
"target/**/*",
|
||||
|
|
|
@ -59,45 +59,3 @@ export function buildAggregationWithSamplingOption(
|
|||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the supplied aggregations in a random sampler aggregation.
|
||||
*/
|
||||
export function buildRandomSamplerAggregation(
|
||||
aggs: Aggs,
|
||||
probability: number | null,
|
||||
seed: number
|
||||
): Record<string, estypes.AggregationsAggregationContainer> {
|
||||
if (probability === null || probability <= 0 || probability > 1) {
|
||||
return aggs;
|
||||
}
|
||||
|
||||
return {
|
||||
sample: {
|
||||
aggs,
|
||||
// @ts-expect-error AggregationsAggregationContainer needs to be updated with random_sampler
|
||||
random_sampler: {
|
||||
probability,
|
||||
...(seed ? { seed } : {}),
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function buildSamplerAggregation(
|
||||
aggs: Aggs,
|
||||
shardSize: number
|
||||
): Record<string, estypes.AggregationsAggregationContainer> {
|
||||
if (shardSize <= 0) {
|
||||
return aggs;
|
||||
}
|
||||
|
||||
return {
|
||||
sample: {
|
||||
aggs,
|
||||
sampler: {
|
||||
shard_size: shardSize,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
@ -4561,6 +4561,10 @@
|
|||
version "0.0.0"
|
||||
uid ""
|
||||
|
||||
"@kbn/ml-random-sampler-utils@link:x-pack/packages/ml/random_sampler_utils":
|
||||
version "0.0.0"
|
||||
uid ""
|
||||
|
||||
"@kbn/ml-route-utils@link:x-pack/packages/ml/route_utils":
|
||||
version "0.0.0"
|
||||
uid ""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue