mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
- Fixes error handling that before was not providing enough information for debugging purposes and support. This will now output more fine grained error information to the Kibana server log. The analysis is now more resilient to errors for individual queries. For example, we don't stop the analysis anymore if individual queries for p-values or histograms fail.
- Moves the error callout above all other possible elements like empty prompts when the analysis doesn't return results.
(cherry picked from commit 4753d7c170
)
Co-authored-by: Walter Rafelsberger <walter.rafelsberger@elastic.co>
This commit is contained in:
parent
9c10e55f77
commit
3140395f7d
7 changed files with 403 additions and 257 deletions
|
@ -18,6 +18,7 @@ export const API_ACTION_NAME = {
|
|||
ADD_CHANGE_POINTS_GROUP: 'add_change_point_group',
|
||||
ADD_CHANGE_POINTS_GROUP_HISTOGRAM: 'add_change_point_group_histogram',
|
||||
ADD_ERROR: 'add_error',
|
||||
PING: 'ping',
|
||||
RESET: 'reset',
|
||||
UPDATE_LOADING_STATE: 'update_loading_state',
|
||||
} as const;
|
||||
|
@ -89,6 +90,14 @@ export function addErrorAction(payload: ApiActionAddError['payload']): ApiAction
|
|||
};
|
||||
}
|
||||
|
||||
interface ApiActionPing {
|
||||
type: typeof API_ACTION_NAME.PING;
|
||||
}
|
||||
|
||||
export function pingAction(): ApiActionPing {
|
||||
return { type: API_ACTION_NAME.PING };
|
||||
}
|
||||
|
||||
interface ApiActionReset {
|
||||
type: typeof API_ACTION_NAME.RESET;
|
||||
}
|
||||
|
@ -121,5 +130,6 @@ export type AiopsExplainLogRateSpikesApiAction =
|
|||
| ApiActionAddChangePointsHistogram
|
||||
| ApiActionAddChangePointsGroupHistogram
|
||||
| ApiActionAddError
|
||||
| ApiActionPing
|
||||
| ApiActionReset
|
||||
| ApiActionUpdateLoadingState;
|
||||
|
|
|
@ -11,6 +11,7 @@ export {
|
|||
addChangePointsGroupHistogramAction,
|
||||
addChangePointsHistogramAction,
|
||||
addErrorAction,
|
||||
pingAction,
|
||||
resetAction,
|
||||
updateLoadingStateAction,
|
||||
API_ACTION_NAME,
|
||||
|
|
|
@ -172,6 +172,33 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
|
|||
onCancel={cancel}
|
||||
shouldRerunAnalysis={shouldRerunAnalysis}
|
||||
/>
|
||||
{errors.length > 0 ? (
|
||||
<>
|
||||
<EuiCallOut
|
||||
title={i18n.translate('xpack.aiops.analysis.errorCallOutTitle', {
|
||||
defaultMessage:
|
||||
'The following {errorCount, plural, one {error} other {errors}} occurred running the analysis.',
|
||||
values: { errorCount: errors.length },
|
||||
})}
|
||||
color="warning"
|
||||
iconType="alert"
|
||||
size="s"
|
||||
>
|
||||
<EuiText size="s">
|
||||
{errors.length === 1 ? (
|
||||
<p>{errors[0]}</p>
|
||||
) : (
|
||||
<ul>
|
||||
{errors.map((e, i) => (
|
||||
<li key={i}>{e}</li>
|
||||
))}
|
||||
</ul>
|
||||
)}
|
||||
</EuiText>
|
||||
</EuiCallOut>
|
||||
<EuiSpacer size="xs" />
|
||||
</>
|
||||
) : null}
|
||||
{showSpikeAnalysisTable && foundGroups && (
|
||||
<EuiFormRow display="columnCompressedSwitch" label={groupResultsMessage}>
|
||||
<EuiSwitch
|
||||
|
@ -207,33 +234,6 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
|
|||
}
|
||||
/>
|
||||
)}
|
||||
{errors.length > 0 && (
|
||||
<>
|
||||
<EuiCallOut
|
||||
title={i18n.translate('xpack.aiops.analysis.errorCallOutTitle', {
|
||||
defaultMessage:
|
||||
'The following {errorCount, plural, one {error} other {errors}} occurred running the analysis.',
|
||||
values: { errorCount: errors.length },
|
||||
})}
|
||||
color="warning"
|
||||
iconType="alert"
|
||||
size="s"
|
||||
>
|
||||
<EuiText size="s">
|
||||
{errors.length === 1 ? (
|
||||
<p>{errors[0]}</p>
|
||||
) : (
|
||||
<ul>
|
||||
{errors.map((e, i) => (
|
||||
<li key={i}>{e}</li>
|
||||
))}
|
||||
</ul>
|
||||
)}
|
||||
</EuiText>
|
||||
</EuiCallOut>
|
||||
<EuiSpacer size="xs" />
|
||||
</>
|
||||
)}
|
||||
{showSpikeAnalysisTable && groupResults && foundGroups ? (
|
||||
<SpikeAnalysisGroupsTable
|
||||
changePoints={data.changePoints}
|
||||
|
|
|
@ -27,6 +27,7 @@ import {
|
|||
addChangePointsHistogramAction,
|
||||
aiopsExplainLogRateSpikesSchema,
|
||||
addErrorAction,
|
||||
pingAction,
|
||||
resetAction,
|
||||
updateLoadingStateAction,
|
||||
AiopsExplainLogRateSpikesApiAction,
|
||||
|
@ -74,6 +75,15 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
return response.forbidden();
|
||||
}
|
||||
|
||||
let logMessageCounter = 1;
|
||||
|
||||
function logInfoMessage(msg: string) {
|
||||
logger.info(`Explain Log Rate Spikes #${logMessageCounter}: ${msg}`);
|
||||
logMessageCounter++;
|
||||
}
|
||||
|
||||
logInfoMessage('Starting analysis.');
|
||||
|
||||
const groupingEnabled = !!request.body.grouping;
|
||||
|
||||
const client = (await context.core).elasticsearch.client.asCurrentUser;
|
||||
|
@ -83,19 +93,33 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
let loaded = 0;
|
||||
let shouldStop = false;
|
||||
request.events.aborted$.subscribe(() => {
|
||||
logInfoMessage('aborted$ subscription trigger.');
|
||||
shouldStop = true;
|
||||
controller.abort();
|
||||
});
|
||||
request.events.completed$.subscribe(() => {
|
||||
logInfoMessage('completed$ subscription trigger.');
|
||||
shouldStop = true;
|
||||
controller.abort();
|
||||
});
|
||||
|
||||
const { end, push, responseWithHeaders } = streamFactory<AiopsExplainLogRateSpikesApiAction>(
|
||||
request.headers,
|
||||
logger,
|
||||
true
|
||||
);
|
||||
const {
|
||||
end: streamEnd,
|
||||
push,
|
||||
responseWithHeaders,
|
||||
} = streamFactory<AiopsExplainLogRateSpikesApiAction>(request.headers, logger, true);
|
||||
|
||||
function pushPing() {
|
||||
push(pingAction());
|
||||
}
|
||||
|
||||
const pingInterval = setInterval(pushPing, 1000);
|
||||
|
||||
function end() {
|
||||
logInfoMessage('Ending analysis.');
|
||||
clearInterval(pingInterval);
|
||||
streamEnd();
|
||||
}
|
||||
|
||||
function endWithUpdatedLoadingState() {
|
||||
push(
|
||||
|
@ -114,9 +138,16 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
end();
|
||||
}
|
||||
|
||||
function pushError(m: string) {
|
||||
logInfoMessage('Push error.');
|
||||
push(addErrorAction(m));
|
||||
}
|
||||
|
||||
// Async IIFE to run the analysis while not blocking returning `responseWithHeaders`.
|
||||
(async () => {
|
||||
logInfoMessage('Reset.');
|
||||
push(resetAction());
|
||||
logInfoMessage('Load field candidates.');
|
||||
push(
|
||||
updateLoadingStateAction({
|
||||
ccsWarning: false,
|
||||
|
@ -134,7 +165,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
try {
|
||||
fieldCandidates = await fetchFieldCandidates(client, request.body);
|
||||
} catch (e) {
|
||||
push(addErrorAction(e.toString()));
|
||||
logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
|
||||
pushError(`Failed to fetch field candidates.`);
|
||||
end();
|
||||
return;
|
||||
}
|
||||
|
@ -168,17 +200,33 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
const changePoints: ChangePoint[] = [];
|
||||
const fieldsToSample = new Set<string>();
|
||||
const chunkSize = 10;
|
||||
let chunkCount = 0;
|
||||
|
||||
const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
|
||||
|
||||
logInfoMessage('Fetch p-values.');
|
||||
|
||||
for (const fieldCandidatesChunk of fieldCandidatesChunks) {
|
||||
chunkCount++;
|
||||
logInfoMessage(`Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`);
|
||||
let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
|
||||
try {
|
||||
pValues = await fetchChangePointPValues(client, request.body, fieldCandidatesChunk);
|
||||
pValues = await fetchChangePointPValues(
|
||||
client,
|
||||
request.body,
|
||||
fieldCandidatesChunk,
|
||||
logger,
|
||||
pushError
|
||||
);
|
||||
} catch (e) {
|
||||
push(addErrorAction(e.toString()));
|
||||
end();
|
||||
return;
|
||||
logger.error(
|
||||
`Failed to fetch p-values for ${JSON.stringify(
|
||||
fieldCandidatesChunk
|
||||
)}, got: \n${e.toString()}`
|
||||
);
|
||||
pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
|
||||
// Still continue the analysis even if chunks of p-value queries fail.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pValues.length > 0) {
|
||||
|
@ -210,12 +258,15 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
);
|
||||
|
||||
if (shouldStop) {
|
||||
logInfoMessage('shouldStop fetching p-values.');
|
||||
|
||||
end();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (changePoints?.length === 0) {
|
||||
logInfoMessage('Stopping analysis, did not find change points.');
|
||||
endWithUpdatedLoadingState();
|
||||
return;
|
||||
}
|
||||
|
@ -224,16 +275,27 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
{ fieldName: request.body.timeFieldName, type: KBN_FIELD_TYPES.DATE },
|
||||
];
|
||||
|
||||
const [overallTimeSeries] = (await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
{ match_all: {} },
|
||||
// fields
|
||||
histogramFields,
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData];
|
||||
logInfoMessage('Fetch overall histogram.');
|
||||
|
||||
let overallTimeSeries: NumericChartData | undefined;
|
||||
try {
|
||||
overallTimeSeries = (
|
||||
(await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
{ match_all: {} },
|
||||
// fields
|
||||
histogramFields,
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
|
||||
pushError(`Failed to fetch overall histogram data.`);
|
||||
// Still continue the analysis even if loading the overall histogram fails.
|
||||
}
|
||||
|
||||
function pushHistogramDataLoadingState() {
|
||||
push(
|
||||
|
@ -251,6 +313,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
}
|
||||
|
||||
if (groupingEnabled) {
|
||||
logInfoMessage('Group results.');
|
||||
|
||||
push(
|
||||
updateLoadingStateAction({
|
||||
ccsWarning: false,
|
||||
|
@ -283,208 +347,242 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
(g) => g.group.length > 1
|
||||
);
|
||||
|
||||
const { fields, df } = await fetchFrequentItems(
|
||||
client,
|
||||
request.body.index,
|
||||
JSON.parse(request.body.searchQuery) as estypes.QueryDslQueryContainer,
|
||||
deduplicatedChangePoints,
|
||||
request.body.timeFieldName,
|
||||
request.body.deviationMin,
|
||||
request.body.deviationMax
|
||||
);
|
||||
try {
|
||||
const { fields, df } = await fetchFrequentItems(
|
||||
client,
|
||||
request.body.index,
|
||||
JSON.parse(request.body.searchQuery) as estypes.QueryDslQueryContainer,
|
||||
deduplicatedChangePoints,
|
||||
request.body.timeFieldName,
|
||||
request.body.deviationMin,
|
||||
request.body.deviationMax,
|
||||
logger,
|
||||
pushError
|
||||
);
|
||||
|
||||
// The way the `frequent_items` aggregations works could return item sets that include
|
||||
// field/value pairs that are not part of the original list of significant change points.
|
||||
// This cleans up groups and removes those unrelated field/value pairs.
|
||||
const filteredDf = df
|
||||
.map((fi) => {
|
||||
fi.set = Object.entries(fi.set).reduce<ItemsetResult['set']>(
|
||||
(set, [field, value]) => {
|
||||
if (
|
||||
changePoints.some((cp) => cp.fieldName === field && cp.fieldValue === value)
|
||||
) {
|
||||
set[field] = value;
|
||||
if (fields.length > 0 && df.length > 0) {
|
||||
// The way the `frequent_items` aggregations works could return item sets that include
|
||||
// field/value pairs that are not part of the original list of significant change points.
|
||||
// This cleans up groups and removes those unrelated field/value pairs.
|
||||
const filteredDf = df
|
||||
.map((fi) => {
|
||||
fi.set = Object.entries(fi.set).reduce<ItemsetResult['set']>(
|
||||
(set, [field, value]) => {
|
||||
if (
|
||||
changePoints.some((cp) => cp.fieldName === field && cp.fieldValue === value)
|
||||
) {
|
||||
set[field] = value;
|
||||
}
|
||||
return set;
|
||||
},
|
||||
{}
|
||||
);
|
||||
fi.size = Object.keys(fi.set).length;
|
||||
return fi;
|
||||
})
|
||||
.filter((fi) => fi.size > 1);
|
||||
|
||||
// `frequent_items` returns lot of different small groups of field/value pairs that co-occur.
|
||||
// The following steps analyse these small groups, identify overlap between these groups,
|
||||
// and then summarize them in larger groups where possible.
|
||||
|
||||
// Get a tree structure based on `frequent_items`.
|
||||
const { root } = getSimpleHierarchicalTree(filteredDf, true, false, fields);
|
||||
|
||||
// Each leave of the tree will be a summarized group of co-occuring field/value pairs.
|
||||
const treeLeaves = getSimpleHierarchicalTreeLeaves(root, []);
|
||||
|
||||
// To be able to display a more cleaned up results table in the UI, we identify field/value pairs
|
||||
// that occur in multiple groups. This will allow us to highlight field/value pairs that are
|
||||
// unique to a group in a better way. This step will also re-add duplicates we identified in the
|
||||
// beginning and didn't pass on to the `frequent_items` agg.
|
||||
const fieldValuePairCounts = getFieldValuePairCounts(treeLeaves);
|
||||
const changePointGroups = markDuplicates(treeLeaves, fieldValuePairCounts).map(
|
||||
(g) => {
|
||||
const group = [...g.group];
|
||||
|
||||
for (const groupItem of g.group) {
|
||||
const { duplicate } = groupItem;
|
||||
const duplicates = groupedChangePoints.find((d) =>
|
||||
d.group.some(
|
||||
(dg) =>
|
||||
dg.fieldName === groupItem.fieldName &&
|
||||
dg.fieldValue === groupItem.fieldValue
|
||||
)
|
||||
);
|
||||
|
||||
if (duplicates !== undefined) {
|
||||
group.push(
|
||||
...duplicates.group.map((d) => {
|
||||
return {
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
duplicate,
|
||||
};
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
return set;
|
||||
},
|
||||
{}
|
||||
|
||||
return {
|
||||
...g,
|
||||
group,
|
||||
};
|
||||
}
|
||||
);
|
||||
fi.size = Object.keys(fi.set).length;
|
||||
return fi;
|
||||
})
|
||||
.filter((fi) => fi.size > 1);
|
||||
|
||||
// `frequent_items` returns lot of different small groups of field/value pairs that co-occur.
|
||||
// The following steps analyse these small groups, identify overlap between these groups,
|
||||
// and then summarize them in larger groups where possible.
|
||||
// Some field/value pairs might not be part of the `frequent_items` result set, for example
|
||||
// because they don't co-occur with other field/value pairs or because of the limits we set on the query.
|
||||
// In this next part we identify those missing pairs and add them as individual groups.
|
||||
const missingChangePoints = deduplicatedChangePoints.filter((cp) => {
|
||||
return !changePointGroups.some((cpg) => {
|
||||
return cpg.group.some(
|
||||
(d) => d.fieldName === cp.fieldName && d.fieldValue === cp.fieldValue
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// Get a tree structure based on `frequent_items`.
|
||||
const { root } = getSimpleHierarchicalTree(filteredDf, true, false, fields);
|
||||
|
||||
// Each leave of the tree will be a summarized group of co-occuring field/value pairs.
|
||||
const treeLeaves = getSimpleHierarchicalTreeLeaves(root, []);
|
||||
|
||||
// To be able to display a more cleaned up results table in the UI, we identify field/value pairs
|
||||
// that occur in multiple groups. This will allow us to highlight field/value pairs that are
|
||||
// unique to a group in a better way. This step will also re-add duplicates we identified in the
|
||||
// beginning and didn't pass on to the `frequent_items` agg.
|
||||
const fieldValuePairCounts = getFieldValuePairCounts(treeLeaves);
|
||||
const changePointGroups = markDuplicates(treeLeaves, fieldValuePairCounts).map((g) => {
|
||||
const group = [...g.group];
|
||||
|
||||
for (const groupItem of g.group) {
|
||||
const { duplicate } = groupItem;
|
||||
const duplicates = groupedChangePoints.find((d) =>
|
||||
d.group.some(
|
||||
(dg) =>
|
||||
dg.fieldName === groupItem.fieldName && dg.fieldValue === groupItem.fieldValue
|
||||
changePointGroups.push(
|
||||
...missingChangePoints.map(
|
||||
({ fieldName, fieldValue, doc_count: docCount, pValue }) => {
|
||||
const duplicates = groupedChangePoints.find((d) =>
|
||||
d.group.some(
|
||||
(dg) => dg.fieldName === fieldName && dg.fieldValue === fieldValue
|
||||
)
|
||||
);
|
||||
if (duplicates !== undefined) {
|
||||
return {
|
||||
id: `${stringHash(
|
||||
JSON.stringify(
|
||||
duplicates.group.map((d) => ({
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
}))
|
||||
)
|
||||
)}`,
|
||||
group: duplicates.group.map((d) => ({
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
duplicate: false,
|
||||
})),
|
||||
docCount,
|
||||
pValue,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
id: `${stringHash(JSON.stringify({ fieldName, fieldValue }))}`,
|
||||
group: [
|
||||
{
|
||||
fieldName,
|
||||
fieldValue,
|
||||
duplicate: false,
|
||||
},
|
||||
],
|
||||
docCount,
|
||||
pValue,
|
||||
};
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
if (duplicates !== undefined) {
|
||||
group.push(
|
||||
...duplicates.group.map((d) => {
|
||||
return {
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
duplicate,
|
||||
};
|
||||
})
|
||||
);
|
||||
// Finally, we'll find out if there's at least one group with at least two items,
|
||||
// only then will we return the groups to the clients and make the grouping option available.
|
||||
const maxItems = Math.max(...changePointGroups.map((g) => g.group.length));
|
||||
|
||||
if (maxItems > 1) {
|
||||
push(addChangePointsGroupAction(changePointGroups));
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
...g,
|
||||
group,
|
||||
};
|
||||
});
|
||||
loaded += PROGRESS_STEP_GROUPING;
|
||||
|
||||
// Some field/value pairs might not be part of the `frequent_items` result set, for example
|
||||
// because they don't co-occur with other field/value pairs or because of the limits we set on the query.
|
||||
// In this next part we identify those missing pairs and add them as individual groups.
|
||||
const missingChangePoints = deduplicatedChangePoints.filter((cp) => {
|
||||
return !changePointGroups.some((cpg) => {
|
||||
return cpg.group.some(
|
||||
(d) => d.fieldName === cp.fieldName && d.fieldValue === cp.fieldValue
|
||||
);
|
||||
});
|
||||
});
|
||||
pushHistogramDataLoadingState();
|
||||
|
||||
changePointGroups.push(
|
||||
...missingChangePoints.map(({ fieldName, fieldValue, doc_count: docCount, pValue }) => {
|
||||
const duplicates = groupedChangePoints.find((d) =>
|
||||
d.group.some((dg) => dg.fieldName === fieldName && dg.fieldValue === fieldValue)
|
||||
);
|
||||
if (duplicates !== undefined) {
|
||||
return {
|
||||
id: `${stringHash(
|
||||
JSON.stringify(
|
||||
duplicates.group.map((d) => ({
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
}))
|
||||
)
|
||||
)}`,
|
||||
group: duplicates.group.map((d) => ({
|
||||
fieldName: d.fieldName,
|
||||
fieldValue: d.fieldValue,
|
||||
duplicate: false,
|
||||
})),
|
||||
docCount,
|
||||
pValue,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
id: `${stringHash(JSON.stringify({ fieldName, fieldValue }))}`,
|
||||
group: [
|
||||
{
|
||||
fieldName,
|
||||
fieldValue,
|
||||
duplicate: false,
|
||||
logInfoMessage('Fetch group histograms.');
|
||||
|
||||
await asyncForEach(changePointGroups, async (cpg) => {
|
||||
if (overallTimeSeries !== undefined) {
|
||||
const histogramQuery = {
|
||||
bool: {
|
||||
filter: cpg.group.map((d) => ({
|
||||
term: { [d.fieldName]: d.fieldValue },
|
||||
})),
|
||||
},
|
||||
],
|
||||
docCount,
|
||||
pValue,
|
||||
};
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
// Finally, we'll find out if there's at least one group with at least two items,
|
||||
// only then will we return the groups to the clients and make the grouping option available.
|
||||
const maxItems = Math.max(...changePointGroups.map((g) => g.group.length));
|
||||
|
||||
if (maxItems > 1) {
|
||||
push(addChangePointsGroupAction(changePointGroups));
|
||||
}
|
||||
|
||||
loaded += PROGRESS_STEP_GROUPING;
|
||||
|
||||
pushHistogramDataLoadingState();
|
||||
|
||||
if (changePointGroups) {
|
||||
await asyncForEach(changePointGroups, async (cpg, index) => {
|
||||
const histogramQuery = {
|
||||
bool: {
|
||||
filter: cpg.group.map((d) => ({
|
||||
term: { [d.fieldName]: d.fieldValue },
|
||||
})),
|
||||
},
|
||||
};
|
||||
|
||||
const [cpgTimeSeries] = (await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
histogramQuery,
|
||||
// fields
|
||||
[
|
||||
{
|
||||
fieldName: request.body.timeFieldName,
|
||||
type: KBN_FIELD_TYPES.DATE,
|
||||
interval: overallTimeSeries.interval,
|
||||
min: overallTimeSeries.stats[0],
|
||||
max: overallTimeSeries.stats[1],
|
||||
},
|
||||
],
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData];
|
||||
|
||||
const histogram =
|
||||
overallTimeSeries.data.map((o, i) => {
|
||||
const current = cpgTimeSeries.data.find(
|
||||
(d1) => d1.key_as_string === o.key_as_string
|
||||
) ?? {
|
||||
doc_count: 0,
|
||||
};
|
||||
return {
|
||||
key: o.key,
|
||||
key_as_string: o.key_as_string ?? '',
|
||||
doc_count_change_point: current.doc_count,
|
||||
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
|
||||
};
|
||||
}) ?? [];
|
||||
|
||||
push(
|
||||
addChangePointsGroupHistogramAction([
|
||||
{
|
||||
id: cpg.id,
|
||||
histogram,
|
||||
},
|
||||
])
|
||||
);
|
||||
});
|
||||
let cpgTimeSeries: NumericChartData;
|
||||
try {
|
||||
cpgTimeSeries = (
|
||||
(await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
histogramQuery,
|
||||
// fields
|
||||
[
|
||||
{
|
||||
fieldName: request.body.timeFieldName,
|
||||
type: KBN_FIELD_TYPES.DATE,
|
||||
interval: overallTimeSeries.interval,
|
||||
min: overallTimeSeries.stats[0],
|
||||
max: overallTimeSeries.stats[1],
|
||||
},
|
||||
],
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Failed to fetch the histogram data for group #${
|
||||
cpg.id
|
||||
}, got: \n${e.toString()}`
|
||||
);
|
||||
pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
|
||||
return;
|
||||
}
|
||||
const histogram =
|
||||
overallTimeSeries.data.map((o, i) => {
|
||||
const current = cpgTimeSeries.data.find(
|
||||
(d1) => d1.key_as_string === o.key_as_string
|
||||
) ?? {
|
||||
doc_count: 0,
|
||||
};
|
||||
return {
|
||||
key: o.key,
|
||||
key_as_string: o.key_as_string ?? '',
|
||||
doc_count_change_point: current.doc_count,
|
||||
doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
|
||||
};
|
||||
}) ?? [];
|
||||
|
||||
push(
|
||||
addChangePointsGroupHistogramAction([
|
||||
{
|
||||
id: cpg.id,
|
||||
histogram,
|
||||
},
|
||||
])
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Failed to transform field/value pairs into groups, got: \n${e.toString()}`
|
||||
);
|
||||
pushError(`Failed to transform field/value pairs into groups.`);
|
||||
}
|
||||
}
|
||||
|
||||
loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS;
|
||||
|
||||
logInfoMessage('Fetch field/value histograms.');
|
||||
|
||||
// time series filtered by fields
|
||||
if (changePoints) {
|
||||
await asyncForEach(changePoints, async (cp, index) => {
|
||||
if (changePoints) {
|
||||
if (changePoints && overallTimeSeries !== undefined) {
|
||||
await asyncForEach(changePoints, async (cp) => {
|
||||
if (overallTimeSeries !== undefined) {
|
||||
const histogramQuery = {
|
||||
bool: {
|
||||
filter: [
|
||||
|
@ -495,24 +593,40 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
},
|
||||
};
|
||||
|
||||
const [cpTimeSeries] = (await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
histogramQuery,
|
||||
// fields
|
||||
[
|
||||
{
|
||||
fieldName: request.body.timeFieldName,
|
||||
type: KBN_FIELD_TYPES.DATE,
|
||||
interval: overallTimeSeries.interval,
|
||||
min: overallTimeSeries.stats[0],
|
||||
max: overallTimeSeries.stats[1],
|
||||
},
|
||||
],
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData];
|
||||
let cpTimeSeries: NumericChartData;
|
||||
|
||||
try {
|
||||
cpTimeSeries = (
|
||||
(await fetchHistogramsForFields(
|
||||
client,
|
||||
request.body.index,
|
||||
histogramQuery,
|
||||
// fields
|
||||
[
|
||||
{
|
||||
fieldName: request.body.timeFieldName,
|
||||
type: KBN_FIELD_TYPES.DATE,
|
||||
interval: overallTimeSeries.interval,
|
||||
min: overallTimeSeries.stats[0],
|
||||
max: overallTimeSeries.stats[1],
|
||||
},
|
||||
],
|
||||
// samplerShardSize
|
||||
-1,
|
||||
undefined
|
||||
)) as [NumericChartData]
|
||||
)[0];
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
|
||||
cp.fieldValue
|
||||
}", got: \n${e.toString()}`
|
||||
);
|
||||
pushError(
|
||||
`Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const histogram =
|
||||
overallTimeSeries.data.map((o, i) => {
|
||||
|
|
|
@ -8,6 +8,7 @@ import { uniqBy } from 'lodash';
|
|||
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import { ElasticsearchClient } from '@kbn/core/server';
|
||||
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import { ChangePoint } from '@kbn/ml-agg-utils';
|
||||
import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
|
||||
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
|
||||
|
@ -92,7 +93,9 @@ interface Aggs extends estypes.AggregationsSignificantLongTermsAggregate {
|
|||
export const fetchChangePointPValues = async (
|
||||
esClient: ElasticsearchClient,
|
||||
params: AiopsExplainLogRateSpikesSchema,
|
||||
fieldNames: string[]
|
||||
fieldNames: string[],
|
||||
logger: Logger,
|
||||
emitError: (m: string) => void
|
||||
): Promise<ChangePoint[]> => {
|
||||
const result: ChangePoint[] = [];
|
||||
|
||||
|
@ -101,7 +104,16 @@ export const fetchChangePointPValues = async (
|
|||
const resp = await esClient.search<unknown, { change_point_p_value: Aggs }>(request);
|
||||
|
||||
if (resp.aggregations === undefined) {
|
||||
throw new Error('fetchChangePoint failed, did not return aggregations.');
|
||||
logger.error(
|
||||
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
|
||||
resp,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
|
||||
// Still continue the analysis even if individual p-value queries fail.
|
||||
continue;
|
||||
}
|
||||
|
||||
const overallResult = resp.aggregations.change_point_p_value;
|
||||
|
|
|
@ -10,6 +10,7 @@ import { uniq, uniqWith, pick, isEqual } from 'lodash';
|
|||
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { ChangePoint, FieldValuePair } from '@kbn/ml-agg-utils';
|
||||
|
||||
interface FrequentItemsAggregation extends estypes.AggregationsSamplerAggregation {
|
||||
|
@ -53,9 +54,11 @@ export async function fetchFrequentItems(
|
|||
changePoints: ChangePoint[],
|
||||
timeFieldName: string,
|
||||
deviationMin: number,
|
||||
deviationMax: number
|
||||
deviationMax: number,
|
||||
logger: Logger,
|
||||
emitError: (m: string) => void
|
||||
) {
|
||||
// get unique fields that are left
|
||||
// get unique fields from change points
|
||||
const fields = [...new Set(changePoints.map((t) => t.fieldName))];
|
||||
|
||||
// TODO add query params
|
||||
|
@ -91,6 +94,8 @@ export async function fetchFrequentItems(
|
|||
sampleProbability = Math.min(0.5, minDocCount / totalDocCount);
|
||||
}
|
||||
|
||||
logger.debug(`frequent_items sample probability: ${sampleProbability}`);
|
||||
|
||||
// frequent items can be slow, so sample and use 10% min_support
|
||||
const aggs: Record<string, estypes.AggregationsAggregationContainer> = {
|
||||
sample: {
|
||||
|
@ -103,7 +108,7 @@ export async function fetchFrequentItems(
|
|||
frequent_items: {
|
||||
minimum_set_size: 2,
|
||||
size: 200,
|
||||
minimum_support: 0.01,
|
||||
minimum_support: 0.1,
|
||||
fields: aggFields,
|
||||
},
|
||||
},
|
||||
|
@ -125,12 +130,18 @@ export async function fetchFrequentItems(
|
|||
{ maxRetries: 0 }
|
||||
);
|
||||
|
||||
const totalDocCountFi = (body.hits.total as estypes.SearchTotalHits).value;
|
||||
|
||||
if (body.aggregations === undefined) {
|
||||
throw new Error('fetchFrequentItems failed, did not return aggregations.');
|
||||
logger.error(`Failed to fetch frequent_items, got: \n${JSON.stringify(body, null, 2)}`);
|
||||
emitError(`Failed to fetch frequent_items.`);
|
||||
return {
|
||||
fields: [],
|
||||
df: [],
|
||||
totalDocCount: 0,
|
||||
};
|
||||
}
|
||||
|
||||
const totalDocCountFi = (body.hits.total as estypes.SearchTotalHits).value;
|
||||
|
||||
const shape = body.aggregations.sample.fi.buckets.length;
|
||||
let maximum = shape;
|
||||
if (maximum > 50000) {
|
||||
|
|
|
@ -222,9 +222,7 @@ export default ({ getService }: FtrProviderContext) => {
|
|||
const errorActions = data.filter((d) => d.type === expected.errorFilter);
|
||||
expect(errorActions.length).to.be(1);
|
||||
|
||||
expect(errorActions[0].payload).to.be(
|
||||
'ResponseError: index_not_found_exception: [index_not_found_exception] Reason: no such index [does_not_exist]'
|
||||
);
|
||||
expect(errorActions[0].payload).to.be('Failed to fetch field candidates.');
|
||||
});
|
||||
});
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue