[ML] Explain Log Rate Spikes: Group results API. (#140683)

Extends the `/internal/aiops/explain_log_rate_spikes` endpoint with an option that extends the analysis to summarize significant field/value pairs into groups using the `frequent_items` aggregation.
Walter Rafelsberger 2022-09-14 18:26:15 +02:00 committed by GitHub
parent 0fbe9e9837
commit 5ba23e432d
12 changed files with 524 additions and 6 deletions
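
For orientation, a minimal sketch of a request body that opts into the new grouping behavior; the index name, query, and epoch timestamps are illustrative, and the endpoint streams its results back as actions rather than returning a single JSON document:

```ts
// Illustrative request body for POST /internal/aiops/explain_log_rate_spikes;
// all values below are made up for the example.
const body = {
  index: 'my-logs-*',
  searchQuery: JSON.stringify({ match_all: {} }),
  timeFieldName: '@timestamp',
  grouping: true, // new: opt in to frequent_items based grouping
  baselineMin: 1654579200000,
  baselineMax: 1654665600000,
  deviationMin: 1654752000000,
  deviationMax: 1654838400000,
};
```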


@@ -15,6 +15,7 @@ export type { FieldsForHistograms } from './src/fetch_histograms_for_fields';
export type {
AggCardinality,
ChangePoint,
ChangePointGroup,
ChangePointHistogram,
ChangePointHistogramItem,
HistogramField,


@@ -62,6 +62,8 @@ export interface HistogramField {
export interface ChangePoint extends FieldValuePair {
doc_count: number;
bg_count: number;
total_doc_count: number;
total_bg_count: number;
score: number;
pValue: number | null;
normalizedScore: number;
@@ -84,3 +86,15 @@ export interface ChangePointHistogramItem {
export interface ChangePointHistogram extends FieldValuePair {
histogram: ChangePointHistogramItem[];
}
interface ChangePointGroupItem extends FieldValuePair {
duplicate?: boolean;
}
/**
* Tree leaves: a group of field/value pairs and its document count.
*/
export interface ChangePointGroup {
group: ChangePointGroupItem[];
docCount: number;
}
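
As an illustrative example (field names, values, and counts invented), a `ChangePointGroup` pairs the co-occurring field/value combinations of one itemset with its document count:

```ts
// Illustrative only; all values are made up.
const group: ChangePointGroup = {
  group: [
    { fieldName: 'response_code', fieldValue: '500' },
    { fieldName: 'url', fieldValue: 'home.php', duplicate: true },
  ],
  docCount: 792,
};
```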


@@ -5,11 +5,12 @@
* 2.0.
*/
import type { ChangePoint, ChangePointHistogram } from '@kbn/ml-agg-utils';
import type { ChangePoint, ChangePointHistogram, ChangePointGroup } from '@kbn/ml-agg-utils';
export const API_ACTION_NAME = {
ADD_CHANGE_POINTS: 'add_change_points',
ADD_CHANGE_POINTS_HISTOGRAM: 'add_change_points_histogram',
ADD_CHANGE_POINTS_GROUP: 'add_change_point_group',
ADD_ERROR: 'add_error',
RESET: 'reset',
UPDATE_LOADING_STATE: 'update_loading_state',
@@ -44,6 +45,18 @@ export function addChangePointsHistogramAction(
};
}
interface ApiActionAddChangePointsGroup {
type: typeof API_ACTION_NAME.ADD_CHANGE_POINTS_GROUP;
payload: ChangePointGroup[];
}
export function addChangePointsGroupAction(payload: ApiActionAddChangePointsGroup['payload']) {
return {
type: API_ACTION_NAME.ADD_CHANGE_POINTS_GROUP,
payload,
};
}
interface ApiActionAddError {
type: typeof API_ACTION_NAME.ADD_ERROR;
payload: string;
@@ -84,6 +97,7 @@ export function updateLoadingStateAction(
export type AiopsExplainLogRateSpikesApiAction =
| ApiActionAddChangePoints
| ApiActionAddChangePointsGroup
| ApiActionAddChangePointsHistogram
| ApiActionAddError
| ApiActionReset
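
Since the endpoint streams its results, the new group action would reach the client as one NDJSON line; a hedged sketch of the parsed action (payload values invented):

```ts
// Parsed from one streamed NDJSON line; values are illustrative.
const action: AiopsExplainLogRateSpikesApiAction = {
  type: API_ACTION_NAME.ADD_CHANGE_POINTS_GROUP,
  payload: [{ group: [{ fieldName: 'response_code', fieldValue: '500' }], docCount: 792 }],
};
```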


@@ -7,6 +7,7 @@
export {
addChangePointsAction,
addChangePointsGroupAction,
addChangePointsHistogramAction,
addErrorAction,
resetAction,


@@ -13,6 +13,7 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
searchQuery: schema.string(),
timeFieldName: schema.string(),
includeFrozen: schema.maybe(schema.boolean()),
grouping: schema.maybe(schema.boolean()),
/** Analysis selection time ranges */
baselineMin: schema.number(),
baselineMax: schema.number(),


@@ -24,6 +24,7 @@ describe('streamReducer', () => {
loaded: 50,
loadingState: 'Loaded 50%',
changePoints: [],
changePointsGroups: [],
errors: [],
});
});
@@ -37,6 +38,8 @@ describe('streamReducer', () => {
fieldValue: 'the-field-value',
doc_count: 10,
bg_count: 100,
total_doc_count: 1000,
total_bg_count: 10000,
score: 0.1,
pValue: 0.01,
normalizedScore: 0.123,


@@ -5,13 +5,14 @@
* 2.0.
*/
import type { ChangePoint } from '@kbn/ml-agg-utils';
import type { ChangePoint, ChangePointGroup } from '@kbn/ml-agg-utils';
import { API_ACTION_NAME, AiopsExplainLogRateSpikesApiAction } from './explain_log_rate_spikes';
interface StreamState {
ccsWarning: boolean;
changePoints: ChangePoint[];
changePointsGroups: ChangePointGroup[];
errors: string[];
loaded: number;
loadingState: string;
@@ -20,6 +21,7 @@ interface StreamState {
export const initialState: StreamState = {
ccsWarning: false,
changePoints: [],
changePointsGroups: [],
errors: [],
loaded: 0,
loadingState: '',
@@ -47,6 +49,8 @@ export function streamReducer(
return cp;
});
return { ...state, changePoints };
case API_ACTION_NAME.ADD_CHANGE_POINTS_GROUP:
return { ...state, changePointsGroups: action.payload };
case API_ACTION_NAME.ADD_ERROR:
return { ...state, errors: [...state.errors, action.payload] };
case API_ACTION_NAME.RESET:


@@ -19,6 +19,7 @@ import { fetchHistogramsForFields } from '@kbn/ml-agg-utils';
import {
addChangePointsAction,
addChangePointsGroupAction,
addChangePointsHistogramAction,
aiopsExplainLogRateSpikesSchema,
addErrorAction,
@@ -30,8 +31,14 @@ import { API_ENDPOINT } from '../../common/api';
import type { AiopsLicense } from '../types';
import { fetchFieldCandidates } from './queries/fetch_field_candidates';
import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
import { fetchFieldCandidates } from './queries/fetch_field_candidates';
import { fetchFrequentItems } from './queries/fetch_frequent_items';
import {
getSimpleHierarchicalTree,
getSimpleHierarchicalTreeLeaves,
markDuplicates,
} from './queries/get_simple_hierarchical_tree';
// Overall progress is a float from 0 to 1.
const LOADED_FIELD_CANDIDATES = 0.2;
@@ -55,6 +62,8 @@ export const defineExplainLogRateSpikesRoute = (
return response.forbidden();
}
const groupingEnabled = !!request.body.grouping;
const client = (await context.core).elasticsearch.client.asCurrentUser;
const controller = new AbortController();
@@ -198,6 +207,33 @@ export const defineExplainLogRateSpikesRoute = (
return;
}
if (groupingEnabled) {
const { fields, df } = await fetchFrequentItems(
client,
request.body.index,
changePoints,
request.body.timeFieldName,
request.body.deviationMin,
request.body.deviationMax
);
// Filter itemsets by significant change point field value pairs
const filteredDf = df.filter((fi) => {
const { set: currentItems } = fi;
return Object.entries(currentItems).every(([key, value]) => {
return changePoints.some((cp) => {
return cp.fieldName === key && cp.fieldValue === value;
});
});
});
const { root } = getSimpleHierarchicalTree(filteredDf, true, false, fields);
const changePointsGroups = getSimpleHierarchicalTreeLeaves(root, []);
push(addChangePointsGroupAction(markDuplicates(changePointsGroups)));
}
const histogramFields: [NumericHistogramField] = [
{ fieldName: request.body.timeFieldName, type: KBN_FIELD_TYPES.DATE },
];


@@ -121,7 +121,9 @@ export const fetchChangePointPValues = async (
fieldName,
fieldValue: String(bucket.key),
doc_count: bucket.doc_count,
bg_count: bucket.doc_count,
bg_count: bucket.bg_count,
total_doc_count: overallResult.doc_count,
total_bg_count: overallResult.bg_count,
score: bucket.score,
pValue,
normalizedScore,


@@ -0,0 +1,175 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { uniq, uniqWith, pick, isEqual } from 'lodash';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { ChangePoint } from '@kbn/ml-agg-utils';
interface FrequentItemsAggregation extends estypes.AggregationsSamplerAggregation {
fi: {
buckets: Array<{ key: Record<string, string[]>; doc_count: number; support: number }>;
};
}
function dropDuplicates(cp: ChangePoint[], uniqueFields: string[]) {
return uniqWith(cp, (a, b) => isEqual(pick(a, uniqueFields), pick(b, uniqueFields)));
}
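// Example (values illustrative): two change points that share identical
// doc_count/bg_count/total_doc_count/total_bg_count almost certainly describe
// the same set of documents, so only the first of the two is kept:
//   dropDuplicates([cpA, cpB], ['doc_count', 'bg_count', 'total_doc_count', 'total_bg_count'])
//   // -> [cpA] when cpA and cpB share all four counts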
export async function fetchFrequentItems(
client: ElasticsearchClient,
index: string,
changePoints: ChangePoint[],
timeFieldName: string,
deviationMin: number,
deviationMax: number
) {
// first remove duplicates in sig terms - note this is not strictly perfect as there could
// be coincidentally equal counts, but in general it's ok...
const terms = dropDuplicates(changePoints, [
'doc_count',
'bg_count',
'total_doc_count',
'total_bg_count',
]);
// get unique fields that are left
const fields = [...new Set(terms.map((t) => t.fieldName))];
// TODO add query params
const query = {
bool: {
filter: [
{
range: {
[timeFieldName]: {
gte: deviationMin,
lt: deviationMax,
},
},
},
],
should: terms.map((t) => {
return { term: { [t.fieldName]: t.fieldValue } };
}),
},
};
const aggFields = fields.map((field) => ({
field,
}));
const totalDocCount = terms[0].total_doc_count;
const minDocCount = 50000;
let sampleProbability = 1;
if (totalDocCount > minDocCount) {
sampleProbability = Math.min(0.5, minDocCount / totalDocCount);
}
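// e.g. with totalDocCount = 5,000,000 this yields
// Math.min(0.5, 50000 / 5000000) = 0.01, i.e. roughly a 1% random sample.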
// frequent items can be slow, so sample and use 10% min_support
const aggs: Record<string, estypes.AggregationsAggregationContainer> = {
sample: {
random_sampler: {
probability: sampleProbability,
},
aggs: {
fi: {
// @ts-expect-error `frequent_items` is not yet part of `AggregationsAggregationContainer`
frequent_items: {
size: 200,
minimum_support: 0.1,
fields: aggFields,
},
},
},
},
};
const body = await client.search<unknown, { sample: FrequentItemsAggregation }>(
{
index,
size: 0,
body: {
query,
aggs,
size: 0,
track_total_hits: true,
},
},
{ maxRetries: 0 }
);
const totalDocCountFi = (body.hits.total as estypes.SearchTotalHits).value;
if (body.aggregations === undefined) {
throw new Error('fetchFrequentItems failed, did not return aggregations.');
}
const shape = body.aggregations.sample.fi.buckets.length;
let maximum = shape;
if (maximum > 50000) {
maximum = 50000;
}
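// truncate the buckets array in place; assigning to .length drops any
// itemsets beyond `maximum`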
const fiss = body.aggregations.sample.fi.buckets;
fiss.length = maximum;
const results: ItemsetResult[] = [];
fiss.forEach((fis) => {
const result: ItemsetResult = {
set: {},
size: 0,
maxPValue: 0,
doc_count: 0,
support: 0,
total_doc_count: 0,
};
let maxPValue: number | undefined;
Object.entries(fis.key).forEach(([key, value]) => {
result.set[key] = value[0];
const pValue = changePoints.find(
(t) => t.fieldName === key && t.fieldValue === value[0]
)?.pValue;
if (pValue !== undefined && pValue !== null) {
maxPValue = Math.max(maxPValue ?? 0, pValue);
}
});
if (maxPValue === undefined) {
return;
}
result.size = Object.keys(result.set).length;
result.maxPValue = maxPValue;
result.doc_count = fis.doc_count;
result.support = fis.support;
result.total_doc_count = totalDocCountFi;
results.push(result);
});
return {
fields: uniq(results.flatMap((r) => Object.keys(r.set))),
df: results,
totalDocCount: totalDocCountFi,
};
}
export interface ItemsetResult {
set: Record<string, string>;
size: number;
maxPValue: number;
doc_count: number;
support: number;
total_doc_count: number;
}
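
For reference, a single itemset produced by this function might look like the following (all values invented); `set` maps each field name to the value it takes within the deviation time range:

```ts
// Illustrative ItemsetResult; values are made up.
const itemset: ItemsetResult = {
  set: { response_code: '500', url: 'home.php' },
  size: 2,
  maxPValue: 0.01,
  doc_count: 792,
  support: 0.65,
  total_doc_count: 1220,
};
```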


@@ -0,0 +1,267 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { max } from 'd3-array';
// import { omit, uniq } from 'lodash';
import type { ChangePointGroup, FieldValuePair } from '@kbn/ml-agg-utils';
import type { ItemsetResult } from './fetch_frequent_items';
function getValueCounts(df: ItemsetResult[], field: string) {
return df.reduce((p, c) => {
if (c.set[field] === undefined) {
return p;
}
p[c.set[field]] = p[c.set[field]] ? p[c.set[field]] + 1 : 1;
return p;
}, {} as Record<string, number>);
}
function getValuesDescending(df: ItemsetResult[], field: string): string[] {
const valueCounts = getValueCounts(df, field);
const keys = Object.keys(valueCounts);
return keys.sort((a, b) => {
return valueCounts[b] - valueCounts[a];
});
}
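// Example (illustrative): if 'response_code' is '500' in three itemsets and
// '404' in one, getValuesDescending(df, 'response_code') returns ['500', '404'].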
interface NewNode {
name: string;
set: FieldValuePair[];
docCount: number;
children: NewNode[];
icon: string;
iconStyle: string;
addNode: (node: NewNode) => void;
}
function NewNodeFactory(name: string): NewNode {
const children: NewNode[] = [];
const addNode = (node: NewNode) => {
children.push(node);
};
return {
name,
set: [],
docCount: 0,
children,
icon: 'default',
iconStyle: 'default',
addNode,
};
}
/**
* Simple (poorly implemented) function that constructs a tree from an itemset DataFrame sorted by support (count).
* The resulting tree components are non-overlapping subsets of the data.
* In summary, we start with the most inclusive itemset (highest count), and perform a depth first search in field order.
*
* TODO - the code style here is hacky and should be re-written
*
* @param fields
* @param displayParent
* @param parentDocCount
* @param parentLabel
* @param field
* @param value
* @param iss
* @param collapseRedundant
* @param displayOther
* @returns
*/
function dfDepthFirstSearch(
fields: string[],
displayParent: NewNode,
parentDocCount: number,
parentLabel: string,
field: string,
value: string,
iss: ItemsetResult[],
collapseRedundant: boolean,
displayOther: boolean
) {
const filteredItemSets = iss.filter((is) => {
for (const [key, values] of Object.entries(is.set)) {
if (key === field && values.includes(value)) {
return true;
}
}
return false;
});
if (filteredItemSets.length === 0) {
return 0;
}
const docCount = max(filteredItemSets.map((fis) => fis.doc_count)) ?? 0;
const totalDocCount = max(filteredItemSets.map((fis) => fis.total_doc_count)) ?? 0;
let label = `${parentLabel} ${value}`;
let displayNode: NewNode;
if (parentDocCount === docCount && collapseRedundant) {
// collapse identical paths
displayParent.name += ` ${value}`;
displayParent.set.push({ fieldName: field, fieldValue: value });
displayParent.docCount = docCount;
displayNode = displayParent;
} else {
displayNode = NewNodeFactory(`${docCount}/${totalDocCount}${label}`);
displayNode.iconStyle = 'warning';
displayNode.set = [...displayParent.set];
displayNode.set.push({ fieldName: field, fieldValue: value });
displayNode.docCount = docCount;
displayParent.addNode(displayNode);
}
let nextField: string;
while (true) {
const nextFieldIndex = fields.indexOf(field) + 1;
if (nextFieldIndex >= fields.length) {
displayNode.icon = 'file';
displayNode.iconStyle = 'info';
return docCount;
}
nextField = fields[nextFieldIndex];
// TODO - add handling of creating * as next level of tree
if (Object.keys(getValueCounts(filteredItemSets, nextField)).length > 0) {
break;
} else {
field = nextField;
if (collapseRedundant) {
// add dummy node label
displayNode.name += ` '*'`;
label += ` '*'`;
const nextDisplayNode = NewNodeFactory(`${docCount}/${totalDocCount}${label}`);
nextDisplayNode.iconStyle = 'warning';
nextDisplayNode.set = displayNode.set;
nextDisplayNode.docCount = docCount;
displayNode.addNode(nextDisplayNode);
displayNode = nextDisplayNode;
}
}
}
let subCount = 0;
for (const nextValue of getValuesDescending(filteredItemSets, nextField)) {
subCount += dfDepthFirstSearch(
fields,
displayNode,
docCount,
label,
nextField,
nextValue,
filteredItemSets,
collapseRedundant,
displayOther
);
}
if (displayOther) {
if (subCount < docCount) {
displayNode.addNode(
NewNodeFactory(`${docCount - subCount}/${totalDocCount}${parentLabel} '${value}' 'OTHER'`)
);
}
}
return docCount;
}
/**
* Create simple tree consisting of non-overlapping sets of data.
*
* By default (when `fields` is empty), the field search order is dependent on the highest count itemsets.
*/
export function getSimpleHierarchicalTree(
df: ItemsetResult[],
collapseRedundant: boolean,
displayOther: boolean,
fields: string[] = []
) {
// const candidates = uniq(
// df.flatMap((d) =>
// Object.keys(omit(d, ['size', 'maxPValue', 'doc_count', 'support', 'total_doc_count']))
// )
// );
const field = fields[0];
const totalDocCount = max(df.map((d) => d.total_doc_count)) ?? 0;
const newRoot = NewNodeFactory('');
for (const value of getValuesDescending(df, field)) {
dfDepthFirstSearch(
fields,
newRoot,
totalDocCount + 1,
'',
field,
value,
df,
collapseRedundant,
displayOther
);
}
return { root: newRoot, fields };
}
/**
* Get leaves from hierarchical tree.
*/
export function getSimpleHierarchicalTreeLeaves(
tree: NewNode,
leaves: ChangePointGroup[],
level = 1
) {
// console.log(`${'-'.repeat(level)} ${tree.name} ${tree.children.length}`);
if (tree.children.length === 0) {
leaves.push({ group: tree.set, docCount: tree.docCount });
} else {
for (const child of tree.children) {
const newLeaves = getSimpleHierarchicalTreeLeaves(child, [], level + 1);
if (newLeaves.length > 0) {
leaves.push(...newLeaves);
}
}
}
return leaves;
}
/**
* Analyse duplicate field/value pairs in change point groups.
*/
export function markDuplicates(cpgs: ChangePointGroup[]): ChangePointGroup[] {
const fieldValuePairCounts: Record<string, number> = {};
cpgs.forEach((cpg) => {
cpg.group.forEach((g) => {
const str = `${g.fieldName}$$$$${g.fieldValue}`;
fieldValuePairCounts[str] = fieldValuePairCounts[str] ? fieldValuePairCounts[str] + 1 : 1;
});
});
return cpgs.map((cpg) => {
return {
...cpg,
group: cpg.group.map((g) => {
const str = `${g.fieldName}$$$$${g.fieldValue}`;
return {
...g,
duplicate: fieldValuePairCounts[str] > 1,
};
}),
};
});
}
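
Putting the helpers together, a hedged sketch of the grouping pipeline as the route handler wires it up, written as if used from a sibling module; the single itemset is invented for illustration:

```ts
import type { ItemsetResult } from './fetch_frequent_items';
import {
  getSimpleHierarchicalTree,
  getSimpleHierarchicalTreeLeaves,
  markDuplicates,
} from './get_simple_hierarchical_tree';

// One illustrative itemset as fetchFrequentItems might return it.
const df: ItemsetResult[] = [
  {
    set: { response_code: '500', url: 'home.php' },
    size: 2,
    maxPValue: 0.01,
    doc_count: 792,
    support: 0.65,
    total_doc_count: 1220,
  },
];
const fields = ['response_code', 'url'];

// Build the tree, collect its leaves, and flag repeated field/value pairs.
const { root } = getSimpleHierarchicalTree(df, true, false, fields);
const leaves = getSimpleHierarchicalTreeLeaves(root, []);
const groups = markDuplicates(leaves);
// -> [{ group: [{ fieldName: 'response_code', fieldValue: '500', duplicate: false },
//               { fieldName: 'url', fieldValue: 'home.php', duplicate: false }],
//      docCount: 792 }]
```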


@@ -46,7 +46,7 @@ export default ({ getService }: FtrProviderContext) => {
fieldName: 'day_of_week',
fieldValue: 'Wednesday',
doc_count: 145,
bg_count: 145,
bg_count: 142,
score: 36.31595998561873,
pValue: 1.6911377077437753e-16,
normalizedScore: 0.8055203624020835,
@@ -55,7 +55,7 @@ export default ({ getService }: FtrProviderContext) => {
fieldName: 'day_of_week',
fieldValue: 'Thursday',
doc_count: 157,
bg_count: 157,
bg_count: 224,
score: 20.366950718358762,
pValue: 1.428057484826135e-9,
normalizedScore: 0.7661649691018979,