[ML] Explain log rate spikes: Analysis API endpoint. (#135058)

Updates the API endpoint "/internal/aiops/explain_log_rate_spikes" to work with real data instead of the dummy example.

The analysis now includes identifying field candidates and statistically significant field value pairs. Because there is no UI in place yet to allow a user to select custom time ranges for baseline and deviation, for now we fetch a date histogram of the selected index pattern and identify the parameters to get passed as WindowParameters automatically by picking the part of the date histogram with the highest doc count.

This PR is more about getting the API endpoint into shape and updates integration tests too. The UI code should be considered temporary. At the moment it just outputs the returned analysis state as raw JSON in an EUI code block.
This commit is contained in:
Walter Rafelsberger 2022-06-28 13:26:09 +02:00 committed by GitHub
parent 2fd780b549
commit a0f69e0e43
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1077 additions and 111 deletions

View file

@ -37,7 +37,10 @@
"xpack.logstash": ["plugins/logstash"],
"xpack.main": "legacy/plugins/xpack_main",
"xpack.maps": ["plugins/maps"],
"xpack.aiops": ["plugins/aiops"],
"xpack.aiops": [
"packages/ml/aiops_utils",
"plugins/aiops"
],
"xpack.ml": ["plugins/ml"],
"xpack.monitoring": ["plugins/monitoring"],
"xpack.osquery": ["plugins/osquery"],

View file

@ -37,8 +37,10 @@ NPM_MODULE_EXTRA_FILES = [
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"//packages/kbn-logging",
"@npm//react"
"@npm//react",
"@npm//@elastic/eui",
"//packages/kbn-i18n-react",
"//packages/kbn-logging"
]
# In this array place dependencies necessary to build the types, which will include the
@ -51,10 +53,12 @@ RUNTIME_DEPS = [
#
# References to NPM packages work the same as RUNTIME_DEPS
TYPES_DEPS = [
"//packages/kbn-logging:npm_module_types",
"@npm//@types/node",
"@npm//@types/jest",
"@npm//@types/react"
"@npm//@types/react",
"@npm//@elastic/eui",
"//packages/kbn-i18n-react:npm_module_types",
"//packages/kbn-logging:npm_module_types"
]
jsts_transpiler(

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EuiButton, EuiFlexGroup, EuiFlexItem, EuiProgress, EuiText } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
import React from 'react';
interface ProgressControlProps {
progress: number;
progressMessage: string;
onRefresh: () => void;
onCancel: () => void;
isRunning: boolean;
}
export function ProgressControls({
progress,
progressMessage,
onRefresh,
onCancel,
isRunning,
}: ProgressControlProps) {
return (
<EuiFlexGroup>
<EuiFlexItem>
<EuiFlexGroup direction="column" gutterSize="none">
<EuiFlexItem data-test-subj="aiopProgressTitle">
<EuiText size="xs" color="subdued">
<FormattedMessage
data-test-subj="aiopsProgressTitleMessage"
id="xpack.aiops.progressTitle"
defaultMessage="Progress: {progress}% — {progressMessage}"
values={{ progress: Math.round(progress * 100), progressMessage }}
/>
</EuiText>
</EuiFlexItem>
<EuiFlexItem>
<EuiProgress
aria-label={i18n.translate('xpack.aiops.progressAriaLabel', {
defaultMessage: 'Progress',
})}
value={Math.round(progress * 100)}
max={100}
size="m"
/>
</EuiFlexItem>
</EuiFlexGroup>
</EuiFlexItem>
<EuiFlexItem grow={false}>
{!isRunning && (
<EuiButton size="s" onClick={onRefresh}>
<FormattedMessage id="xpack.aiops.refreshButtonTitle" defaultMessage="Refresh" />
</EuiButton>
)}
{isRunning && (
<EuiButton size="s" onClick={onCancel}>
<FormattedMessage id="xpack.aiops.cancelButtonTitle" defaultMessage="Cancel" />
</EuiButton>
)}
</EuiFlexItem>
</EuiFlexGroup>
);
}

View file

@ -5,6 +5,9 @@
* 2.0.
*/
export { ProgressControls } from './components/progress_controls';
export { getWindowParameters } from './lib/get_window_parameters';
export type { WindowParameters } from './lib/get_window_parameters';
export { streamFactory } from './lib/stream_factory';
export { useFetchStream } from './lib/use_fetch_stream';
export type {

View file

@ -5,9 +5,11 @@
* 2.0.
*/
// TODO: Replace these with kbn packaged versions once we have those available to us.
// At the moment imports from runtime plugins into packages are not supported.
// import type { Headers } from '@kbn/core/server';
/**
* TODO: Replace these with kbn packaged versions once we have those available to us.
* At the moment imports from runtime plugins into packages are not supported.
* import type { Headers } from '@kbn/core/server';
*/
type Headers = Record<string, string | string[] | undefined>;
function containsGzip(s: string) {

View file

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Time range definition for baseline and deviation to be used by spike log analysis.
 */
export interface WindowParameters {
  baselineMin: number;
  baselineMax: number;
  deviationMin: number;
  deviationMax: number;
}

/**
 * Given a point in time (e.g. where a user clicks), use simple heuristics to compute:
 *
 * 1. The time window around the click to evaluate for changes
 * 2. The historical time window prior to the click to use as a baseline.
 *
 * The philosophy here is that charts are displayed with different granularities according to their
 * overall time window. We select the change point and historical time windows inline with the
 * overall time window.
 *
 * The algorithm for doing this is based on the typical granularities that exist in machine data.
 *
 * @param clickTime timestamp of the clicked log rate spike.
 * @param minTime minimum timestamp of the time window to be analysed
 * @param maxTime maximum timestamp of the time window to be analysed
 * @returns WindowParameters
 */
export const getWindowParameters = (
  clickTime: number,
  minTime: number,
  maxTime: number
): WindowParameters => {
  const MS_PER_MINUTE = 60 * 1000;

  // Lower bounds so that tiny overall time ranges still produce
  // usable analysis windows.
  const minDeviationWindow = 10 * MS_PER_MINUTE;
  const minBaselineWindow = 30 * MS_PER_MINUTE;
  const minWindowGap = 5 * MS_PER_MINUTE;

  const totalWindow = maxTime - minTime;

  // The deviation window aims to be 1/10 of the total window, the
  // baseline window 3.5/10, with a 1/10 gap between them — each clamped
  // to the minimums above.
  const deviationWindow = Math.max(totalWindow / 10, minDeviationWindow);
  const baselineWindow = Math.max(totalWindow / 3.5, minBaselineWindow);
  const windowGap = Math.max(totalWindow / 10, minWindowGap);

  // Deviation window is centered on the click; baseline window sits
  // before it, separated by the gap.
  const deviationMin = clickTime - deviationWindow / 2;
  const deviationMax = clickTime + deviationWindow / 2;
  const baselineMax = deviationMin - windowGap;
  const baselineMin = baselineMax - baselineWindow;

  return {
    baselineMin: Math.round(baselineMin),
    baselineMax: Math.round(baselineMax),
    deviationMin: Math.round(deviationMin),
    deviationMax: Math.round(deviationMax),
  };
};

View file

@ -8,12 +8,13 @@
import { Stream } from 'stream';
import * as zlib from 'zlib';
// TODO: Replace these with kbn packaged versions once we have those available to us.
// At the moment imports from runtime plugins into packages are not supported.
// import type { Headers } from '@kbn/core/server';
import { acceptCompression } from './accept_compression';
/**
* TODO: Replace these with kbn packaged versions once we have those available to us.
* At the moment imports from runtime plugins into packages are not supported.
* import type { Headers } from '@kbn/core/server';
*/
type Headers = Record<string, string | string[] | undefined>;
// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error.

View file

@ -7,7 +7,19 @@
import { schema, TypeOf } from '@kbn/config-schema';
import type { ChangePoint } from '../types';
export const aiopsExplainLogRateSpikesSchema = schema.object({
start: schema.number(),
end: schema.number(),
kuery: schema.string(),
timeFieldName: schema.string(),
includeFrozen: schema.maybe(schema.boolean()),
/** Analysis selection time ranges */
baselineMin: schema.number(),
baselineMax: schema.number(),
deviationMin: schema.number(),
deviationMax: schema.number(),
/** The index to query for log rate spikes */
index: schema.string(),
});
@ -15,20 +27,43 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
export const API_ACTION_NAME = {
ADD_FIELDS: 'add_fields',
ADD_CHANGE_POINTS: 'add_change_points',
UPDATE_LOADING_STATE: 'update_loading_state',
} as const;
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
interface ApiActionAddFields {
type: typeof API_ACTION_NAME.ADD_FIELDS;
payload: string[];
interface ApiActionAddChangePoints {
type: typeof API_ACTION_NAME.ADD_CHANGE_POINTS;
payload: ChangePoint[];
}
export function addFieldsAction(payload: string[]): ApiActionAddFields {
export function addChangePoints(
payload: ApiActionAddChangePoints['payload']
): ApiActionAddChangePoints {
return {
type: API_ACTION_NAME.ADD_FIELDS,
type: API_ACTION_NAME.ADD_CHANGE_POINTS,
payload,
};
}
export type AiopsExplainLogRateSpikesApiAction = ApiActionAddFields;
interface ApiActionUpdateLoadingState {
type: typeof API_ACTION_NAME.UPDATE_LOADING_STATE;
payload: {
ccsWarning: boolean;
loaded: number;
loadingState: string;
};
}
export function updateLoadingStateAction(
payload: ApiActionUpdateLoadingState['payload']
): ApiActionUpdateLoadingState {
return {
type: API_ACTION_NAME.UPDATE_LOADING_STATE,
payload,
};
}
export type AiopsExplainLogRateSpikesApiAction =
| ApiActionAddChangePoints
| ApiActionUpdateLoadingState;

View file

@ -5,14 +5,22 @@
* 2.0.
*/
import type { ChangePoint } from '../types';
import { API_ACTION_NAME, AiopsExplainLogRateSpikesApiAction } from './explain_log_rate_spikes';
interface StreamState {
fields: string[];
ccsWarning: boolean;
changePoints: ChangePoint[];
loaded: number;
loadingState: string;
}
export const initialState: StreamState = {
fields: [],
ccsWarning: false,
changePoints: [],
loaded: 0,
loadingState: '',
};
export function streamReducer(
@ -24,10 +32,10 @@ export function streamReducer(
}
switch (action.type) {
case API_ACTION_NAME.ADD_FIELDS:
return {
fields: [...state.fields, ...action.payload],
};
case API_ACTION_NAME.ADD_CHANGE_POINTS:
return { ...state, changePoints: [...state.changePoints, ...action.payload] };
case API_ACTION_NAME.UPDATE_LOADING_STATE:
return { ...state, ...action.payload };
default:
return state;
}

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// Significance threshold: only field/value pairs whose p-value is below
// this value are reported as change points by the spike analysis.
export const SPIKE_ANALYSIS_THRESHOLD = 0.02;

View file

@ -19,4 +19,4 @@ export const PLUGIN_NAME = 'AIOps';
* This is an internal hard coded feature flag so we can easily turn on/off the
* "Explain log rate spikes UI" during development until the first release.
*/
export const AIOPS_ENABLED = false;
export const AIOPS_ENABLED = true;

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * A field name / field value pair.
 */
export interface FieldValuePair {
  fieldName: string;
  fieldValue: string;
  // NOTE(review): not set anywhere in the visible code — presumably flags
  // results that were added as a fallback; confirm semantics with consumers.
  isFallbackResult?: boolean;
}

/**
 * A statistically significant field/value pair identified by the log rate
 * spike analysis, including its counts and significance metrics.
 */
export interface ChangePoint extends FieldValuePair {
  // Foreground document count from the significant terms bucket.
  doc_count: number;
  // Background document count from the significant terms bucket.
  bg_count: number;
  // Raw significant terms score; the p-value is derived from it.
  score: number;
  pValue: number | null;
}

View file

@ -7,10 +7,11 @@
import React, { useEffect, FC } from 'react';
import { EuiBadge, EuiSpacer, EuiText } from '@elastic/eui';
import { EuiCodeBlock, EuiSpacer, EuiText } from '@elastic/eui';
import type { DataView } from '@kbn/data-views-plugin/public';
import { useFetchStream } from '@kbn/aiops-utils';
import { useFetchStream, ProgressControls } from '@kbn/aiops-utils';
import type { WindowParameters } from '@kbn/aiops-utils';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import { initialState, streamReducer } from '../../../common/api/stream_reducer';
@ -22,15 +23,35 @@ import type { ApiExplainLogRateSpikes } from '../../../common/api';
export interface ExplainLogRateSpikesProps {
/** The data view to analyze. */
dataView: DataView;
/** Window parameters for the analysis */
windowParameters: WindowParameters;
}
export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = ({ dataView }) => {
export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = ({
dataView,
windowParameters,
}) => {
const kibana = useKibana();
const basePath = kibana.services.http?.basePath.get() ?? '';
const { start, data, isRunning } = useFetchStream<ApiExplainLogRateSpikes, typeof basePath>(
const { cancel, start, data, isRunning } = useFetchStream<
ApiExplainLogRateSpikes,
typeof basePath
>(
`${basePath}/internal/aiops/explain_log_rate_spikes`,
{ index: dataView.title },
{
// TODO Consider actual user selected time ranges.
// Since we already receive window parameters here,
// we just set a maximum time range of 1970-2038 here.
start: 0,
end: 2147483647000,
// TODO Consider an optional Kuery.
kuery: '',
// TODO Handle data view without time fields.
timeFieldName: dataView.timeFieldName ?? '',
index: dataView.title,
...windowParameters,
},
{ reducer: streamReducer, initialState }
);
@ -42,11 +63,17 @@ export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = ({ dataView }
return (
<EuiText>
<h2>{dataView.title}</h2>
<p>{isRunning ? 'Loading fields ...' : 'Loaded all fields.'}</p>
<ProgressControls
progress={data.loaded}
progressMessage={data.loadingState ?? ''}
isRunning={isRunning}
onRefresh={start}
onCancel={cancel}
/>
<EuiSpacer size="xs" />
{data.fields.map((field) => (
<EuiBadge>{field}</EuiBadge>
))}
<EuiCodeBlock language="json" fontSize="s" paddingSize="s">
{JSON.stringify(data, null, 2)}
</EuiCodeBlock>
</EuiText>
);
};

View file

@ -5,18 +5,27 @@
* 2.0.
*/
import { firstValueFrom } from 'rxjs';
import { chunk } from 'lodash';
import type { IRouter, Logger } from '@kbn/core/server';
import type { DataRequestHandlerContext, IEsSearchRequest } from '@kbn/data-plugin/server';
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
import { streamFactory } from '@kbn/aiops-utils';
import {
addChangePoints,
aiopsExplainLogRateSpikesSchema,
addFieldsAction,
updateLoadingStateAction,
AiopsExplainLogRateSpikesApiAction,
} from '../../common/api/explain_log_rate_spikes';
import { API_ENDPOINT } from '../../common/api';
import type { ChangePoint } from '../../common/types';
import { fetchFieldCandidates } from './queries/fetch_field_candidates';
import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
// Overall progress is a float from 0 to 1.
const LOADED_FIELD_CANDIDATES = 0.2;
const PROGRESS_STEP_P_VALUES = 0.8;
export const defineExplainLogRateSpikesRoute = (
router: IRouter<DataRequestHandlerContext>,
@ -30,10 +39,11 @@ export const defineExplainLogRateSpikesRoute = (
},
},
async (context, request, response) => {
const index = request.body.index;
const client = (await context.core).elasticsearch.client.asCurrentUser;
const controller = new AbortController();
let loaded = 0;
let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
@ -44,47 +54,83 @@ export const defineExplainLogRateSpikesRoute = (
controller.abort();
});
const search = await context.search;
const res = await firstValueFrom(
search.search(
{
params: {
index,
body: { size: 1 },
},
} as IEsSearchRequest,
{ abortSignal: controller.signal }
)
);
const doc = res.rawResponse.hits.hits.pop();
const fields = Object.keys(doc?._source ?? {});
const { end, push, responseWithHeaders } = streamFactory<AiopsExplainLogRateSpikesApiAction>(
request.headers
);
async function pushField() {
setTimeout(() => {
// Async IIFE to run the analysis while not blocking returning `responseWithHeaders`.
(async () => {
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: 'Loading field candidates.',
})
);
const { fieldCandidates } = await fetchFieldCandidates(client, request.body);
if (fieldCandidates.length > 0) {
loaded += LOADED_FIELD_CANDIDATES;
} else {
loaded = 1;
}
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: `Identified ${fieldCandidates.length} field candidates.`,
})
);
if (shouldStop || fieldCandidates.length === 0) {
end();
return;
}
const changePoints: ChangePoint[] = [];
const fieldsToSample = new Set<string>();
const chunkSize = 10;
const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
for (const fieldCandidatesChunk of fieldCandidatesChunks) {
const { changePoints: pValues } = await fetchChangePointPValues(
client,
request.body,
fieldCandidatesChunk
);
if (pValues.length > 0) {
pValues.forEach((d) => {
fieldsToSample.add(d.fieldName);
});
changePoints.push(...pValues);
}
loaded += (1 / fieldCandidatesChunks.length) * PROGRESS_STEP_P_VALUES;
if (pValues.length > 0) {
push(addChangePoints(pValues));
}
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: `Identified ${
changePoints?.length ?? 0
} significant field/value pairs.`,
})
);
if (shouldStop) {
end();
return;
}
}
const field = fields.pop();
if (field !== undefined) {
push(addFieldsAction([field]));
pushField();
} else {
end();
}
// This is just exemplary demo code so we're adding a random timout of 0-250ms to each
// stream push to simulate string chunks appearing on the client with some randomness.
}, Math.random() * 250);
}
pushField();
end();
})();
return response.ok(responseWithHeaders);
}

View file

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { uniqBy } from 'lodash';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ElasticsearchClient } from '@kbn/core/server';
import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
import { ChangePoint } from '../../../common/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
/**
 * Builds the search request that runs a `significant_terms` aggregation
 * for one field: the foreground query is restricted to the deviation time
 * range, while the background filter uses the baseline time range.
 *
 * @param params request parameters including baseline/deviation time ranges.
 * @param fieldName the field to aggregate significant terms on.
 * @returns the ES search request.
 */
export const getChangePointRequest = (
  params: AiopsExplainLogRateSpikesSchema,
  fieldName: string
): estypes.SearchRequest => {
  const query = getQueryWithParams({
    params,
  });

  const timeFieldName = params.timeFieldName ?? '@timestamp';

  // Builds an epoch_millis range clause on the time field.
  const timeRangeFilter = (gte: number, lt: number): estypes.QueryDslQueryContainer => ({
    range: {
      [timeFieldName]: {
        gte,
        lt,
        format: 'epoch_millis',
      },
    },
  });

  let filter: estypes.QueryDslQueryContainer[] = [];

  if (Array.isArray(query.bool.filter)) {
    // Strip the overall time range filter; the foreground query gets the
    // deviation range instead, the background filter the baseline range.
    filter = query.bool.filter.filter((d) => Object.keys(d)[0] !== 'range');
    query.bool.filter = [...filter, timeRangeFilter(params.deviationMin, params.deviationMax)];
  }

  const body = {
    query,
    size: 0,
    aggs: {
      change_point_p_value: {
        significant_terms: {
          field: fieldName,
          background_filter: {
            bool: {
              filter: [...filter, timeRangeFilter(params.baselineMin, params.baselineMax)],
            },
          },
          // p-value scoring; the foreground set is not a subset of the background.
          p_value: { background_is_superset: false },
          size: 1000,
        },
      },
    },
  };

  return {
    ...getRequestBase(params),
    body,
  };
};
// Shape of the `change_point_p_value` significant terms aggregation result.
interface Aggs extends estypes.AggregationsSignificantLongTermsAggregate {
  doc_count: number;
  bg_count: number;
  buckets: estypes.AggregationsSignificantLongTermsBucket[];
}
/**
 * Runs the significant terms aggregation for each given field and collects
 * the statistically significant field/value pairs ("change points") whose
 * p-value is below SPIKE_ANALYSIS_THRESHOLD.
 *
 * @param esClient Elasticsearch client to run the searches with.
 * @param params request parameters including baseline/deviation time ranges.
 * @param fieldNames field candidates to aggregate on.
 * @returns deduplicated change points.
 * @throws Error when a search response does not contain aggregations.
 */
export const fetchChangePointPValues = async (
  esClient: ElasticsearchClient,
  params: AiopsExplainLogRateSpikesSchema,
  fieldNames: string[]
) => {
  const result: ChangePoint[] = [];

  for (const fieldName of fieldNames) {
    const request = getChangePointRequest(params, fieldName);
    const resp = await esClient.search<unknown, { change_point_p_value: Aggs }>(request);

    if (resp.aggregations === undefined) {
      throw new Error('fetchChangePoint failed, did not return aggregations.');
    }

    const overallResult = resp.aggregations.change_point_p_value;

    for (const bucket of overallResult.buckets) {
      // Transform the significant terms score back into a p-value.
      const pValue = Math.exp(-bucket.score);

      if (typeof pValue === 'number' && pValue < SPIKE_ANALYSIS_THRESHOLD) {
        result.push({
          fieldName,
          fieldValue: String(bucket.key),
          doc_count: bucket.doc_count,
          // Fix: report the bucket's background count; previously this
          // duplicated `bucket.doc_count` into `bg_count`.
          bg_count: bucket.bg_count,
          score: bucket.score,
          pValue,
        });
      }
    }
  }

  // Deduplicate by field/value pair across all per-field responses.
  return {
    changePoints: uniqBy(result, (d) => `${d.fieldName},${d.fieldValue}`),
  };
};

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core/server';
import { fetchFieldCandidates, getRandomDocsRequest } from './fetch_field_candidates';
// Fixture: request params covering all fields of the analysis schema.
const params = {
  index: 'the-index',
  timeFieldName: 'the-time-field-name',
  start: 1577836800000,
  end: 1609459200000,
  baselineMin: 10,
  baselineMax: 20,
  deviationMin: 30,
  deviationMax: 40,
  includeFrozen: false,
  kuery: '',
};

describe('query_field_candidates', () => {
  describe('getRandomDocsRequest', () => {
    it('returns the most basic request body for a sample of random documents', () => {
      const req = getRandomDocsRequest(params);

      // Expect a function_score/random_score sampling query restricted
      // to the overall time range, returning fields but no _source.
      expect(req).toEqual({
        body: {
          _source: false,
          fields: ['*'],
          query: {
            function_score: {
              query: {
                bool: {
                  filter: [
                    {
                      range: {
                        'the-time-field-name': {
                          format: 'epoch_millis',
                          gte: 1577836800000,
                          lte: 1609459200000,
                        },
                      },
                    },
                  ],
                },
              },
              random_score: {},
            },
          },
          size: 1000,
        },
        index: params.index,
        ignore_throttled: undefined,
        ignore_unavailable: true,
      });
    });
  });

  describe('fetchFieldCandidates', () => {
    it('returns field candidates and total hits', async () => {
      // Field caps mock: four fields, of which only keyword/ip types are supported.
      const esClientFieldCapsMock = jest.fn(() => ({
        fields: {
          myIpFieldName: { ip: {} },
          myKeywordFieldName: { keyword: {} },
          myUnpopulatedKeywordFieldName: { keyword: {} },
          myNumericFieldName: { number: {} },
        },
      }));
      // Search mock: the unpopulated keyword field is absent from the sampled
      // doc, so it must not show up as a field candidate.
      const esClientSearchMock = jest.fn((req: estypes.SearchRequest): estypes.SearchResponse => {
        return {
          hits: {
            hits: [
              {
                fields: {
                  myIpFieldName: '1.1.1.1',
                  myKeywordFieldName: 'myKeywordFieldValue',
                  myNumericFieldName: 1234,
                },
              },
            ],
          },
        } as unknown as estypes.SearchResponse;
      });

      const esClientMock = {
        fieldCaps: esClientFieldCapsMock,
        search: esClientSearchMock,
      } as unknown as ElasticsearchClient;

      const resp = await fetchFieldCandidates(esClientMock, params);

      // Only supported field types that are populated in the sampled docs.
      expect(resp).toEqual({
        fieldCandidates: ['myIpFieldName', 'myKeywordFieldName'],
      });
      expect(esClientFieldCapsMock).toHaveBeenCalledTimes(1);
      expect(esClientSearchMock).toHaveBeenCalledTimes(1);
    });
  });
});

View file

@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ES_FIELD_TYPES } from '@kbn/field-types';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
// Number of random documents to sample when checking which fields are populated.
const POPULATED_DOC_COUNT_SAMPLE_SIZE = 1000;

// Field types the spike analysis can aggregate significant terms on.
const SUPPORTED_ES_FIELD_TYPES = [
  ES_FIELD_TYPES.KEYWORD,
  ES_FIELD_TYPES.IP,
  ES_FIELD_TYPES.BOOLEAN,
];
/**
 * Builds a search request that fetches a random sample of documents
 * (`function_score` with `random_score`) matching the overall query,
 * returning only the docs' populated fields, not their `_source`.
 *
 * @param params request parameters; the overall query is derived from them.
 * @returns the ES search request.
 */
export const getRandomDocsRequest = (
  params: AiopsExplainLogRateSpikesSchema
): estypes.SearchRequest => ({
  ...getRequestBase(params),
  body: {
    fields: ['*'],
    _source: false,
    query: {
      function_score: {
        query: getQueryWithParams({ params }),
        // @ts-ignore
        random_score: {},
      },
    },
    size: POPULATED_DOC_COUNT_SAMPLE_SIZE,
  },
});
export const fetchFieldCandidates = async (
esClient: ElasticsearchClient,
params: AiopsExplainLogRateSpikesSchema
): Promise<{ fieldCandidates: string[] }> => {
const { index } = params;
// Get all supported fields
const respMapping = await esClient.fieldCaps({
index,
fields: '*',
});
const finalFieldCandidates: Set<string> = new Set([]);
const acceptableFields: Set<string> = new Set();
Object.entries(respMapping.fields).forEach(([key, value]) => {
const fieldTypes = Object.keys(value) as ES_FIELD_TYPES[];
const isSupportedType = fieldTypes.some((type) => SUPPORTED_ES_FIELD_TYPES.includes(type));
// Check if fieldName is something we can aggregate on
if (isSupportedType) {
acceptableFields.add(key);
}
});
const resp = await esClient.search(getRandomDocsRequest(params));
const sampledDocs = resp.hits.hits.map((d) => d.fields ?? {});
// Get all field names for each returned doc and flatten it
// to a list of unique field names used across all docs.
// and filter by list of acceptable fields and some APM specific unique fields.
[...new Set(sampledDocs.map(Object.keys).flat(1))].forEach((field) => {
if (acceptableFields.has(field)) {
finalFieldCandidates.add(field);
}
});
return {
fieldCandidates: [...finalFieldCandidates],
};
};

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getFilters } from './get_filters';
describe('getFilters', () => {
  it('returns an empty array with no timeFieldName and kuery supplied', () => {
    const filters = getFilters({
      index: 'the-index',
      timeFieldName: '',
      kuery: '',
      start: 1577836800000,
      end: 1609459200000,
      baselineMin: 10,
      baselineMax: 20,
      deviationMin: 30,
      deviationMax: 40,
    });
    expect(filters).toEqual([]);
  });

  it('returns a range filter when timeFieldName is supplied', () => {
    const filters = getFilters({
      index: 'the-index',
      timeFieldName: 'the-time-field-name',
      kuery: '',
      start: 1577836800000,
      end: 1609459200000,
      baselineMin: 10,
      baselineMax: 20,
      deviationMin: 30,
      deviationMax: 40,
    });
    // The overall start/end range gets applied to the given time field.
    expect(filters).toEqual([
      {
        range: {
          'the-time-field-name': {
            format: 'epoch_millis',
            gte: 1577836800000,
            lte: 1609459200000,
          },
        },
      },
    ]);
  });
});

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ESFilter } from '@kbn/core/types/elasticsearch';
import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
/**
 * Builds an epoch_millis time range filter clause.
 *
 * @param start optional lower bound (inclusive, epoch ms).
 * @param end optional upper bound (inclusive, epoch ms).
 * @param field the time field to filter on, defaults to `@timestamp`.
 * @returns a single-element array with the range clause.
 */
export function rangeQuery(
  start?: number,
  end?: number,
  field = '@timestamp'
): estypes.QueryDslQueryContainer[] {
  const rangeClause = {
    range: {
      [field]: {
        gte: start,
        lte: end,
        format: 'epoch_millis',
      },
    },
  };
  return [rangeClause];
}

/**
 * Converts a KQL expression into an ES query clause.
 * An empty KQL string yields no clauses.
 */
export function kqlQuery(kql: string): estypes.QueryDslQueryContainer[] {
  return kql ? [toElasticsearchQuery(fromKueryExpression(kql))] : [];
}

/**
 * Derives the overall query filters from the request parameters:
 * a time range filter (when a time field is set) plus an optional
 * KQL-based filter (when a kuery is set).
 */
export function getFilters({
  kuery,
  start,
  end,
  timeFieldName,
}: AiopsExplainLogRateSpikesSchema): ESFilter[] {
  return [
    ...(timeFieldName !== '' ? rangeQuery(start, end, timeFieldName) : []),
    ...(kuery !== '' ? kqlQuery(kuery) : []),
  ];
}

View file

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getQueryWithParams } from './get_query_with_params';
describe('getQueryWithParams', () => {
  it('returns the most basic query filtering', () => {
    const query = getQueryWithParams({
      params: {
        index: 'the-index',
        timeFieldName: 'the-time-field-name',
        start: 1577836800000,
        end: 1609459200000,
        baselineMin: 10,
        baselineMax: 20,
        deviationMin: 30,
        deviationMax: 40,
        includeFrozen: false,
        kuery: '',
      },
    });
    // With no term filters, only the overall time range filter is expected.
    expect(query).toEqual({
      bool: {
        filter: [
          {
            range: {
              'the-time-field-name': {
                format: 'epoch_millis',
                gte: 1577836800000,
                lte: 1609459200000,
              },
            },
          },
        ],
      },
    });
  });

  it('returns a query considering a custom field/value pair', () => {
    const query = getQueryWithParams({
      params: {
        index: 'the-index',
        timeFieldName: 'the-time-field-name',
        start: 1577836800000,
        end: 1609459200000,
        baselineMin: 10,
        baselineMax: 20,
        deviationMin: 30,
        deviationMax: 40,
        includeFrozen: false,
        kuery: '',
      },
      termFilters: [
        {
          fieldName: 'actualFieldName',
          fieldValue: 'actualFieldValue',
        },
      ],
    });
    // The term filter gets appended after the time range filter.
    expect(query).toEqual({
      bool: {
        filter: [
          {
            range: {
              'the-time-field-name': {
                format: 'epoch_millis',
                gte: 1577836800000,
                lte: 1609459200000,
              },
            },
          },
          {
            term: {
              actualFieldName: 'actualFieldValue',
            },
          },
        ],
      },
    });
  });
});

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { FieldValuePair } from '../../../common/types';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
import { getFilters } from './get_filters';
/**
 * Builds an ES `term` filter clause for a field/value pair.
 */
export const getTermsQuery = ({ fieldName, fieldValue }: FieldValuePair) => {
  const termClause = { term: { [fieldName]: fieldValue } };
  return termClause;
};
interface QueryParams {
  params: AiopsExplainLogRateSpikesSchema;
  termFilters?: FieldValuePair[];
}

/**
 * Builds the overall bool query for the analysis, combining the filters
 * derived from the request params with optional field/value term filters.
 */
export const getQueryWithParams = ({ params, termFilters }: QueryParams) => {
  const termQueries = Array.isArray(termFilters) ? termFilters.map(getTermsQuery) : [];
  const filter = [...getFilters(params), ...termQueries] as estypes.QueryDslQueryContainer[];
  return {
    bool: { filter },
  };
};

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getRequestBase } from './get_request_base';
describe('getRequestBase', () => {
  it('defaults to not setting `ignore_throttled`', () => {
    const requestBase = getRequestBase({
      index: 'the-index',
      timeFieldName: 'the-time-field-name',
      kuery: '',
      start: 1577836800000,
      end: 1609459200000,
      baselineMin: 10,
      baselineMax: 20,
      deviationMin: 30,
      deviationMax: 40,
    });
    expect(requestBase.ignore_throttled).toEqual(undefined);
  });

  it('adds `ignore_throttled=false` when `includeFrozen=true`', () => {
    // `includeFrozen=true` means frozen indices should be searched,
    // hence throttled indices must not be ignored.
    const requestBase = getRequestBase({
      index: 'the-index',
      timeFieldName: 'the-time-field-name',
      includeFrozen: true,
      kuery: '',
      start: 1577836800000,
      end: 1609459200000,
      baselineMin: 10,
      baselineMax: 20,
      deviationMin: 30,
      deviationMax: 40,
    });
    expect(requestBase.ignore_throttled).toEqual(false);
  });
});

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
/**
 * Derives the base options shared by the analysis' Elasticsearch requests.
 *
 * @param options.index         Index (pattern) the search targets.
 * @param options.includeFrozen When true, frozen indices are included by
 *                              setting `ignore_throttled: false`; otherwise
 *                              the attribute is omitted entirely.
 * @returns Request base with `ignore_unavailable` always enabled.
 */
export const getRequestBase = ({ index, includeFrozen }: AiopsExplainLogRateSpikesSchema) => {
  const requestBase: Record<string, unknown> = { index, ignore_unavailable: true };
  if (includeFrozen) {
    requestBase.ignore_throttled = false;
  }
  return requestBase;
};

View file

@ -5,13 +5,18 @@
* 2.0.
*/
import React, { FC } from 'react';
import React, { useEffect, useState, FC } from 'react';
import { FormattedMessage } from '@kbn/i18n-react';
import { ExplainLogRateSpikes } from '@kbn/aiops-plugin/public';
import { getWindowParameters } from '@kbn/aiops-utils';
import type { WindowParameters } from '@kbn/aiops-utils';
import { KBN_FIELD_TYPES } from '@kbn/data-plugin/public';
import { useMlContext } from '../contexts/ml';
import { useMlKibana } from '../contexts/kibana';
import { HelpMenu } from '../components/help_menu';
import { ml } from '../services/ml_api_service';
import { MlPageHeader } from '../components/page_header';
@ -21,6 +26,37 @@ export const ExplainLogRateSpikesPage: FC = () => {
} = useMlKibana();
const context = useMlContext();
const dataView = context.currentDataView;
const [windowParameters, setWindowParameters] = useState<WindowParameters | undefined>();
useEffect(() => {
async function fetchWindowParameters() {
if (dataView.timeFieldName) {
const stats: Array<{
data: Array<{ doc_count: number; key: number }>;
stats: [number, number];
}> = await ml.getVisualizerFieldHistograms({
indexPattern: dataView.title,
fields: [{ fieldName: dataView.timeFieldName, type: KBN_FIELD_TYPES.DATE }],
query: { match_all: {} },
samplerShardSize: -1,
});
const peak = stats[0].data.reduce((p, c) => (c.doc_count >= p.doc_count ? c : p), {
doc_count: 0,
key: 0,
});
const peakTimestamp = Math.round(peak.key);
setWindowParameters(
getWindowParameters(peakTimestamp, stats[0].stats[0], stats[0].stats[1])
);
}
}
fetchWindowParameters();
}, []);
return (
<>
@ -30,7 +66,9 @@ export const ExplainLogRateSpikesPage: FC = () => {
defaultMessage="Explain log rate spikes"
/>
</MlPageHeader>
<ExplainLogRateSpikes dataView={context.currentDataView} />
{dataView.timeFieldName && windowParameters && (
<ExplainLogRateSpikes dataView={dataView} windowParameters={windowParameters} />
)}
<HelpMenu docLink={docLinks.links.ml.guide} />
</>
);

View file

@ -10,6 +10,8 @@ import { format as formatUrl } from 'url';
import expect from '@kbn/expect';
import type { ApiExplainLogRateSpikes } from '@kbn/aiops-plugin/common/api';
import { FtrProviderContext } from '../../ftr_provider_context';
import { parseStream } from './parse_stream';
@ -19,31 +21,43 @@ export default ({ getService }: FtrProviderContext) => {
const config = getService('config');
const kibanaServerUrl = formatUrl(config.get('servers.kibana'));
const expectedFields = [
'category',
'currency',
'customer_first_name',
'customer_full_name',
'customer_gender',
'customer_id',
'customer_last_name',
'customer_phone',
'day_of_week',
'day_of_week_i',
'email',
'geoip',
'manufacturer',
'order_date',
'order_id',
'products',
'sku',
'taxful_total_price',
'taxless_total_price',
'total_quantity',
'total_unique_products',
'type',
'user',
];
const requestBody: ApiExplainLogRateSpikes['body'] = {
baselineMax: 1561719083292,
baselineMin: 1560954147006,
deviationMax: 1562254538692,
deviationMin: 1561986810992,
end: 2147483647000,
index: 'ft_ecommerce',
kuery: '',
start: 0,
timeFieldName: 'order_date',
};
const expected = {
chunksLength: 7,
actionsLength: 6,
actionFilter: 'add_change_points',
changePoints: [
{
fieldName: 'day_of_week',
fieldValue: 'Wednesday',
doc_count: 145,
bg_count: 145,
score: 36.31595998561873,
pValue: 1.6911377077437753e-16,
normalizedScore: 0.8055203624020835,
},
{
fieldName: 'day_of_week',
fieldValue: 'Thursday',
doc_count: 157,
bg_count: 157,
score: 20.366950718358762,
pValue: 1.428057484826135e-9,
normalizedScore: 0.7661649691018979,
},
],
};
describe('POST /internal/aiops/explain_log_rate_spikes', () => {
const esArchiver = getService('esArchiver');
@ -60,16 +74,14 @@ export default ({ getService }: FtrProviderContext) => {
const resp = await supertest
.post(`/internal/aiops/explain_log_rate_spikes`)
.set('kbn-xsrf', 'kibana')
.send({
index: 'ft_ecommerce',
})
.send(requestBody)
.expect(200);
expect(Buffer.isBuffer(resp.body)).to.be(true);
const chunks: string[] = resp.body.toString().split('\n');
expect(chunks.length).to.be(24);
expect(chunks.length).to.be(expected.chunksLength);
const lastChunk = chunks.pop();
expect(lastChunk).to.be('');
@ -80,15 +92,30 @@ export default ({ getService }: FtrProviderContext) => {
data = chunks.map((c) => JSON.parse(c));
}).not.to.throwError();
expect(data.length).to.be(expected.actionsLength);
data.forEach((d) => {
expect(typeof d.type).to.be('string');
});
const fields = data.map((d) => d.payload[0]).sort();
const addChangePointsActions = data.filter((d) => d.type === expected.actionFilter);
expect(addChangePointsActions.length).to.greaterThan(0);
expect(fields.length).to.equal(expectedFields.length);
fields.forEach((f) => {
expect(expectedFields.includes(f));
const changePoints = addChangePointsActions
.flatMap((d) => d.payload)
.sort(function (a, b) {
if (a.fieldName === b.fieldName) {
return b.fieldValue - a.fieldValue;
}
return a.fieldName > b.fieldName ? 1 : -1;
});
expect(changePoints.length).to.equal(expected.changePoints.length);
changePoints.forEach((cp, index) => {
const ecp = expected.changePoints[index];
expect(cp.fieldName).to.equal(ecp.fieldName);
expect(cp.fieldValue).to.equal(ecp.fieldValue);
expect(cp.doc_count).to.equal(ecp.doc_count);
expect(cp.bg_count).to.equal(ecp.bg_count);
});
});
@ -99,7 +126,7 @@ export default ({ getService }: FtrProviderContext) => {
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
body: JSON.stringify({ index: 'ft_ecommerce' }),
body: JSON.stringify(requestBody),
});
const stream = response.body;
@ -114,11 +141,26 @@ export default ({ getService }: FtrProviderContext) => {
data.push(action);
}
const fields = data.map((d) => d.payload[0]).sort();
expect(data.length).to.be(expected.actionsLength);
const addChangePointsActions = data.filter((d) => d.type === expected.actionFilter);
expect(addChangePointsActions.length).to.greaterThan(0);
expect(fields.length).to.equal(expectedFields.length);
fields.forEach((f) => {
expect(expectedFields.includes(f));
const changePoints = addChangePointsActions
.flatMap((d) => d.payload)
.sort(function (a, b) {
if (a.fieldName === b.fieldName) {
return b.fieldValue - a.fieldValue;
}
return a.fieldName > b.fieldName ? 1 : -1;
});
expect(changePoints.length).to.equal(expected.changePoints.length);
changePoints.forEach((cp, index) => {
const ecp = expected.changePoints[index];
expect(cp.fieldName).to.equal(ecp.fieldName);
expect(cp.fieldValue).to.equal(ecp.fieldValue);
expect(cp.doc_count).to.equal(ecp.doc_count);
expect(cp.bg_count).to.equal(ecp.bg_count);
});
}
});