[ML] Explain Log Rate Spikes: Allow to continue failed stream. (#143301)

The analysis can be long-running and in some cases can time out depending on
server/proxy settings. This update allows the user to try to continue the
analysis if it failed halfway through.
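For illustration, a minimal sketch of the follow-up request body a client could send to continue a failed run. The `overrides` shape mirrors the schema added in this commit, while the concrete values and field names are assumed placeholders:

// Sketch only: a follow-up request body for continuing a failed analysis.
// The `overrides` keys follow the schema added below; the values are made up.
const continueAnalysisBody = {
  // ...the original analysis parameters (index, time field, window) stay unchanged
  overrides: {
    loaded: 0.55, // progress the UI had already received before the failure
    remainingFieldCandidates: ['url.original', 'geo.dest'], // candidates not yet analyzed
    changePoints: [], // significant field/value pairs found so far, to avoid recomputation
  },
};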
Walter Rafelsberger 2022-11-10 09:03:44 +01:00 committed by GitHub
parent 3a24c1f8a6
commit 1d511a439d
7 changed files with 198 additions and 77 deletions


@ -19,7 +19,8 @@ export const API_ACTION_NAME = {
ADD_CHANGE_POINTS_GROUP_HISTOGRAM: 'add_change_point_group_histogram',
ADD_ERROR: 'add_error',
PING: 'ping',
RESET: 'reset',
RESET_ALL: 'reset_all',
RESET_ERRORS: 'reset_errors',
UPDATE_LOADING_STATE: 'update_loading_state',
} as const;
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
@ -90,6 +91,16 @@ export function addErrorAction(payload: ApiActionAddError['payload']): ApiAction
};
}
interface ApiActionResetErrors {
type: typeof API_ACTION_NAME.RESET_ERRORS;
}
export function resetErrorsAction() {
return {
type: API_ACTION_NAME.RESET_ERRORS,
};
}
interface ApiActionPing {
type: typeof API_ACTION_NAME.PING;
}
@ -98,12 +109,12 @@ export function pingAction(): ApiActionPing {
return { type: API_ACTION_NAME.PING };
}
interface ApiActionReset {
type: typeof API_ACTION_NAME.RESET;
interface ApiActionResetAll {
type: typeof API_ACTION_NAME.RESET_ALL;
}
export function resetAction(): ApiActionReset {
return { type: API_ACTION_NAME.RESET };
export function resetAllAction(): ApiActionResetAll {
return { type: API_ACTION_NAME.RESET_ALL };
}
interface ApiActionUpdateLoadingState {
@ -112,6 +123,8 @@ interface ApiActionUpdateLoadingState {
ccsWarning: boolean;
loaded: number;
loadingState: string;
remainingFieldCandidates?: string[];
groupsMissing?: boolean;
};
}
@ -131,5 +144,6 @@ export type AiopsExplainLogRateSpikesApiAction =
| ApiActionAddChangePointsGroupHistogram
| ApiActionAddError
| ApiActionPing
| ApiActionReset
| ApiActionResetAll
| ApiActionResetErrors
| ApiActionUpdateLoadingState;


@ -12,7 +12,8 @@ export {
addChangePointsHistogramAction,
addErrorAction,
pingAction,
resetAction,
resetAllAction,
resetErrorsAction,
updateLoadingStateAction,
API_ACTION_NAME,
} from './actions';


@ -24,6 +24,15 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
/** Settings to override headers derived compression and flush fix */
compressResponse: schema.maybe(schema.boolean()),
flushFix: schema.maybe(schema.boolean()),
/** Overrides to skip steps of the analysis with existing data */
overrides: schema.maybe(
schema.object({
loaded: schema.maybe(schema.number()),
remainingFieldCandidates: schema.maybe(schema.arrayOf(schema.string())),
// TODO Improve schema
changePoints: schema.maybe(schema.arrayOf(schema.any())),
})
),
});
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;


@ -7,7 +7,7 @@
import {
addChangePointsAction,
resetAction,
resetAllAction,
updateLoadingStateAction,
} from './explain_log_rate_spikes';
import { initialState, streamReducer } from './stream_reducer';
@ -49,7 +49,7 @@ describe('streamReducer', () => {
expect(state1.changePoints).toHaveLength(1);
const state2 = streamReducer(state1, resetAction());
const state2 = streamReducer(state1, resetAllAction());
expect(state2.changePoints).toHaveLength(0);
});


@ -16,6 +16,8 @@ interface StreamState {
errors: string[];
loaded: number;
loadingState: string;
remainingFieldCandidates?: string[];
groupsMissing?: boolean;
}
export const initialState: StreamState = {
@ -62,7 +64,9 @@ export function streamReducer(
return { ...state, changePointsGroups };
case API_ACTION_NAME.ADD_ERROR:
return { ...state, errors: [...state.errors, action.payload] };
case API_ACTION_NAME.RESET:
case API_ACTION_NAME.RESET_ERRORS:
return { ...state, errors: [] };
case API_ACTION_NAME.RESET_ALL:
return initialState;
case API_ACTION_NAME.UPDATE_LOADING_STATE:
return { ...state, ...action.payload };


@ -9,6 +9,7 @@ import React, { useEffect, useMemo, useState, FC } from 'react';
import { isEqual } from 'lodash';
import {
EuiButton,
EuiCallOut,
EuiEmptyPrompt,
EuiFormRow,
@ -71,6 +72,10 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
WindowParameters | undefined
>();
const [groupResults, setGroupResults] = useState<boolean>(false);
const [overrides, setOverrides] = useState<
ApiExplainLogRateSpikes['body']['overrides'] | undefined
>(undefined);
const [shouldStart, setShouldStart] = useState(false);
const onSwitchToggle = (e: { target: { checked: React.SetStateAction<boolean> } }) => {
setGroupResults(e.target.checked);
@ -97,23 +102,55 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
grouping: true,
flushFix: true,
...windowParameters,
overrides,
},
{ reducer: streamReducer, initialState }
);
useEffect(() => {
if (!isRunning) {
const { loaded, remainingFieldCandidates, groupsMissing } = data;
if (
(Array.isArray(remainingFieldCandidates) && remainingFieldCandidates.length > 0) ||
groupsMissing
) {
setOverrides({ loaded, remainingFieldCandidates, changePoints: data.changePoints });
} else {
setOverrides(undefined);
}
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [isRunning]);
const errors = useMemo(() => [...streamErrors, ...data.errors], [streamErrors, data.errors]);
// Start handler clears possibly hovered or pinned
// change points on analysis refresh.
function startHandler() {
function startHandler(continueAnalysis = false) {
if (!continueAnalysis) {
setOverrides(undefined);
}
// Reset grouping to false and clear all row selections when restarting the analysis.
setGroupResults(false);
clearAllRowState();
setCurrentAnalysisWindowParameters(windowParameters);
start();
// We trigger hooks updates above so we cannot directly call `start()` here
// because it would be run with stale arguments.
setShouldStart(true);
}
useEffect(() => {
if (shouldStart) {
start();
setShouldStart(false);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [shouldStart]);
useEffect(() => {
setCurrentAnalysisWindowParameters(windowParameters);
start();
@ -169,7 +206,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
progress={data.loaded}
progressMessage={data.loadingState ?? ''}
isRunning={isRunning}
onRefresh={startHandler}
onRefresh={() => startHandler(false)}
onCancel={cancel}
shouldRerunAnalysis={shouldRerunAnalysis}
/>
@ -195,6 +232,16 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
))}
</ul>
)}
{overrides !== undefined ? (
<p>
<EuiButton size="s" onClick={() => startHandler(true)}>
<FormattedMessage
id="xpack.aiops.explainLogRateSpikesPage.tryToContinueAnalysisButtonText"
defaultMessage="Try to continue analysis"
/>
</EuiButton>
</p>
) : null}
</EuiText>
</EuiCallOut>
<EuiSpacer size="xs" />
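The handler above cannot call `start()` directly because the state updates it triggers (for example clearing `overrides`) have not been applied yet; instead it raises a `shouldStart` flag and an effect starts the stream on the next render. A stripped-down, self-contained sketch of that pattern, with a hypothetical hook and component name:

// Hypothetical stand-in for a streaming-fetch hook whose start() uses the
// options it was rendered with.
import React, { FC, useEffect, useState } from 'react';

function useStream(options?: object) {
  return { start: () => console.log('starting stream with', options) };
}

export const DeferredStartExample: FC = () => {
  const [overrides, setOverrides] = useState<object | undefined>(undefined);
  const [shouldStart, setShouldStart] = useState(false);
  const { start } = useStream(overrides);

  function startHandler(continueAnalysis = false) {
    if (!continueAnalysis) {
      setOverrides(undefined); // queued state update, not yet applied
    }
    // Calling start() here would use the hook output from the previous render,
    // so only raise a flag and let the effect below start the stream.
    setShouldStart(true);
  }

  useEffect(() => {
    if (shouldStart) {
      start(); // now sees the updated overrides
      setShouldStart(false);
    }
  }, [shouldStart, start]);

  return <button onClick={() => startHandler(true)}>Continue analysis</button>;
};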


@ -32,7 +32,8 @@ import {
aiopsExplainLogRateSpikesSchema,
addErrorAction,
pingAction,
resetAction,
resetAllAction,
resetErrorsAction,
updateLoadingStateAction,
AiopsExplainLogRateSpikesApiAction,
} from '../../common/api/explain_log_rate_spikes';
@ -170,87 +171,123 @@ export const defineExplainLogRateSpikesRoute = (
async function runAnalysis() {
try {
isRunning = true;
logDebugMessage('Reset.');
push(resetAction());
if (!request.body.overrides) {
logDebugMessage('Full Reset.');
push(resetAllAction());
} else {
logDebugMessage('Reset Errors.');
push(resetErrorsAction());
}
if (request.body.overrides?.loaded) {
logDebugMessage(`Set 'loaded' override to '${request.body.overrides?.loaded}'.`);
loaded = request.body.overrides?.loaded;
}
pushPingWithTimeout();
// Step 1: Index Info: Field candidates, total doc count, sample probability
const fieldCandidates: Awaited<ReturnType<typeof fetchIndexInfo>>['fieldCandidates'] = [];
let fieldCandidatesCount = fieldCandidates.length;
let sampleProbability = 1;
let totalDocCount = 0;
logDebugMessage('Fetch index information.');
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: i18n.translate(
'xpack.aiops.explainLogRateSpikes.loadingState.loadingIndexInformation',
{
defaultMessage: 'Loading index information.',
}
),
})
);
if (!request.body.overrides?.remainingFieldCandidates) {
logDebugMessage('Fetch index information.');
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: i18n.translate(
'xpack.aiops.explainLogRateSpikes.loadingState.loadingIndexInformation',
{
defaultMessage: 'Loading index information.',
}
),
})
);
try {
const indexInfo = await fetchIndexInfo(client, request.body, abortSignal);
fieldCandidates.push(...indexInfo.fieldCandidates);
sampleProbability = indexInfo.sampleProbability;
totalDocCount = indexInfo.totalDocCount;
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch index information, got: \n${e.toString()}`);
pushError(`Failed to fetch index information.`);
try {
const indexInfo = await fetchIndexInfo(client, request.body, abortSignal);
fieldCandidates.push(...indexInfo.fieldCandidates);
fieldCandidatesCount = fieldCandidates.length;
sampleProbability = indexInfo.sampleProbability;
totalDocCount = indexInfo.totalDocCount;
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch index information, got: \n${e.toString()}`);
pushError(`Failed to fetch index information.`);
}
end();
return;
}
logDebugMessage(`Total document count: ${totalDocCount}`);
logDebugMessage(`Sample probability: ${sampleProbability}`);
loaded += LOADED_FIELD_CANDIDATES;
pushPingWithTimeout();
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: i18n.translate(
'xpack.aiops.explainLogRateSpikes.loadingState.identifiedFieldCandidates',
{
defaultMessage:
'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.',
values: {
fieldCandidatesCount,
},
}
),
})
);
if (fieldCandidatesCount === 0) {
endWithUpdatedLoadingState();
} else if (shouldStop) {
logDebugMessage('shouldStop after fetching field candidates.');
end();
return;
}
end();
return;
}
logDebugMessage(`Total document count: ${totalDocCount}`);
logDebugMessage(`Sample probability: ${sampleProbability}`);
// Step 2: Significant Terms
loaded += LOADED_FIELD_CANDIDATES;
const fieldCandidatesCount = fieldCandidates.length;
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: i18n.translate(
'xpack.aiops.explainLogRateSpikes.loadingState.identifiedFieldCandidates',
{
defaultMessage:
'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.',
values: {
fieldCandidatesCount,
},
}
),
})
);
if (fieldCandidatesCount === 0) {
endWithUpdatedLoadingState();
} else if (shouldStop) {
logDebugMessage('shouldStop after fetching field candidates.');
end();
return;
}
const changePoints: ChangePoint[] = [];
const changePoints: ChangePoint[] = request.body.overrides?.changePoints
? request.body.overrides?.changePoints
: [];
const fieldsToSample = new Set<string>();
// Don't use more than 10 here otherwise Kibana will emit an error
// regarding a limit of abort signal listeners of more than 10.
const MAX_CONCURRENT_QUERIES = 10;
let remainingFieldCandidates: string[];
let loadingStepSizePValues = PROGRESS_STEP_P_VALUES;
if (request.body.overrides?.remainingFieldCandidates) {
fieldCandidates.push(...request.body.overrides?.remainingFieldCandidates);
remainingFieldCandidates = request.body.overrides?.remainingFieldCandidates;
fieldCandidatesCount = fieldCandidates.length;
loadingStepSizePValues =
LOADED_FIELD_CANDIDATES +
PROGRESS_STEP_P_VALUES -
(request.body.overrides?.loaded ?? PROGRESS_STEP_P_VALUES);
} else {
remainingFieldCandidates = fieldCandidates;
}
logDebugMessage('Fetch p-values.');
const pValuesQueue = queue(async function (fieldCandidate: string) {
loaded += (1 / fieldCandidatesCount) * PROGRESS_STEP_P_VALUES;
loaded += (1 / fieldCandidatesCount) * loadingStepSizePValues;
let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
@ -274,6 +311,8 @@ export const defineExplainLogRateSpikesRoute = (
return;
}
remainingFieldCandidates = remainingFieldCandidates.filter((d) => d !== fieldCandidate);
if (pValues.length > 0) {
pValues.forEach((d) => {
fieldsToSample.add(d.fieldName);
@ -297,17 +336,23 @@ export const defineExplainLogRateSpikesRoute = (
},
}
),
remainingFieldCandidates,
})
);
}, MAX_CONCURRENT_QUERIES);
if (shouldStop) {
pValuesQueue.push(fieldCandidates, (err) => {
if (err) {
logger.error(`Failed to fetch p-values.', got: \n${err.toString()}`);
pushError(`Failed to fetch p-values.`);
pValuesQueue.kill();
end();
} else if (shouldStop) {
logDebugMessage('shouldStop fetching p-values.');
pValuesQueue.kill();
end();
}
}, MAX_CONCURRENT_QUERIES);
pValuesQueue.push(fieldCandidates);
});
await pValuesQueue.drain();
if (changePoints.length === 0) {
@ -383,6 +428,7 @@ export const defineExplainLogRateSpikesRoute = (
defaultMessage: 'Transforming significant field/value pairs into groups.',
}
),
groupsMissing: true,
})
);
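
For reference, a self-contained sketch of the queue pattern the route switches to: field candidates are pushed with a per-task callback so an error (or the stop flag) can kill the queue and end the stream early. It uses the `async` library's `queue`, as the route does; the task body and field names are illustrative only.

// Sketch of the error handling the route moves to: each task pushed to the
// queue gets a callback, so a failure or a stop flag can kill the queue
// instead of letting it run on. Not the actual route code.
import { queue } from 'async';

let shouldStop = false; // would be flipped by a cancellation/disconnect handler

async function runSketch() {
  const analysisQueue = queue(async (fieldCandidate: string) => {
    // Placeholder for the real per-field work (fetching p-values).
    console.log(`analyzed ${fieldCandidate}`);
  }, 10); // concurrency cap, mirroring MAX_CONCURRENT_QUERIES

  analysisQueue.push(['field_a', 'field_b', 'field_c'], (err) => {
    if (err) {
      console.error(`Failed to analyze a field: ${err.toString()}`);
      analysisQueue.kill(); // drop remaining tasks and stop processing
    } else if (shouldStop) {
      analysisQueue.kill(); // the client cancelled; stop as well
    }
  });

  await analysisQueue.drain(); // resolves once the queued tasks have finished
}

runSketch().catch(console.error);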