[ML] Explain log rate spikes: Page setup (#132121)

Builds out UI/code boilerplate necessary before we start implementing the feature's own UI on a dedicated page.

- Updates navigation to bring up data view/saved search selection before moving on to the explain log spike rates page.
The bar chart race demo page was moved to the aiops/single_endpoint_streaming_demo url. It is kept in this PR so we have two different pages + API endpoints that use streaming. With this still in place it's easier to update the streaming code to be more generic and reusable.
- The url/page aiops/explain_log_rate_spikes has been added with some dummy request that slowly streams a data view's fields to the client. This page will host the actual UI to be brought over from the PoC in follow ups to this PR.
- The structure to embed aiops plugin pages in the ml plugin has been simplified. Instead of a lot of custom code to load the components at runtime in the aiops plugin itself, this now uses React lazy loading with Suspense, similar to how we load Vega charts in other places. We no longer initialize the aiops client side code during startup of the plugin itself and augment it, instead we statically import components and pass on props/contexts from the ml plugin.
The code to handle streaming chunks on the client side in stream_fetch.ts/use_stream_fetch_reducer.ts has been improved to make better use of TS generics so for a given API endpoint it's able to return the appropriate corresponding return data type and only allows using the supported reducer actions for that endpoint. Buffering client side actions has been tweaked to handle state updates more quickly if updates from the server are stalling.
This commit is contained in:
Walter Rafelsberger 2022-05-20 10:45:50 +02:00 committed by GitHub
parent 3982bfd3fd
commit 24bdc97413
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 1203 additions and 481 deletions

View file

@ -65,4 +65,7 @@ export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
};
}
export type ApiAction = ApiActionUpdateProgress | ApiActionAddToEntity | ApiActionDeleteEntity;
export type AiopsExampleStreamApiAction =
| ApiActionUpdateProgress
| ApiActionAddToEntity
| ApiActionDeleteEntity;

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema, TypeOf } from '@kbn/config-schema';
/** Runtime request body schema for the explain log rate spikes API endpoint. */
export const aiopsExplainLogRateSpikesSchema = schema.object({
  /** The index to query for log rate spikes */
  index: schema.string(),
});
/** Static request body type inferred from the runtime schema above. */
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
/** Names of the API actions the explain log rate spikes endpoint can emit. */
export const API_ACTION_NAME = {
  ADD_FIELDS: 'add_fields',
} as const;

/** Union of all supported API action names. */
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];

/** Action that streams a batch of field names to the client. */
interface ApiActionAddFields {
  type: typeof API_ACTION_NAME.ADD_FIELDS;
  payload: string[];
}

/**
 * Creates an `add_fields` action carrying the given field names.
 *
 * @param payload - Field names to be added to the client side state.
 * @returns The typed `add_fields` action object.
 */
export const addFieldsAction = (payload: string[]): ApiActionAddFields => ({
  type: API_ACTION_NAME.ADD_FIELDS,
  payload,
});

/** Union of all actions this endpoint may stream (currently only `add_fields`). */
export type AiopsExplainLogRateSpikesApiAction = ApiActionAddFields;

View file

@ -5,15 +5,24 @@
* 2.0.
*/
import type { AiopsExampleStreamSchema } from './example_stream';
import type {
AiopsExplainLogRateSpikesSchema,
AiopsExplainLogRateSpikesApiAction,
} from './explain_log_rate_spikes';
import type { AiopsExampleStreamSchema, AiopsExampleStreamApiAction } from './example_stream';
export const API_ENDPOINT = {
EXAMPLE_STREAM: '/internal/aiops/example_stream',
ANOTHER: '/internal/aiops/another',
EXPLAIN_LOG_RATE_SPIKES: '/internal/aiops/explain_log_rate_spikes',
} as const;
export type ApiEndpoint = typeof API_ENDPOINT[keyof typeof API_ENDPOINT];
export interface ApiEndpointOptions {
[API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamSchema;
[API_ENDPOINT.ANOTHER]: { anotherOption: string };
[API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesSchema;
}
export interface ApiEndpointActions {
[API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamApiAction;
[API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesApiAction;
}

View file

@ -9,7 +9,7 @@
"description": "AIOps plugin maintained by ML team.",
"server": true,
"ui": true,
"requiredPlugins": [],
"requiredPlugins": ["data"],
"optionalPlugins": [],
"requiredBundles": ["kibanaReact"],
"extraPublicDirs": ["common"]

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { lazyLoadModules } from '../lazy_load_bundle';
import type { ExplainLogRateSpikesSpec } from '../components/explain_log_rate_spikes';
/**
 * Resolves to a factory returning the lazily loaded `ExplainLogRateSpikes`
 * component. The underlying bundle is fetched on first use via `lazyLoadModules`.
 */
export async function getExplainLogRateSpikesComponent(): Promise<() => ExplainLogRateSpikesSpec> {
  const modules = await lazyLoadModules();
  return () => modules.ExplainLogRateSpikes;
}

View file

@ -1,167 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, useState } from 'react';
import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import {
EuiBadge,
EuiButton,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiPage,
EuiPageBody,
EuiPageContent,
EuiPageContentBody,
EuiPageContentHeader,
EuiProgress,
EuiSpacer,
EuiTitle,
EuiText,
} from '@elastic/eui';
import { getStatusMessage } from './get_status_message';
import { initialState, resetStream, streamReducer } from './stream_reducer';
import { useStreamFetchReducer } from './use_stream_fetch_reducer';
/**
 * Demo page component: streams commit data from the example stream API
 * endpoint and renders it as a horizontal "bar chart race".
 */
export const AiopsApp = () => {
  const { notifications } = useKibana();
  const [simulateErrors, setSimulateErrors] = useState(false);

  // Streaming state: `data` accumulates reduced server actions, `start`/`cancel`
  // control the fetch, `dispatch` allows dispatching client side actions (reset).
  const { dispatch, start, cancel, data, isCancelled, isRunning } = useStreamFetchReducer(
    '/internal/aiops/example_stream',
    streamReducer,
    initialState,
    { simulateErrors }
  );

  const { errors, progress, entities } = data;

  // Starts a fresh stream, or cancels the currently running one.
  const onClickHandler = async () => {
    if (isRunning) {
      cancel();
    } else {
      dispatch(resetStream());
      start();
    }
  };

  // Surface the most recently received stream error as a toast notification.
  useEffect(() => {
    if (errors.length > 0) {
      notifications.toasts.danger({ body: errors[errors.length - 1] });
    }
  }, [errors, notifications.toasts]);

  const buttonLabel = isRunning
    ? i18n.translate('xpack.aiops.stopbuttonText', {
        defaultMessage: 'Stop development',
      })
    : i18n.translate('xpack.aiops.startbuttonText', {
        defaultMessage: 'Start development',
      });

  return (
    <EuiPage restrictWidth="1000px">
      <EuiPageBody>
        <EuiPageContent>
          <EuiPageContentHeader>
            <EuiTitle>
              <h2>
                <FormattedMessage
                  id="xpack.aiops.congratulationsTitle"
                  defaultMessage="Single endpoint streaming demo"
                />
              </h2>
            </EuiTitle>
          </EuiPageContentHeader>
          <EuiPageContentBody>
            <EuiText>
              <EuiFlexGroup alignItems="center">
                <EuiFlexItem grow={false}>
                  <EuiButton
                    type="primary"
                    size="s"
                    onClick={onClickHandler}
                    aria-label={buttonLabel}
                  >
                    {buttonLabel}
                  </EuiButton>
                </EuiFlexItem>
                <EuiFlexItem grow={false}>
                  <EuiText>
                    <EuiBadge>{progress}%</EuiBadge>
                  </EuiText>
                </EuiFlexItem>
                <EuiFlexItem>
                  <EuiProgress value={progress} max={100} size="xs" />
                </EuiFlexItem>
              </EuiFlexGroup>
              <EuiSpacer />
              <div style={{ height: '300px' }}>
                <Chart>
                  {/* 90 degree rotation renders the bar series horizontally. */}
                  <Settings rotation={90} />
                  <Axis
                    id="entities"
                    position={Position.Bottom}
                    title={i18n.translate('xpack.aiops.barChart.commitsTitle', {
                      defaultMessage: 'Commits',
                    })}
                    showOverlappingTicks
                  />
                  <Axis
                    id="left2"
                    title={i18n.translate('xpack.aiops.barChart.developersTitle', {
                      defaultMessage: 'Developers',
                    })}
                    position={Position.Left}
                  />
                  {/* Entities are re-sorted on each render so bars "race" as data streams in. */}
                  <BarSeries
                    id="commits"
                    xScaleType={ScaleType.Linear}
                    yScaleType={ScaleType.Linear}
                    xAccessor="x"
                    yAccessors={['y']}
                    data={Object.entries(entities)
                      .map(([x, y]) => {
                        return {
                          x,
                          y,
                        };
                      })
                      .sort((a, b) => b.y - a.y)}
                  />
                </Chart>
              </div>
              <p>{getStatusMessage(isRunning, isCancelled, data.progress)}</p>
              <EuiCheckbox
                id="aiopSimulateErrorsCheckbox"
                label={i18n.translate(
                  'xpack.aiops.explainLogRateSpikes.simulateErrorsCheckboxLabel',
                  {
                    defaultMessage:
                      'Simulate errors (gets applied to new streams only, not currently running ones).',
                  }
                )}
                checked={simulateErrors}
                onChange={(e) => setSimulateErrors(!simulateErrors)}
                compressed
              />
            </EuiText>
          </EuiPageContentBody>
        </EuiPageContent>
      </EuiPageBody>
    </EuiPage>
  );
};

View file

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { KibanaContextProvider, KibanaThemeProvider } from '@kbn/kibana-react-plugin/public';
import { I18nProvider } from '@kbn/i18n-react';
import { getCoreStart } from '../kibana_services';
import { AiopsApp } from './app';
/**
 * Spec used for lazy loading in the ML plugin
 */
export type ExplainLogRateSpikesSpec = typeof ExplainLogRateSpikes;

/**
 * Lazy-loading entry point component: wires the Kibana theme, services
 * context and i18n providers around the aiops demo app.
 */
export const ExplainLogRateSpikes: FC = () => {
  // Core start services are registered via `setStartServices` during plugin start.
  const coreStart = getCoreStart();

  return (
    <KibanaThemeProvider theme$={coreStart.theme.theme$}>
      <KibanaContextProvider services={coreStart}>
        <I18nProvider>
          <AiopsApp />
        </I18nProvider>
      </KibanaContextProvider>
    </KibanaThemeProvider>
  );
};

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, FC } from 'react';
import { EuiBadge, EuiSpacer, EuiText } from '@elastic/eui';
import type { DataView } from '@kbn/data-views-plugin/public';
import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer';
import { initialState, streamReducer } from './stream_reducer';
/**
* ExplainLogRateSpikes props require a data view.
*/
export interface ExplainLogRateSpikesProps {
/** The data view to analyze. */
dataView: DataView;
}
/**
 * Page component for the explain log rate spikes feature. Streams the given
 * data view's field names from the explain log rate spikes API endpoint and
 * renders them as badges while the stream is running.
 *
 * @param dataView - The data view whose index gets analyzed.
 */
export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = ({ dataView }) => {
  // `data` accumulates the reduced streamed actions; `start` kicks off the fetch.
  const { start, data, isRunning } = useStreamFetchReducer(
    '/internal/aiops/explain_log_rate_spikes',
    streamReducer,
    initialState,
    { index: dataView.title }
  );

  // Start streaming once on mount only.
  useEffect(() => {
    start();
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  return (
    <EuiText>
      <h2>{dataView.title}</h2>
      <p>{isRunning ? 'Loading fields ...' : 'Loaded all fields.'}</p>
      <EuiSpacer size="xs" />
      {data.fields.map((field) => (
        // `key` is required for stable list reconciliation in React;
        // field names are unique within a data view.
        <EuiBadge key={field}>{field}</EuiBadge>
      ))}
    </EuiText>
  );
};

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export type { ExplainLogRateSpikesProps } from './explain_log_rate_spikes';
import { ExplainLogRateSpikes } from './explain_log_rate_spikes';
// required for dynamic import using React.lazy()
// eslint-disable-next-line import/no-default-export
export default ExplainLogRateSpikes;

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
API_ACTION_NAME,
AiopsExplainLogRateSpikesApiAction,
} from '../../../common/api/explain_log_rate_spikes';
/** Client side state accumulated from the streamed API actions. */
interface StreamState {
  fields: string[];
}

export const initialState: StreamState = {
  fields: [],
};

/**
 * Reducer that folds streamed explain-log-rate-spikes API actions into the
 * page's stream state. Accepts either a single action or a batch of actions
 * (batches are produced by the client side buffering in `streamFetch`).
 */
export function streamReducer(
  state: StreamState,
  action: AiopsExplainLogRateSpikesApiAction | AiopsExplainLogRateSpikesApiAction[]
): StreamState {
  // A batch is applied sequentially, one action at a time.
  if (Array.isArray(action)) {
    let nextState = state;
    for (const singleAction of action) {
      nextState = streamReducer(nextState, singleAction);
    }
    return nextState;
  }

  if (action.type === API_ACTION_NAME.ADD_FIELDS) {
    return { fields: state.fields.concat(action.payload) };
  }

  // Unknown actions leave the state untouched.
  return state;
}

View file

@ -5,5 +5,8 @@
* 2.0.
*/
export type { ExplainLogRateSpikesSpec } from '../../components/explain_log_rate_spikes';
export { ExplainLogRateSpikes } from '../../components/explain_log_rate_spikes';
import { SingleEndpointStreamingDemo } from './single_endpoint_streaming_demo';
// required for dynamic import using React.lazy()
// eslint-disable-next-line import/no-default-export
export default SingleEndpointStreamingDemo;

View file

@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, useState, FC } from 'react';
import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts';
import { i18n } from '@kbn/i18n';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import {
EuiBadge,
EuiButton,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiProgress,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer';
import { getStatusMessage } from './get_status_message';
import { initialState, resetStream, streamReducer } from './stream_reducer';
/**
 * Demo page component: streams commit data from the example stream API
 * endpoint and renders it as a horizontal "bar chart race".
 */
export const SingleEndpointStreamingDemo: FC = () => {
  const { notifications } = useKibana();
  const [simulateErrors, setSimulateErrors] = useState(false);

  // Streaming state: `data` accumulates reduced server actions, `start`/`cancel`
  // control the fetch, `dispatch` allows dispatching client side actions (reset).
  const { dispatch, start, cancel, data, isCancelled, isRunning } = useStreamFetchReducer(
    '/internal/aiops/example_stream',
    streamReducer,
    initialState,
    { simulateErrors }
  );

  const { errors, progress, entities } = data;

  // Starts a fresh stream, or cancels the currently running one.
  const onClickHandler = async () => {
    if (isRunning) {
      cancel();
    } else {
      dispatch(resetStream());
      start();
    }
  };

  // Surface the most recently received stream error as a toast notification.
  useEffect(() => {
    if (errors.length > 0) {
      notifications.toasts.danger({ body: errors[errors.length - 1] });
    }
  }, [errors, notifications.toasts]);

  const buttonLabel = isRunning
    ? i18n.translate('xpack.aiops.stopbuttonText', {
        defaultMessage: 'Stop development',
      })
    : i18n.translate('xpack.aiops.startbuttonText', {
        defaultMessage: 'Start development',
      });

  return (
    <EuiText>
      <EuiFlexGroup alignItems="center">
        <EuiFlexItem grow={false}>
          <EuiButton type="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
            {buttonLabel}
          </EuiButton>
        </EuiFlexItem>
        <EuiFlexItem grow={false}>
          <EuiText>
            <EuiBadge>{progress}%</EuiBadge>
          </EuiText>
        </EuiFlexItem>
        <EuiFlexItem>
          <EuiProgress value={progress} max={100} size="xs" />
        </EuiFlexItem>
      </EuiFlexGroup>
      <EuiSpacer />
      <div style={{ height: '300px' }}>
        <Chart>
          {/* 90 degree rotation renders the bar series horizontally. */}
          <Settings rotation={90} />
          <Axis
            id="entities"
            position={Position.Bottom}
            title={i18n.translate('xpack.aiops.barChart.commitsTitle', {
              defaultMessage: 'Commits',
            })}
            showOverlappingTicks
          />
          <Axis
            id="left2"
            title={i18n.translate('xpack.aiops.barChart.developersTitle', {
              defaultMessage: 'Developers',
            })}
            position={Position.Left}
          />
          {/* Entities are re-sorted on each render so bars "race" as data streams in. */}
          <BarSeries
            id="commits"
            xScaleType={ScaleType.Linear}
            yScaleType={ScaleType.Linear}
            xAccessor="x"
            yAccessors={['y']}
            data={Object.entries(entities)
              .map(([x, y]) => {
                return {
                  x,
                  y,
                };
              })
              .sort((a, b) => b.y - a.y)}
          />
        </Chart>
      </div>
      <p>{getStatusMessage(isRunning, isCancelled, data.progress)}</p>
      <EuiCheckbox
        id="aiopSimulateErrorsCheckbox"
        label={i18n.translate('xpack.aiops.explainLogRateSpikes.simulateErrorsCheckboxLabel', {
          defaultMessage:
            'Simulate errors (gets applied to new streams only, not currently running ones).',
        })}
        checked={simulateErrors}
        // Derive the next value from the DOM event instead of negating the
        // captured state variable, which could be stale inside this closure.
        onChange={(e) => setSimulateErrors(e.target.checked)}
        compressed
      />
    </EuiText>
  );
};

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { ApiAction, API_ACTION_NAME } from '../../common/api/example_stream';
import { AiopsExampleStreamApiAction, API_ACTION_NAME } from '../../../common/api/example_stream';
export const UI_ACTION_NAME = {
ERROR: 'error',
@ -37,7 +37,7 @@ export function resetStream(): UiActionResetStream {
}
type UiAction = UiActionResetStream | UiActionError;
export type ReducerAction = ApiAction | UiAction;
export type ReducerAction = AiopsExampleStreamApiAction | UiAction;
export function streamReducer(
state: StreamState,
action: ReducerAction | ReducerAction[]

View file

@ -7,14 +7,19 @@
import type React from 'react';
import type { ApiEndpoint, ApiEndpointOptions } from '../../common/api';
import type { ApiEndpoint, ApiEndpointActions, ApiEndpointOptions } from '../../common/api';
export async function* streamFetch<A = unknown, E = ApiEndpoint>(
interface ErrorAction {
type: 'error';
payload: string;
}
export async function* streamFetch<E extends ApiEndpoint>(
endpoint: E,
abortCtrl: React.MutableRefObject<AbortController>,
options: ApiEndpointOptions[ApiEndpoint],
options: ApiEndpointOptions[E],
basePath = ''
) {
): AsyncGenerator<Array<ApiEndpointActions[E] | ErrorAction>> {
const stream = await fetch(`${basePath}${endpoint}`, {
signal: abortCtrl.current.signal,
method: 'POST',
@ -36,7 +41,7 @@ export async function* streamFetch<A = unknown, E = ApiEndpoint>(
const bufferBounce = 100;
let partial = '';
let actionBuffer: A[] = [];
let actionBuffer: Array<ApiEndpointActions[E]> = [];
let lastCall = 0;
while (true) {
@ -52,7 +57,7 @@ export async function* streamFetch<A = unknown, E = ApiEndpoint>(
partial = last ?? '';
const actions = parts.map((p) => JSON.parse(p));
const actions = parts.map((p) => JSON.parse(p)) as Array<ApiEndpointActions[E]>;
actionBuffer.push(...actions);
const now = Date.now();
@ -61,10 +66,26 @@ export async function* streamFetch<A = unknown, E = ApiEndpoint>(
yield actionBuffer;
actionBuffer = [];
lastCall = now;
// In cases where the next chunk takes longer to be received than the `bufferBounce` timeout,
// we trigger this client side timeout to clear a potential intermediate buffer state.
// Since `yield` cannot be passed on to other scopes like callbacks,
// this pattern using a Promise is used to wait for the timeout.
yield new Promise<Array<ApiEndpointActions[E]>>((resolve) => {
setTimeout(() => {
if (actionBuffer.length > 0) {
resolve(actionBuffer);
actionBuffer = [];
lastCall = now;
} else {
resolve([]);
}
}, bufferBounce + 10);
});
}
} catch (error) {
if (error.name !== 'AbortError') {
yield { type: 'error', payload: error.toString() };
yield [{ type: 'error', payload: error.toString() }];
}
break;
}

View file

@ -5,7 +5,15 @@
* 2.0.
*/
import { useReducer, useRef, useState, Reducer, ReducerAction, ReducerState } from 'react';
import {
useEffect,
useReducer,
useRef,
useState,
Reducer,
ReducerAction,
ReducerState,
} from 'react';
import { useKibana } from '@kbn/kibana-react-plugin/public';
@ -13,11 +21,11 @@ import type { ApiEndpoint, ApiEndpointOptions } from '../../common/api';
import { streamFetch } from './stream_fetch';
export const useStreamFetchReducer = <R extends Reducer<any, any>, E = ApiEndpoint>(
export const useStreamFetchReducer = <R extends Reducer<any, any>, E extends ApiEndpoint>(
endpoint: E,
reducer: R,
initialState: ReducerState<R>,
options: ApiEndpointOptions[ApiEndpoint]
options: ApiEndpointOptions[E]
) => {
const kibana = useKibana();
@ -44,7 +52,9 @@ export const useStreamFetchReducer = <R extends Reducer<any, any>, E = ApiEndpoi
options,
kibana.services.http?.basePath.get()
)) {
dispatch(actions as ReducerAction<R>);
if (actions.length > 0) {
dispatch(actions as ReducerAction<R>);
}
}
setIsRunning(false);
@ -56,6 +66,11 @@ export const useStreamFetchReducer = <R extends Reducer<any, any>, E = ApiEndpoi
setIsRunning(false);
};
// If components using this custom hook get unmounted, cancel any ongoing request.
useEffect(() => {
return () => abortCtrl.current.abort();
}, []);
return {
cancel,
data,

View file

@ -13,6 +13,6 @@ export function plugin() {
return new AiopsPlugin();
}
export type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes';
export { ExplainLogRateSpikes, SingleEndpointStreamingDemo } from './shared_lazy_components';
export type { AiopsPluginSetup, AiopsPluginStart } from './types';
export type { ExplainLogRateSpikesSpec } from './components/explain_log_rate_spikes';

View file

@ -1,19 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart } from '@kbn/core/public';
import { AppPluginStartDependencies } from './types';
// Module-scoped holders for the services registered during plugin start.
let coreStart: CoreStart;
let pluginsStart: AppPluginStartDependencies;

/**
 * Stores the Kibana core and plugin start contracts so other modules of this
 * plugin can retrieve them without prop drilling.
 */
export function setStartServices(core: CoreStart, plugins: AppPluginStartDependencies) {
  coreStart = core;
  pluginsStart = plugins;
}

/** Returns the `CoreStart` contract registered via `setStartServices`. */
export function getCoreStart(): CoreStart {
  return coreStart;
}

/** Returns the plugin start dependencies registered via `setStartServices`. */
export function getPluginsStart(): AppPluginStartDependencies {
  return pluginsStart;
}

View file

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ExplainLogRateSpikesSpec } from '../components/explain_log_rate_spikes';
// Memoized promise so the dynamic import is only kicked off once; concurrent
// and subsequent callers all share the same in-flight or resolved promise.
let loadModulesPromise: Promise<LazyLoadedModules>;

/** Shape of the lazily loaded module bundle. */
interface LazyLoadedModules {
  ExplainLogRateSpikes: ExplainLogRateSpikesSpec;
}

/**
 * Lazily imports the bundle containing the plugin's heavyweight components.
 *
 * @returns Promise resolving to the loaded modules; rejects if the dynamic
 * import fails.
 */
export async function lazyLoadModules(): Promise<LazyLoadedModules> {
  if (typeof loadModulesPromise === 'undefined') {
    // A dynamic import already yields a promise that rejects on failure, so
    // there is no need for the `new Promise(async (resolve, reject) => ...)`
    // anti-pattern (eslint: no-async-promise-executor) used previously.
    loadModulesPromise = import('./lazy').then((lazyImports) => ({ ...lazyImports }));
  }
  return loadModulesPromise;
}

View file

@ -7,19 +7,10 @@
import { CoreSetup, CoreStart, Plugin } from '@kbn/core/public';
import { getExplainLogRateSpikesComponent } from './api';
import { setStartServices } from './kibana_services';
import { AiopsPluginSetup, AiopsPluginStart } from './types';
export class AiopsPlugin implements Plugin<AiopsPluginSetup, AiopsPluginStart> {
public setup(core: CoreSetup) {}
public start(core: CoreStart) {
setStartServices(core, {});
return {
getExplainLogRateSpikesComponent,
};
}
public start(core: CoreStart) {}
public stop() {}
}

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC, Suspense } from 'react';
import { EuiErrorBoundary, EuiLoadingContent } from '@elastic/eui';
import type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes';
// Lazily loaded page components — the underlying bundle is only fetched when
// one of the exported wrappers below is actually rendered.
const ExplainLogRateSpikesLazy = React.lazy(() => import('./components/explain_log_rate_spikes'));
const SingleEndpointStreamingDemoLazy = React.lazy(
  () => import('./components/single_endpoint_streaming_demo')
);

// Shared wrapper providing an error boundary and a loading skeleton fallback
// while the lazy bundle is being fetched.
const LazyWrapper: FC = ({ children }) => (
  <EuiErrorBoundary>
    <Suspense fallback={<EuiLoadingContent lines={3} />}>{children}</Suspense>
  </EuiErrorBoundary>
);

/**
 * Lazy-wrapped ExplainLogRateSpikes React component
 * @param props - properties specifying the data on which to run the analysis.
 */
export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = (props) => (
  <LazyWrapper>
    <ExplainLogRateSpikesLazy {...props} />
  </LazyWrapper>
);

/**
 * Lazy-wrapped SingleEndpointStreamingDemo React component
 */
export const SingleEndpointStreamingDemo: FC = () => (
  <LazyWrapper>
    <SingleEndpointStreamingDemoLazy />
  </LazyWrapper>
);

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { acceptCompression } from './accept_compression';
// Unit tests for `acceptCompression`: the helper should detect a `gzip` entry
// in the `accept-encoding` request header regardless of the header name's
// casing and of whether the header value is a string or an array of strings.
describe('acceptCompression', () => {
  it('should return false for empty headers', () => {
    expect(acceptCompression({})).toBe(false);
  });
  it('should return false for other header containing gzip as string', () => {
    expect(acceptCompression({ 'other-header': 'gzip, other' })).toBe(false);
  });
  it('should return false for other header containing gzip as array', () => {
    expect(acceptCompression({ 'other-header': ['gzip', 'other'] })).toBe(false);
  });
  it('should return true for upper-case header containing gzip as string', () => {
    expect(acceptCompression({ 'Accept-Encoding': 'gzip, other' })).toBe(true);
  });
  it('should return true for lower-case header containing gzip as string', () => {
    expect(acceptCompression({ 'accept-encoding': 'gzip, other' })).toBe(true);
  });
  it('should return true for upper-case header containing gzip as array', () => {
    expect(acceptCompression({ 'Accept-Encoding': ['gzip', 'other'] })).toBe(true);
  });
  it('should return true for lower-case header containing gzip as array', () => {
    expect(acceptCompression({ 'accept-encoding': ['gzip', 'other'] })).toBe(true);
  });
  it('should return true for mixed headers containing gzip as string', () => {
    expect(
      acceptCompression({ 'accept-encoding': 'gzip, other', 'other-header': 'other-value' })
    ).toBe(true);
  });
  it('should return true for mixed headers containing gzip as array', () => {
    expect(
      acceptCompression({ 'accept-encoding': ['gzip', 'other'], 'other-header': 'other-value' })
    ).toBe(true);
  });
});

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Headers } from '@kbn/core/server';
/**
* Returns whether request headers accept a response using gzip compression.
*
* @param headers - Request headers.
* @returns boolean
*/
/**
 * Returns whether request headers accept a response using gzip compression.
 *
 * Header names are matched case-insensitively; the header value may be a
 * single comma separated string or an array of such strings.
 *
 * @param headers - Request headers.
 * @returns true if any `accept-encoding` header lists `gzip`.
 */
export function acceptCompression(headers: Headers): boolean {
  // Checks a single comma separated encoding list for a `gzip` entry.
  const containsGzip = (value: string): boolean =>
    value
      .split(',')
      .map((d) => d.trim())
      .includes('gzip');

  // `some` short-circuits on the first match and, unlike the previous
  // `forEach` + assignment approach, cannot be reset to `false` by a later
  // non-matching header of the same name.
  return Object.entries(headers).some(([key, acceptEncoding]) => {
    // Use locale-independent `toLowerCase`: `toLocaleLowerCase` can mangle
    // ASCII header names under locales such as Turkish ("I" -> dotless i).
    if (key.toLowerCase() !== 'accept-encoding') {
      return false;
    }
    if (typeof acceptEncoding === 'string') {
      return containsGzip(acceptEncoding);
    }
    if (Array.isArray(acceptEncoding)) {
      return acceptEncoding.some(containsGzip);
    }
    return false;
  });
}

View file

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import zlib from 'zlib';
import { loggerMock, MockedLogger } from '@kbn/logging-mocks';
import { API_ENDPOINT } from '../../common/api';
import type { ApiEndpointActions } from '../../common/api';
import { streamFactory } from './stream_factory';
// The action type of the endpoint the mock items below conform to.
type Action = ApiEndpointActions['/internal/aiops/explain_log_rate_spikes'];

const mockItem1: Action = {
  type: 'add_fields',
  payload: ['clientip'],
};
const mockItem2: Action = {
  type: 'add_fields',
  payload: ['referer'],
};

describe('streamFactory', () => {
  let mockLogger: MockedLogger;

  beforeEach(() => {
    mockLogger = loggerMock.create();
  });

  it('should encode and receive an uncompressed stream', async () => {
    // Empty headers -> no `accept-encoding`, so the factory must not compress.
    const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
      typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
    >(mockLogger, {});

    push(mockItem1);
    push(mockItem2);
    end();

    let streamResult = '';
    for await (const chunk of stream) {
      streamResult += chunk.toString('utf8');
    }

    // Every pushed item ends with a trailing delimiter, so the final split
    // element is expected to be an empty string.
    const streamItems = streamResult.split(DELIMITER);
    const lastItem = streamItems.pop();

    const parsedItems = streamItems.map((d) => JSON.parse(d));

    expect(responseWithHeaders.headers).toBe(undefined);
    expect(parsedItems).toHaveLength(2);
    expect(parsedItems[0]).toStrictEqual(mockItem1);
    expect(parsedItems[1]).toStrictEqual(mockItem2);
    expect(lastItem).toBe('');
  });

  // Because zlib.gunzip's API expects a callback, we need to use `done` here
  // to indicate once all assertions are run. However, it's not allowed to use both
  // `async` and `done` for the test callback. That's why we're using an "async IIFE"
  // pattern inside the tests callback to still be able to do async/await for the
  // `for await()` part. Note that the unzipping here is done just to be able to
  // decode the stream for the test and assert it. When used in actual code,
  // the browser on the client side will automatically take care of unzipping
  // without the need for additional custom code.
  it('should encode and receive a compressed stream', (done) => {
    (async () => {
      const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
        typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
      >(mockLogger, { 'accept-encoding': 'gzip' });

      push(mockItem1);
      push(mockItem2);
      end();

      // Collect the raw gzipped chunks before decoding them in one go.
      const chunks = [];
      for await (const chunk of stream) {
        chunks.push(chunk);
      }

      const buffer = Buffer.concat(chunks);

      zlib.gunzip(buffer, function (err, decoded) {
        expect(err).toBe(null);
        const streamResult = decoded.toString('utf8');

        const streamItems = streamResult.split(DELIMITER);
        const lastItem = streamItems.pop();

        const parsedItems = streamItems.map((d) => JSON.parse(d));

        expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' });
        expect(parsedItems).toHaveLength(2);
        expect(parsedItems[0]).toStrictEqual(mockItem1);
        expect(parsedItems[1]).toStrictEqual(mockItem2);
        expect(lastItem).toBe('');
        done();
      });
    })();
  });
});

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Stream } from 'stream';
import zlib from 'zlib';
import type { Headers, Logger } from '@kbn/core/server';
import { ApiEndpoint, ApiEndpointActions } from '../../common/api';
import { acceptCompression } from './accept_compression';
// Fallback PassThrough stream with no-op `flush`/`_read` — without these the
// Kibana server would crash with an 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
  flush() {}
  _read() {}
}

const DELIMITER = '\n';

/**
 * Sets up a response stream with support for gzip compression depending on provided
 * request headers.
 *
 * @param logger - Kibana provided logger.
 * @param headers - Request headers.
 * @returns An object with stream attributes and methods.
 */
export function streamFactory<T extends ApiEndpoint>(logger: Logger, headers: Headers) {
  const isCompressed = acceptCompression(headers);

  // Either a gzip stream or the plain pass-through fallback defined above.
  const stream = isCompressed ? zlib.createGzip() : new ResponseStream();

  // Serializes one action as a newline-delimited JSON chunk onto the stream.
  const push = (action: ApiEndpointActions[T]) => {
    try {
      const line = JSON.stringify(action);
      stream.write(`${line}${DELIMITER}`);

      // Calling .flush() on a compression stream will
      // make zlib return as much output as currently possible.
      if (isCompressed) {
        stream.flush();
      }
    } catch (error) {
      logger.error('Could not serialize or stream a message.');
      logger.error(error);
    }
  };

  const end = () => {
    stream.end();
  };

  const responseWithHeaders = {
    body: stream,
    ...(isCompressed
      ? {
          headers: {
            'content-encoding': 'gzip',
          },
        }
      : {}),
  };

  return { DELIMITER, end, push, responseWithHeaders, stream };
}

View file

@ -6,23 +6,38 @@
*/
import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from '@kbn/core/server';
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
import { AiopsPluginSetup, AiopsPluginStart } from './types';
import { defineRoutes } from './routes';
import { AIOPS_ENABLED } from '../common';
export class AiopsPlugin implements Plugin<AiopsPluginSetup, AiopsPluginStart> {
import {
AiopsPluginSetup,
AiopsPluginStart,
AiopsPluginSetupDeps,
AiopsPluginStartDeps,
} from './types';
import { defineExampleStreamRoute, defineExplainLogRateSpikesRoute } from './routes';
export class AiopsPlugin
implements Plugin<AiopsPluginSetup, AiopsPluginStart, AiopsPluginSetupDeps, AiopsPluginStartDeps>
{
private readonly logger: Logger;
constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
}
public setup(core: CoreSetup) {
public setup(core: CoreSetup<AiopsPluginStartDeps>, deps: AiopsPluginSetupDeps) {
this.logger.debug('aiops: Setup');
const router = core.http.createRouter();
const router = core.http.createRouter<DataRequestHandlerContext>();
// Register server side APIs
defineRoutes(router, this.logger);
if (AIOPS_ENABLED) {
core.getStartServices().then(([_, depsStart]) => {
defineExampleStreamRoute(router, this.logger);
defineExplainLogRateSpikesRoute(router, this.logger);
});
}
return {};
}

View file

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { IRouter, Logger } from '@kbn/core/server';
import {
aiopsExampleStreamSchema,
updateProgressAction,
addToEntityAction,
deleteEntityAction,
} from '../../common/api/example_stream';
import { API_ENDPOINT } from '../../common/api';
import { streamFactory } from '../lib/stream_factory';
/**
 * Registers the example stream demo endpoint. It emits random
 * add/delete/progress actions as newline-delimited JSON until progress
 * reaches 100% or the client goes away. With `simulateErrors` enabled it
 * also mixes in error scenarios for testing the client side handling.
 */
export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
  router.post(
    {
      path: API_ENDPOINT.EXAMPLE_STREAM,
      validate: {
        body: aiopsExampleStreamSchema,
      },
    },
    async (context, request, response) => {
      // Upper bound for the random delay between simulated stream updates.
      const maxTimeoutMs = request.body.timeout ?? 250;
      const simulateError = request.body.simulateErrors ?? false;

      // Stop emitting once the client disconnects or the request completes.
      let shouldStop = false;
      request.events.aborted$.subscribe(() => {
        shouldStop = true;
      });
      request.events.completed$.subscribe(() => {
        shouldStop = true;
      });

      const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
        typeof API_ENDPOINT.EXAMPLE_STREAM
      >(logger, request.headers);

      // Sample entity names used by the demo.
      const entities = [
        'kimchy',
        's1monw',
        'martijnvg',
        'jasontedor',
        'nik9000',
        'javanna',
        'rjernst',
        'jrodewig',
      ];

      // 19 adds to 1 delete; error actions are only mixed in on demand.
      const actions = [...Array(19).fill('add'), 'delete'];

      if (simulateError) {
        actions.push('server-only-error');
        actions.push('server-to-client-error');
        actions.push('client-error');
      }

      let progress = 0;

      // Emits one random action after a random delay, then schedules itself
      // again. Not `async`: it only schedules work via `setTimeout`.
      function pushStreamUpdate() {
        setTimeout(() => {
          try {
            progress++;

            if (progress > 100 || shouldStop) {
              end();
              return;
            }

            push(updateProgressAction(progress));

            const randomEntity = entities[Math.floor(Math.random() * entities.length)];
            const randomAction = actions[Math.floor(Math.random() * actions.length)];

            if (randomAction === 'add') {
              const randomCommits = Math.floor(Math.random() * 100);
              push(addToEntityAction(randomEntity, randomCommits));
            } else if (randomAction === 'delete') {
              push(deleteEntityAction(randomEntity));
            } else if (randomAction === 'server-to-client-error') {
              // Throw an error. It should not crash Kibana!
              throw new Error('There was a (simulated) server side error!');
            } else if (randomAction === 'client-error') {
              // Return not properly encoded JSON to the client.
              // Use `write` (not `push`): when gzip compression is active the
              // payload has to go through the compression transform; pushing
              // directly onto the readable side of the gzip stream would
              // corrupt the compressed response.
              stream.write(`{body:'Not valid JSON${DELIMITER}`);
            }

            pushStreamUpdate();
          } catch (error) {
            // Forward the error to the client as a regular action (again via
            // `write` so it is compressed correctly), then close the stream.
            stream.write(
              `${JSON.stringify({ type: 'error', payload: error.toString() })}${DELIMITER}`
            );
            end();
          }
        }, Math.floor(Math.random() * maxTimeoutMs));
      }

      // do not call this using `await` so it will run asynchronously while we return the stream already.
      pushStreamUpdate();

      return response.ok(responseWithHeaders);
    }
  );
};

View file

@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { firstValueFrom } from 'rxjs';
import type { IRouter, Logger } from '@kbn/core/server';
import type { DataRequestHandlerContext, IEsSearchRequest } from '@kbn/data-plugin/server';
import {
aiopsExplainLogRateSpikesSchema,
addFieldsAction,
} from '../../common/api/explain_log_rate_spikes';
import { API_ENDPOINT } from '../../common/api';
import { streamFactory } from '../lib/stream_factory';
/**
 * Registers the explain log rate spikes endpoint. As a placeholder for the
 * real feature it fetches one sample document from the given index and slowly
 * streams the document's field names to the client.
 */
export const defineExplainLogRateSpikesRoute = (
  router: IRouter<DataRequestHandlerContext>,
  logger: Logger
) => {
  router.post(
    {
      path: API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES,
      validate: {
        body: aiopsExplainLogRateSpikesSchema,
      },
    },
    async (context, request, response) => {
      const { index } = request.body;

      // Abort the ES search and stop streaming as soon as the client
      // disconnects or the request completes.
      const abortController = new AbortController();
      let isStopped = false;
      const stop = () => {
        isStopped = true;
        abortController.abort();
      };
      request.events.aborted$.subscribe(stop);
      request.events.completed$.subscribe(stop);

      // Fetch a single sample document to derive the list of field names.
      const search = await context.search;
      const searchResult = await firstValueFrom(
        search.search(
          {
            params: {
              index,
              body: { size: 1 },
            },
          } as IEsSearchRequest,
          { abortSignal: abortController.signal }
        )
      );

      const sampleDoc = searchResult.rawResponse.hits.hits.pop();
      const fields = Object.keys(sampleDoc?._source ?? {});

      const { end, push, responseWithHeaders } = streamFactory<
        typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
      >(logger, request.headers);

      // Streams one field name at a time with a random delay, then ends the
      // stream once all fields are sent or the client went away.
      function pushField() {
        setTimeout(() => {
          if (isStopped) {
            end();
            return;
          }

          const field = fields.pop();

          if (field === undefined) {
            end();
            return;
          }

          push(addFieldsAction([field]));
          pushField();
        }, Math.random() * 1000);
      }

      // Not awaited on purpose so the stream is returned to the client right away.
      pushField();

      return response.ok(responseWithHeaders);
    }
  );
};

View file

@ -5,125 +5,5 @@
* 2.0.
*/
import { Readable } from 'stream';
import type { IRouter, Logger } from '@kbn/core/server';
import { AIOPS_ENABLED } from '../../common';
import type { ApiAction } from '../../common/api/example_stream';
import {
aiopsExampleStreamSchema,
updateProgressAction,
addToEntityAction,
deleteEntityAction,
} from '../../common/api/example_stream';
// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Readable {
  // No-op: data is supplied by calling `stream.push()` rather than being pulled.
  _read(): void {}
}
const delimiter = '\n';
/**
 * Registers the example stream demo endpoint (behind the AIOPS_ENABLED flag).
 * Emits random add/delete/progress actions as newline-delimited JSON until
 * progress reaches 100% or the client goes away; with `simulateErrors` it
 * also mixes in error scenarios for testing client side handling.
 */
export function defineRoutes(router: IRouter, logger: Logger) {
  if (AIOPS_ENABLED) {
    router.post(
      {
        path: '/internal/aiops/example_stream',
        validate: {
          body: aiopsExampleStreamSchema,
        },
      },
      async (context, request, response) => {
        // Upper bound for the random delay between simulated stream updates.
        const maxTimeoutMs = request.body.timeout ?? 250;
        const simulateError = request.body.simulateErrors ?? false;
        // Stop emitting once the client disconnects or the request completes.
        let shouldStop = false;
        request.events.aborted$.subscribe(() => {
          shouldStop = true;
        });
        request.events.completed$.subscribe(() => {
          shouldStop = true;
        });
        const stream = new ResponseStream();
        // Serializes one action and pushes it to the stream as a
        // newline-delimited JSON line; serialization failures are logged only.
        function streamPush(d: ApiAction) {
          try {
            const line = JSON.stringify(d);
            stream.push(`${line}${delimiter}`);
          } catch (error) {
            logger.error('Could not serialize or stream a message.');
            logger.error(error);
          }
        }
        // Sample entity names used by the demo.
        const entities = [
          'kimchy',
          's1monw',
          'martijnvg',
          'jasontedor',
          'nik9000',
          'javanna',
          'rjernst',
          'jrodewig',
        ];
        // 19 adds to 1 delete; error actions are only mixed in on demand.
        const actions = [...Array(19).fill('add'), 'delete'];
        if (simulateError) {
          actions.push('server-only-error');
          actions.push('server-to-client-error');
          actions.push('client-error');
        }
        let progress = 0;
        // Emits one random action after a random delay, then schedules itself
        // again until progress reaches 100% or the client went away.
        async function pushStreamUpdate() {
          setTimeout(() => {
            try {
              progress++;
              if (progress > 100 || shouldStop) {
                // `push(null)` signals end-of-stream on a Readable.
                stream.push(null);
                return;
              }
              streamPush(updateProgressAction(progress));
              const randomEntity = entities[Math.floor(Math.random() * entities.length)];
              const randomAction = actions[Math.floor(Math.random() * actions.length)];
              if (randomAction === 'add') {
                const randomCommits = Math.floor(Math.random() * 100);
                streamPush(addToEntityAction(randomEntity, randomCommits));
              } else if (randomAction === 'delete') {
                streamPush(deleteEntityAction(randomEntity));
              } else if (randomAction === 'server-to-client-error') {
                // Throw an error. It should not crash Kibana!
                throw new Error('There was a (simulated) server side error!');
              } else if (randomAction === 'client-error') {
                // Return not properly encoded JSON to the client.
                stream.push(`{body:'Not valid JSON${delimiter}`);
              }
              pushStreamUpdate();
            } catch (error) {
              // Forward the error to the client as a regular action, then
              // signal end-of-stream.
              stream.push(
                `${JSON.stringify({ type: 'error', payload: error.toString() })}${delimiter}`
              );
              stream.push(null);
            }
          }, Math.floor(Math.random() * maxTimeoutMs));
        }
        // do not call this using `await` so it will run asynchronously while we return the stream already.
        pushStreamUpdate();
        return response.ok({
          body: stream,
        });
      }
    );
  }
}
export { defineExampleStreamRoute } from './example_stream';
export { defineExplainLogRateSpikesRoute } from './explain_log_rate_spikes';

View file

@ -5,6 +5,16 @@
* 2.0.
*/
import { PluginSetup, PluginStart } from '@kbn/data-plugin/server';
export interface AiopsPluginSetupDeps {
data: PluginSetup;
}
export interface AiopsPluginStartDeps {
data: PluginStart;
}
/**
* aiops plugin server setup contract
*/

View file

@ -54,6 +54,8 @@ export const ML_PAGES = {
OVERVIEW: 'overview',
AIOPS: 'aiops',
AIOPS_EXPLAIN_LOG_RATE_SPIKES: 'aiops/explain_log_rate_spikes',
AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT: 'aiops/explain_log_rate_spikes_index_select',
AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO: 'aiops/single_endpoint_streaming_demo',
} as const;
export type MlPages = typeof ML_PAGES[keyof typeof ML_PAGES];

View file

@ -63,7 +63,9 @@ export type MlGenericUrlState = MLPageState<
| typeof ML_PAGES.DATA_VISUALIZER_FILE
| typeof ML_PAGES.DATA_VISUALIZER_INDEX_SELECT
| typeof ML_PAGES.AIOPS
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES,
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT
| typeof ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO,
MlGenericUrlPageState | undefined
>;

View file

@ -5,44 +5,32 @@
* 2.0.
*/
import React, { FC, useEffect, useState } from 'react';
import React, { FC } from 'react';
import { FormattedMessage } from '@kbn/i18n-react';
import type { ExplainLogRateSpikesSpec } from '@kbn/aiops-plugin/public';
import { useMlKibana, useTimefilter } from '../contexts/kibana';
import { ExplainLogRateSpikes } from '@kbn/aiops-plugin/public';
import { useMlContext } from '../contexts/ml';
import { useMlKibana } from '../contexts/kibana';
import { HelpMenu } from '../components/help_menu';
import { MlPageHeader } from '../components/page_header';
export const ExplainLogRateSpikesPage: FC = () => {
useTimefilter({ timeRangeSelector: false, autoRefreshSelector: false });
const {
services: { docLinks, aiops },
services: { docLinks },
} = useMlKibana();
const [ExplainLogRateSpikes, setExplainLogRateSpikes] = useState<ExplainLogRateSpikesSpec | null>(
null
);
useEffect(() => {
if (aiops !== undefined) {
const { getExplainLogRateSpikesComponent } = aiops;
getExplainLogRateSpikesComponent().then(setExplainLogRateSpikes);
}
}, []);
const context = useMlContext();
return (
<>
{ExplainLogRateSpikes !== null ? (
<>
<MlPageHeader>
<FormattedMessage
id="xpack.ml.explainLogRateSpikes.pageHeader"
defaultMessage="Explain log rate spikes"
/>
</MlPageHeader>
<ExplainLogRateSpikes />
</>
) : null}
<MlPageHeader>
<FormattedMessage
id="xpack.ml.explainLogRateSpikes.pageHeader"
defaultMessage="Explain log rate spikes"
/>
</MlPageHeader>
<ExplainLogRateSpikes dataView={context.currentDataView} />
<HelpMenu docLink={docLinks.links.ml.guide} />
</>
);

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { FormattedMessage } from '@kbn/i18n-react';
import { SingleEndpointStreamingDemo } from '@kbn/aiops-plugin/public';
import { useMlKibana, useTimefilter } from '../contexts/kibana';
import { HelpMenu } from '../components/help_menu';
import { MlPageHeader } from '../components/page_header';
/**
 * ML app page hosting the aiops single endpoint streaming demo
 * (the bar chart race demo).
 */
export const SingleEndpointStreamingDemoPage: FC = () => {
  // This page does not use the global time range / auto refresh controls.
  useTimefilter({ timeRangeSelector: false, autoRefreshSelector: false });

  const { services } = useMlKibana();

  return (
    <>
      <MlPageHeader>
        <FormattedMessage
          id="xpack.ml.singleEndpointStreamingDemo.pageHeader"
          defaultMessage="Single endpoint streaming demo"
        />
      </MlPageHeader>
      <SingleEndpointStreamingDemo />
      <HelpMenu docLink={services.docLinks.links.ml.guide} />
    </>
  );
};

View file

@ -229,13 +229,22 @@ export function useSideNavItems(activeRoute: MlRoute | undefined) {
items: [
{
id: 'explainlogratespikes',
pathId: ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES,
pathId: ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT,
name: i18n.translate('xpack.ml.navMenu.explainLogRateSpikesLinkText', {
defaultMessage: 'Explain log rate spikes',
}),
disabled: disableLinks,
testSubj: 'mlMainTab explainLogRateSpikes',
},
{
id: 'singleEndpointStreamingDemo',
pathId: ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO,
name: i18n.translate('xpack.ml.navMenu.singleEndpointStreamingDemoLinkText', {
defaultMessage: 'Single endpoint streaming demo',
}),
disabled: disableLinks,
testSubj: 'mlMainTab singleEndpointStreamingDemo',
},
],
});
}

View file

@ -6,9 +6,9 @@
*/
import React from 'react';
import { DataView, DataViewsContract } from '@kbn/data-views-plugin/public';
import { SavedSearchSavedObject } from '../../../../common/types/kibana';
import { MlServicesContext } from '../../app';
import type { DataView, DataViewsContract } from '@kbn/data-views-plugin/public';
import type { SavedSearchSavedObject } from '../../../../common/types/kibana';
import type { MlServicesContext } from '../../app';
export interface MlContextValue {
combinedQuery: any;

View file

@ -59,7 +59,7 @@ export const AIOPS_BREADCRUMB: ChromeBreadcrumb = Object.freeze({
text: i18n.translate('xpack.ml.aiopsBreadcrumbLabel', {
defaultMessage: 'AIOps',
}),
href: '/aiops',
href: '/aiops/explain_log_rate_spikes_index_select',
});
export const CREATE_JOB_BREADCRUMB: ChromeBreadcrumb = Object.freeze({

View file

@ -37,7 +37,7 @@ export const explainLogRateSpikesRouteFactory = (
getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath),
getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath),
{
text: i18n.translate('xpack.ml.AiopsBreadcrumbs.explainLogRateSpikesLabel', {
text: i18n.translate('xpack.ml.aiopsBreadcrumbs.explainLogRateSpikesLabel', {
defaultMessage: 'Explain log rate spikes',
}),
},

View file

@ -6,3 +6,4 @@
*/
export * from './explain_log_rate_spikes';
export * from './single_endpoint_streaming_demo';

View file

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { parse } from 'query-string';
import { i18n } from '@kbn/i18n';
import { AIOPS_ENABLED } from '@kbn/aiops-plugin/common';
import { NavigateToPath } from '../../../contexts/kibana';
import { MlRoute, PageLoader, PageProps } from '../../router';
import { useResolver } from '../../use_resolver';
import { SingleEndpointStreamingDemoPage as Page } from '../../../aiops/single_endpoint_streaming_demo';
import { checkBasicLicense } from '../../../license';
import { checkGetJobsCapabilitiesResolver } from '../../../capabilities/check_capabilities';
import { cacheDataViewsContract } from '../../../util/index_utils';
import { getBreadcrumbWithUrlForApp } from '../../breadcrumbs';
/**
 * Route definition for the aiops single endpoint streaming demo page
 * (the bar chart race demo moved to /aiops/single_endpoint_streaming_demo).
 */
export const singleEndpointStreamingDemoRouteFactory = (
  navigateToPath: NavigateToPath,
  basePath: string
): MlRoute => ({
  id: 'single_endpoint_streaming_demo',
  path: '/aiops/single_endpoint_streaming_demo',
  title: i18n.translate('xpack.ml.aiops.singleEndpointStreamingDemo.docTitle', {
    defaultMessage: 'Single endpoint streaming demo',
  }),
  render: (props, deps) => <PageWrapper {...props} deps={deps} />,
  breadcrumbs: [
    getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath),
    getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath),
    {
      text: i18n.translate('xpack.ml.aiopsBreadcrumbs.singleEndpointStreamingDemoLabel', {
        defaultMessage: 'Single endpoint streaming demo',
      }),
    },
  ],
  // Hide the route entirely while the aiops feature flag is off.
  disabled: !AIOPS_ENABLED,
});
/**
 * Resolves license/capability checks and data view setup from the URL's
 * `index`/`savedSearchId` query params before rendering the demo page.
 */
const PageWrapper: FC<PageProps> = ({ location, deps }) => {
  const { redirectToMlAccessDeniedPage } = deps;

  const urlParams: Record<string, any> = parse(location.search, { sort: false });

  const { context } = useResolver(
    urlParams.index,
    urlParams.savedSearchId,
    deps.config,
    deps.dataViewsContract,
    {
      checkBasicLicense,
      cacheDataViewsContract: () => cacheDataViewsContract(deps.dataViewsContract),
      checkGetJobsCapabilities: () => checkGetJobsCapabilitiesResolver(redirectToMlAccessDeniedPage),
    }
  );

  return (
    <PageLoader context={context}>
      <Page />
    </PageLoader>
  );
};

View file

@ -50,6 +50,16 @@ const getDataVisBreadcrumbs = (navigateToPath: NavigateToPath, basePath: string)
},
];
// Breadcrumb trail for the "Explain log rate spikes" data view selection page.
const getExplainLogRateSpikesBreadcrumbs = (navigateToPath: NavigateToPath, basePath: string) => [
  getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath),
  getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath),
  {
    // NOTE(review): the i18n id says 'DateView' while the message is
    // 'Data View' — looks like a typo in the translation id; confirm before
    // renaming since translation files key off the id.
    text: i18n.translate('xpack.ml.aiopsBreadcrumbs.selectDateViewLabel', {
      defaultMessage: 'Data View',
    }),
  },
];
export const indexOrSearchRouteFactory = (
navigateToPath: NavigateToPath,
basePath: string
@ -86,6 +96,26 @@ export const dataVizIndexOrSearchRouteFactory = (
breadcrumbs: getDataVisBreadcrumbs(navigateToPath, basePath),
});
/**
 * Route for the data view / saved search selection step shown before
 * navigating to the "Explain log rate spikes" page.
 */
export const explainLogRateSpikesIndexOrSearchRouteFactory = (
  navigateToPath: NavigateToPath,
  basePath: string
): MlRoute => ({
  id: 'data_view_explain_log_rate_spikes',
  path: '/aiops/explain_log_rate_spikes_index_select',
  title: i18n.translate('xpack.ml.selectDataViewLabel', {
    defaultMessage: 'Select Data View',
  }),
  render: (props, deps) => (
    <PageWrapper
      {...props}
      /* NOTE(review): no leading slash on nextStepPath — verify this resolves
         the same way as other nextStepPath values used in this file. */
      nextStepPath="aiops/explain_log_rate_spikes"
      deps={deps}
      /* Reuses the data visualizer's index selection mode/UI. */
      mode={MODE.DATAVISUALIZER}
    />
  ),
  breadcrumbs: getExplainLogRateSpikesBreadcrumbs(navigateToPath, basePath),
});
const PageWrapper: FC<IndexOrSearchPageProps> = ({ nextStepPath, deps, mode }) => {
const {
services: {

View file

@ -86,6 +86,8 @@ export class MlLocatorDefinition implements LocatorDefinition<MlLocatorParams> {
case ML_PAGES.DATA_VISUALIZER_INDEX_SELECT:
case ML_PAGES.AIOPS:
case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES:
case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT:
case ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO:
case ML_PAGES.OVERVIEW:
case ML_PAGES.SETTINGS:
case ML_PAGES.FILTER_LISTS_MANAGE:

View file

@ -12,6 +12,8 @@ import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
import { parseStream } from './parse_stream';
export default ({ getService }: FtrProviderContext) => {
const supertest = getService('supertest');
const config = getService('config');
@ -67,34 +69,15 @@ export default ({ getService }: FtrProviderContext) => {
expect(stream).not.to.be(null);
if (stream !== null) {
let partial = '';
let threw = false;
const progressData: any[] = [];
try {
for await (const value of stream) {
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
partial = last ?? '';
const actions = parts.map((p) => JSON.parse(p));
actions.forEach((action) => {
expect(typeof action.type).to.be('string');
if (action.type === 'update_progress') {
progressData.push(action);
}
});
for await (const action of parseStream(stream)) {
expect(action.type).not.to.be('error');
if (action.type === 'update_progress') {
progressData.push(action);
}
} catch (e) {
threw = true;
}
expect(threw).to.be(false);
expect(progressData.length).to.be(100);
expect(progressData[0].payload).to.be(1);
expect(progressData[progressData.length - 1].payload).to.be(100);

View file

@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import fetch from 'node-fetch';
import { format as formatUrl } from 'url';
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
import { parseStream } from './parse_stream';
export default ({ getService }: FtrProviderContext) => {
  const supertest = getService('supertest');
  const config = getService('config');
  const kibanaServerUrl = formatUrl(config.get('servers.kibana'));

  // Field names present in the `ft_ecommerce` archive's documents.
  const expectedFields = [
    'category',
    'currency',
    'customer_first_name',
    'customer_full_name',
    'customer_gender',
    'customer_id',
    'customer_last_name',
    'customer_phone',
    'day_of_week',
    'day_of_week_i',
    'email',
    'geoip',
    'manufacturer',
    'order_date',
    'order_id',
    'products',
    'sku',
    'taxful_total_price',
    'taxless_total_price',
    'total_quantity',
    'total_unique_products',
    'type',
    'user',
  ];

  describe('POST /internal/aiops/explain_log_rate_spikes', () => {
    const esArchiver = getService('esArchiver');

    before(async () => {
      await esArchiver.loadIfNeeded('x-pack/test/functional/es_archives/ml/ecommerce');
    });

    after(async () => {
      await esArchiver.unload('x-pack/test/functional/es_archives/ml/ecommerce');
    });

    it('should return full data without streaming', async () => {
      const resp = await supertest
        .post(`/internal/aiops/explain_log_rate_spikes`)
        .set('kbn-xsrf', 'kibana')
        .send({
          index: 'ft_ecommerce',
        })
        .expect(200);

      // supertest buffers the whole streamed response into a single body.
      expect(Buffer.isBuffer(resp.body)).to.be(true);

      const chunks: string[] = resp.body.toString().split('\n');

      // 23 field actions plus a trailing empty string after the final delimiter.
      expect(chunks.length).to.be(24);

      const lastChunk = chunks.pop();
      expect(lastChunk).to.be('');

      let data: any[] = [];

      // Every remaining chunk must be valid JSON.
      expect(() => {
        data = chunks.map((c) => JSON.parse(c));
      }).not.to.throwError();

      data.forEach((d) => {
        expect(typeof d.type).to.be('string');
      });

      const fields = data.map((d) => d.payload[0]).sort();

      expect(fields.length).to.equal(expectedFields.length);
      fields.forEach((f) => {
        // Fix: the original `expect(expectedFields.includes(f));` had no
        // matcher and therefore never asserted anything.
        expect(expectedFields.includes(f)).to.be(true);
      });
    });

    it('should return data in chunks with streaming', async () => {
      const response = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'kbn-xsrf': 'stream',
        },
        body: JSON.stringify({ index: 'ft_ecommerce' }),
      });

      const stream = response.body;

      expect(stream).not.to.be(null);

      if (stream !== null) {
        const data: any[] = [];

        for await (const action of parseStream(stream)) {
          expect(action.type).not.to.be('error');
          data.push(action);
        }

        const fields = data.map((d) => d.payload[0]).sort();

        expect(fields.length).to.equal(expectedFields.length);
        fields.forEach((f) => {
          // Fix: the original `expect(expectedFields.includes(f));` had no
          // matcher and therefore never asserted anything.
          expect(expectedFields.includes(f)).to.be(true);
        });
      }
    });
  });
};

View file

@ -12,5 +12,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
this.tags(['ml']);
loadTestFile(require.resolve('./example_stream'));
loadTestFile(require.resolve('./explain_log_rate_spikes'));
});
}

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Parses a newline-delimited JSON stream into individual action objects.
 *
 * Buffers incomplete trailing data between chunks so actions split across
 * chunk boundaries are reassembled. If parsing fails, a synthetic
 * `{ type: 'error' }` action is yielded and iteration stops.
 *
 * @param stream - The readable stream of newline-delimited JSON.
 * @yields Parsed action objects.
 */
export async function* parseStream(stream: NodeJS.ReadableStream) {
  let partial = '';

  try {
    for await (const value of stream) {
      const full = `${partial}${value}`;
      const parts = full.split('\n');

      // Keep the (possibly incomplete) text after the last delimiter for the
      // next iteration.
      partial = parts.pop() ?? '';

      for (const action of parts.map((p) => JSON.parse(p))) {
        yield action;
      }
    }

    // A well-formed stream ends with a trailing delimiter, leaving `partial`
    // empty. If data remains, parse and surface it instead of silently
    // dropping it (the original implementation discarded it).
    if (partial !== '') {
      yield JSON.parse(partial);
    }
  } catch (error) {
    yield { type: 'error', payload: error.toString() };
  }
}