mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[ML] Explain log rate spikes: Adds API license check. (#135431)
- Adds a check to aiops API endpoints to only allow requests with active platinum license. - Adds integration tests for basic license where the endpoints should return permission denied. - Improved error handling: - Low level errors (like a non valid argument pushed to a stream) will now be logged to Kibana server's console, because the way HTTP streams work we cannot really emit a useful error to an already running stream to the client. So the stream will just abort but Kibana server will log an error. - Higher level errors on the application level (like when we find out an index does not exist to run the analysis) will be pushed to the stream now as an error type action and we can update the UI accordingly. Note this PR only updates the API and corresponding tests to support this, the UI doesn't make use of it yet.
This commit is contained in:
parent
e2907662fc
commit
d50434ed7b
23 changed files with 358 additions and 104 deletions
|
@ -14,6 +14,7 @@ export const API_ACTION_NAME = {
|
|||
UPDATE_PROGRESS: 'update_progress',
|
||||
ADD_TO_ENTITY: 'add_to_entity',
|
||||
DELETE_ENTITY: 'delete_entity',
|
||||
ERROR: 'error',
|
||||
} as const;
|
||||
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
|
||||
|
||||
|
@ -59,7 +60,20 @@ export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
|
|||
};
|
||||
}
|
||||
|
||||
interface ApiActionError {
|
||||
type: typeof API_ACTION_NAME.ERROR;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function errorAction(payload: string): ApiActionError {
|
||||
return {
|
||||
type: API_ACTION_NAME.ERROR,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
export type ReducerStreamApiAction =
|
||||
| ApiActionUpdateProgress
|
||||
| ApiActionAddToEntity
|
||||
| ApiActionDeleteEntity;
|
||||
| ApiActionDeleteEntity
|
||||
| ApiActionError;
|
||||
|
|
|
@ -14,10 +14,12 @@ export const UI_ACTION_NAME = {
|
|||
export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME];
|
||||
|
||||
export interface StreamState {
|
||||
errors: string[];
|
||||
progress: number;
|
||||
entities: Record<string, number>;
|
||||
}
|
||||
export const initialState: StreamState = {
|
||||
errors: [],
|
||||
progress: 0,
|
||||
entities: {},
|
||||
};
|
||||
|
@ -64,6 +66,11 @@ export function reducerStreamReducer(
|
|||
...state,
|
||||
entities: addToEntities,
|
||||
};
|
||||
case API_ACTION_NAME.ERROR:
|
||||
return {
|
||||
...state,
|
||||
errors: [...state.errors, action.payload],
|
||||
};
|
||||
case UI_ACTION_NAME.RESET:
|
||||
return initialState;
|
||||
default:
|
||||
|
|
|
@ -65,12 +65,20 @@ export const PageReducerStream: FC = () => {
|
|||
}
|
||||
};
|
||||
|
||||
// This is for low level errors on the stream/HTTP level.
|
||||
useEffect(() => {
|
||||
if (error) {
|
||||
notifications.toasts.addDanger(error);
|
||||
}
|
||||
}, [error, notifications.toasts]);
|
||||
|
||||
// This is for errors on the application level
|
||||
useEffect(() => {
|
||||
if (data.errors.length > 0) {
|
||||
notifications.toasts.addDanger(data.errors[data.errors.length - 1]);
|
||||
}
|
||||
}, [data.errors, notifications.toasts]);
|
||||
|
||||
const buttonLabel = isRunning ? 'Stop development' : 'Start development';
|
||||
|
||||
return (
|
||||
|
|
|
@ -10,6 +10,7 @@ import type { IRouter, Logger } from '@kbn/core/server';
|
|||
import { streamFactory } from '@kbn/aiops-utils';
|
||||
|
||||
import {
|
||||
errorAction,
|
||||
reducerStreamRequestBodySchema,
|
||||
updateProgressAction,
|
||||
addToEntityAction,
|
||||
|
@ -38,8 +39,9 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
|
|||
shouldStop = true;
|
||||
});
|
||||
|
||||
const { end, error, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
|
||||
request.headers
|
||||
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
|
||||
request.headers,
|
||||
logger
|
||||
);
|
||||
|
||||
const entities = [
|
||||
|
@ -84,18 +86,17 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
|
|||
push(deleteEntityAction(randomEntity));
|
||||
} else if (randomAction === 'throw-error') {
|
||||
// Throw an error. It should not crash Kibana!
|
||||
// It should be caught, logged and passed on as a stream error.
|
||||
// It should be caught and logged to the Kibana server console.
|
||||
throw new Error('There was a (simulated) server side error!');
|
||||
} else if (randomAction === 'emit-error') {
|
||||
// Directly emit an error to the stream, this will not be logged.
|
||||
error('Error pushed to the stream');
|
||||
// Emit an error as a stream action.
|
||||
push(errorAction('(Simulated) error pushed to the stream'));
|
||||
return;
|
||||
}
|
||||
|
||||
pushStreamUpdate();
|
||||
} catch (e) {
|
||||
logger.error(e);
|
||||
error(e);
|
||||
}
|
||||
}, Math.floor(Math.random() * maxTimeoutMs));
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
|
|||
shouldStop = true;
|
||||
});
|
||||
|
||||
const { end, error, push, responseWithHeaders } = streamFactory(request.headers);
|
||||
const { end, push, responseWithHeaders } = streamFactory(request.headers, logger);
|
||||
|
||||
const text =
|
||||
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
|
||||
|
@ -62,7 +62,7 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
|
|||
end();
|
||||
}
|
||||
} catch (e) {
|
||||
error(`There was an error: ${e.toString()}`);
|
||||
logger.error(`There was an error: ${e.toString()}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
|
||||
import * as zlib from 'zlib';
|
||||
|
||||
import type { Logger } from '@kbn/logging';
|
||||
|
||||
import { streamFactory } from './stream_factory';
|
||||
|
||||
interface MockItem {
|
||||
|
@ -24,8 +26,14 @@ const mockItem2: MockItem = {
|
|||
};
|
||||
|
||||
describe('streamFactory', () => {
|
||||
let mockLogger: Logger;
|
||||
|
||||
beforeEach(() => {
|
||||
mockLogger = { error: jest.fn() } as unknown as Logger;
|
||||
});
|
||||
|
||||
it('should encode and receive an uncompressed string based stream', async () => {
|
||||
const { end, push, responseWithHeaders } = streamFactory({});
|
||||
const { end, push, responseWithHeaders } = streamFactory({}, mockLogger);
|
||||
|
||||
push('push1');
|
||||
push('push2');
|
||||
|
@ -41,7 +49,7 @@ describe('streamFactory', () => {
|
|||
});
|
||||
|
||||
it('should encode and receive an uncompressed NDJSON based stream', async () => {
|
||||
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({});
|
||||
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({}, mockLogger);
|
||||
|
||||
push(mockItem1);
|
||||
push(mockItem2);
|
||||
|
@ -74,9 +82,12 @@ describe('streamFactory', () => {
|
|||
// without the need for additional custom code.
|
||||
it('should encode and receive a compressed string based stream', (done) => {
|
||||
(async () => {
|
||||
const { end, push, responseWithHeaders } = streamFactory({
|
||||
'accept-encoding': 'gzip',
|
||||
});
|
||||
const { end, push, responseWithHeaders } = streamFactory(
|
||||
{
|
||||
'accept-encoding': 'gzip',
|
||||
},
|
||||
mockLogger
|
||||
);
|
||||
|
||||
push('push1');
|
||||
push('push2');
|
||||
|
@ -104,9 +115,12 @@ describe('streamFactory', () => {
|
|||
|
||||
it('should encode and receive a compressed NDJSON based stream', (done) => {
|
||||
(async () => {
|
||||
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({
|
||||
'accept-encoding': 'gzip',
|
||||
});
|
||||
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>(
|
||||
{
|
||||
'accept-encoding': 'gzip',
|
||||
},
|
||||
mockLogger
|
||||
);
|
||||
|
||||
push(mockItem1);
|
||||
push(mockItem2);
|
||||
|
@ -140,49 +154,49 @@ describe('streamFactory', () => {
|
|||
})();
|
||||
});
|
||||
|
||||
it('should throw when a string based stream receives a non-string chunk', async () => {
|
||||
const { push } = streamFactory({});
|
||||
it('should log an error when a string based stream receives a non-string chunk', async () => {
|
||||
const { push } = streamFactory({}, mockLogger);
|
||||
|
||||
// First push initializes the stream as string based.
|
||||
expect(() => {
|
||||
push('push1');
|
||||
}).not.toThrow();
|
||||
push('push1');
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Second push is again a string and should not throw.
|
||||
expect(() => {
|
||||
push('push2');
|
||||
}).not.toThrow();
|
||||
push('push2');
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Third push is not a string and should trigger an error.
|
||||
expect(() => {
|
||||
push({ myObject: 'push3' } as unknown as string);
|
||||
}).toThrow('Must not push non-string chunks to a string based stream.');
|
||||
push({ myObject: 'push3' } as unknown as string);
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(1);
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
'Must not push non-string chunks to a string based stream.'
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw when an NDJSON based stream receives a string chunk', async () => {
|
||||
const { push } = streamFactory<MockItem>({});
|
||||
it('should log an error when an NDJSON based stream receives a string chunk', async () => {
|
||||
const { push } = streamFactory<MockItem>({}, mockLogger);
|
||||
|
||||
// First push initializes the stream as NDJSON based.
|
||||
expect(() => {
|
||||
push(mockItem1);
|
||||
}).not.toThrow();
|
||||
push(mockItem1);
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Second push is again a valid object and should not throw.
|
||||
expect(() => {
|
||||
push(mockItem1);
|
||||
}).not.toThrow();
|
||||
push(mockItem1);
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Third push is a string and should trigger an error.
|
||||
expect(() => {
|
||||
push('push3' as unknown as MockItem);
|
||||
}).toThrow('Must not push raw string chunks to an NDJSON based stream.');
|
||||
push('push3' as unknown as MockItem);
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(1);
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
'Must not push raw string chunks to an NDJSON based stream.'
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw for undefined as push value', async () => {
|
||||
const { push } = streamFactory({});
|
||||
it('should log an error for undefined as push value', async () => {
|
||||
const { push } = streamFactory({}, mockLogger);
|
||||
|
||||
expect(() => {
|
||||
push(undefined as unknown as string);
|
||||
}).toThrow('Stream chunk must not be undefined.');
|
||||
push(undefined as unknown as string);
|
||||
expect(mockLogger.error).toHaveBeenCalledTimes(1);
|
||||
expect(mockLogger.error).toHaveBeenCalledWith('Stream chunk must not be undefined.');
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,6 +8,8 @@
|
|||
import { Stream } from 'stream';
|
||||
import * as zlib from 'zlib';
|
||||
|
||||
import type { Logger } from '@kbn/logging';
|
||||
|
||||
import { acceptCompression } from './accept_compression';
|
||||
|
||||
/**
|
||||
|
@ -30,7 +32,6 @@ type StreamType = 'string' | 'ndjson';
|
|||
interface StreamFactoryReturnType<T = unknown> {
|
||||
DELIMITER: string;
|
||||
end: () => void;
|
||||
error: (errorText: string) => void;
|
||||
push: (d: T) => void;
|
||||
responseWithHeaders: {
|
||||
body: zlib.Gzip | ResponseStream;
|
||||
|
@ -47,7 +48,10 @@ interface StreamFactoryReturnType<T = unknown> {
|
|||
* @param headers - Request headers.
|
||||
* @returns An object with stream attributes and methods.
|
||||
*/
|
||||
export function streamFactory<T = string>(headers: Headers): StreamFactoryReturnType<T>;
|
||||
export function streamFactory<T = string>(
|
||||
headers: Headers,
|
||||
logger: Logger
|
||||
): StreamFactoryReturnType<T>;
|
||||
/**
|
||||
* Sets up a response stream with support for gzip compression depending on provided
|
||||
* request headers. Any non-string data pushed to the stream will be stream as NDJSON.
|
||||
|
@ -55,23 +59,22 @@ export function streamFactory<T = string>(headers: Headers): StreamFactoryReturn
|
|||
* @param headers - Request headers.
|
||||
* @returns An object with stream attributes and methods.
|
||||
*/
|
||||
export function streamFactory<T = unknown>(headers: Headers): StreamFactoryReturnType<T> {
|
||||
export function streamFactory<T = unknown>(
|
||||
headers: Headers,
|
||||
logger: Logger
|
||||
): StreamFactoryReturnType<T> {
|
||||
let streamType: StreamType;
|
||||
const isCompressed = acceptCompression(headers);
|
||||
|
||||
const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
|
||||
|
||||
function error(errorText: string) {
|
||||
stream.emit('error', errorText);
|
||||
}
|
||||
|
||||
function end() {
|
||||
stream.end();
|
||||
}
|
||||
|
||||
function push(d: T) {
|
||||
if (d === undefined) {
|
||||
error('Stream chunk must not be undefined.');
|
||||
logger.error('Stream chunk must not be undefined.');
|
||||
return;
|
||||
}
|
||||
// Initialize the stream type with the first push to the stream,
|
||||
|
@ -79,10 +82,10 @@ export function streamFactory<T = unknown>(headers: Headers): StreamFactoryRetur
|
|||
if (streamType === undefined) {
|
||||
streamType = typeof d === 'string' ? 'string' : 'ndjson';
|
||||
} else if (streamType === 'string' && typeof d !== 'string') {
|
||||
error('Must not push non-string chunks to a string based stream.');
|
||||
logger.error('Must not push non-string chunks to a string based stream.');
|
||||
return;
|
||||
} else if (streamType === 'ndjson' && typeof d === 'string') {
|
||||
error('Must not push raw string chunks to an NDJSON based stream.');
|
||||
logger.error('Must not push raw string chunks to an NDJSON based stream.');
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -90,7 +93,8 @@ export function streamFactory<T = unknown>(headers: Headers): StreamFactoryRetur
|
|||
const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d;
|
||||
stream.write(line);
|
||||
} catch (e) {
|
||||
error(`Could not serialize or stream data chunk: ${e.toString()}`);
|
||||
logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Calling .flush() on a compression stream will
|
||||
|
@ -111,5 +115,5 @@ export function streamFactory<T = unknown>(headers: Headers): StreamFactoryRetur
|
|||
: {}),
|
||||
};
|
||||
|
||||
return { DELIMITER, end, error, push, responseWithHeaders };
|
||||
return { DELIMITER, end, push, responseWithHeaders };
|
||||
}
|
||||
|
|
|
@ -5,29 +5,11 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { schema, TypeOf } from '@kbn/config-schema';
|
||||
|
||||
import type { ChangePoint } from '../types';
|
||||
|
||||
export const aiopsExplainLogRateSpikesSchema = schema.object({
|
||||
start: schema.number(),
|
||||
end: schema.number(),
|
||||
kuery: schema.string(),
|
||||
timeFieldName: schema.string(),
|
||||
includeFrozen: schema.maybe(schema.boolean()),
|
||||
/** Analysis selection time ranges */
|
||||
baselineMin: schema.number(),
|
||||
baselineMax: schema.number(),
|
||||
deviationMin: schema.number(),
|
||||
deviationMax: schema.number(),
|
||||
/** The index to query for log rate spikes */
|
||||
index: schema.string(),
|
||||
});
|
||||
|
||||
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
|
||||
import type { ChangePoint } from '../../types';
|
||||
|
||||
export const API_ACTION_NAME = {
|
||||
ADD_CHANGE_POINTS: 'add_change_points',
|
||||
ERROR: 'error',
|
||||
UPDATE_LOADING_STATE: 'update_loading_state',
|
||||
} as const;
|
||||
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
|
||||
|
@ -37,7 +19,7 @@ interface ApiActionAddChangePoints {
|
|||
payload: ChangePoint[];
|
||||
}
|
||||
|
||||
export function addChangePoints(
|
||||
export function addChangePointsAction(
|
||||
payload: ApiActionAddChangePoints['payload']
|
||||
): ApiActionAddChangePoints {
|
||||
return {
|
||||
|
@ -46,6 +28,18 @@ export function addChangePoints(
|
|||
};
|
||||
}
|
||||
|
||||
interface ApiActionError {
|
||||
type: typeof API_ACTION_NAME.ERROR;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function errorAction(payload: ApiActionError['payload']): ApiActionError {
|
||||
return {
|
||||
type: API_ACTION_NAME.ERROR,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionUpdateLoadingState {
|
||||
type: typeof API_ACTION_NAME.UPDATE_LOADING_STATE;
|
||||
payload: {
|
||||
|
@ -66,4 +60,5 @@ export function updateLoadingStateAction(
|
|||
|
||||
export type AiopsExplainLogRateSpikesApiAction =
|
||||
| ApiActionAddChangePoints
|
||||
| ApiActionError
|
||||
| ApiActionUpdateLoadingState;
|
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
export {
|
||||
addChangePointsAction,
|
||||
errorAction,
|
||||
updateLoadingStateAction,
|
||||
API_ACTION_NAME,
|
||||
} from './actions';
|
||||
export type { AiopsExplainLogRateSpikesApiAction } from './actions';
|
||||
|
||||
export { aiopsExplainLogRateSpikesSchema } from './schema';
|
||||
export type { AiopsExplainLogRateSpikesSchema } from './schema';
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { schema, TypeOf } from '@kbn/config-schema';
|
||||
|
||||
export const aiopsExplainLogRateSpikesSchema = schema.object({
|
||||
start: schema.number(),
|
||||
end: schema.number(),
|
||||
kuery: schema.string(),
|
||||
timeFieldName: schema.string(),
|
||||
includeFrozen: schema.maybe(schema.boolean()),
|
||||
/** Analysis selection time ranges */
|
||||
baselineMin: schema.number(),
|
||||
baselineMax: schema.number(),
|
||||
deviationMin: schema.number(),
|
||||
deviationMax: schema.number(),
|
||||
/** The index to query for log rate spikes */
|
||||
index: schema.string(),
|
||||
});
|
||||
|
||||
export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
|
|
@ -9,7 +9,10 @@
|
|||
"description": "AIOps plugin maintained by ML team.",
|
||||
"server": true,
|
||||
"ui": true,
|
||||
"requiredPlugins": ["data"],
|
||||
"requiredPlugins": [
|
||||
"data",
|
||||
"licensing"
|
||||
],
|
||||
"optionalPlugins": [],
|
||||
"requiredBundles": ["kibanaReact"],
|
||||
"extraPublicDirs": ["common"]
|
||||
|
|
12
x-pack/plugins/aiops/server/lib/license.ts
Normal file
12
x-pack/plugins/aiops/server/lib/license.ts
Normal file
|
@ -0,0 +1,12 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { ILicense, LicenseType } from '@kbn/licensing-plugin/common/types';
|
||||
|
||||
export function isActiveLicense(licenseType: LicenseType, license?: ILicense): boolean {
|
||||
return (license && license.isActive && license.hasAtLeast(licenseType)) || false;
|
||||
}
|
|
@ -5,12 +5,16 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Subscription } from 'rxjs';
|
||||
|
||||
import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from '@kbn/core/server';
|
||||
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
|
||||
|
||||
import { AIOPS_ENABLED } from '../common';
|
||||
|
||||
import { isActiveLicense } from './lib/license';
|
||||
import {
|
||||
AiopsLicense,
|
||||
AiopsPluginSetup,
|
||||
AiopsPluginStart,
|
||||
AiopsPluginSetupDeps,
|
||||
|
@ -22,19 +26,29 @@ export class AiopsPlugin
|
|||
implements Plugin<AiopsPluginSetup, AiopsPluginStart, AiopsPluginSetupDeps, AiopsPluginStartDeps>
|
||||
{
|
||||
private readonly logger: Logger;
|
||||
private licenseSubscription: Subscription | null = null;
|
||||
|
||||
constructor(initializerContext: PluginInitializerContext) {
|
||||
this.logger = initializerContext.logger.get();
|
||||
}
|
||||
|
||||
public setup(core: CoreSetup<AiopsPluginStartDeps>, deps: AiopsPluginSetupDeps) {
|
||||
public setup(core: CoreSetup<AiopsPluginStartDeps>, plugins: AiopsPluginSetupDeps) {
|
||||
this.logger.debug('aiops: Setup');
|
||||
|
||||
// Subscribe to license changes and store the current license in `currentLicense`.
|
||||
// This way we can pass on license changes to the route factory having always
|
||||
// the current license because it's stored in a mutable attribute.
|
||||
const aiopsLicense: AiopsLicense = { isActivePlatinumLicense: false };
|
||||
this.licenseSubscription = plugins.licensing.license$.subscribe(async (license) => {
|
||||
aiopsLicense.isActivePlatinumLicense = isActiveLicense('platinum', license);
|
||||
});
|
||||
|
||||
const router = core.http.createRouter<DataRequestHandlerContext>();
|
||||
|
||||
// Register server side APIs
|
||||
if (AIOPS_ENABLED) {
|
||||
core.getStartServices().then(([_, depsStart]) => {
|
||||
defineExplainLogRateSpikesRoute(router, this.logger);
|
||||
defineExplainLogRateSpikesRoute(router, aiopsLicense, this.logger);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -46,5 +60,8 @@ export class AiopsPlugin
|
|||
return {};
|
||||
}
|
||||
|
||||
public stop() {}
|
||||
public stop() {
|
||||
this.logger.debug('aiops: Stop');
|
||||
this.licenseSubscription?.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,19 +7,23 @@
|
|||
|
||||
import { chunk } from 'lodash';
|
||||
|
||||
import type { IRouter, Logger } from '@kbn/core/server';
|
||||
import type { IRouter } from '@kbn/core/server';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
|
||||
import { streamFactory } from '@kbn/aiops-utils';
|
||||
|
||||
import {
|
||||
addChangePoints,
|
||||
addChangePointsAction,
|
||||
aiopsExplainLogRateSpikesSchema,
|
||||
errorAction,
|
||||
updateLoadingStateAction,
|
||||
AiopsExplainLogRateSpikesApiAction,
|
||||
} from '../../common/api/explain_log_rate_spikes';
|
||||
import { API_ENDPOINT } from '../../common/api';
|
||||
import type { ChangePoint } from '../../common/types';
|
||||
|
||||
import type { AiopsLicense } from '../types';
|
||||
|
||||
import { fetchFieldCandidates } from './queries/fetch_field_candidates';
|
||||
import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
|
||||
|
||||
|
@ -29,6 +33,7 @@ const PROGRESS_STEP_P_VALUES = 0.8;
|
|||
|
||||
export const defineExplainLogRateSpikesRoute = (
|
||||
router: IRouter<DataRequestHandlerContext>,
|
||||
license: AiopsLicense,
|
||||
logger: Logger
|
||||
) => {
|
||||
router.post(
|
||||
|
@ -39,6 +44,10 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
},
|
||||
},
|
||||
async (context, request, response) => {
|
||||
if (!license.isActivePlatinumLicense) {
|
||||
return response.forbidden();
|
||||
}
|
||||
|
||||
const client = (await context.core).elasticsearch.client.asCurrentUser;
|
||||
|
||||
const controller = new AbortController();
|
||||
|
@ -55,7 +64,8 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
});
|
||||
|
||||
const { end, push, responseWithHeaders } = streamFactory<AiopsExplainLogRateSpikesApiAction>(
|
||||
request.headers
|
||||
request.headers,
|
||||
logger
|
||||
);
|
||||
|
||||
// Async IIFE to run the analysis while not blocking returning `responseWithHeaders`.
|
||||
|
@ -68,7 +78,14 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
})
|
||||
);
|
||||
|
||||
const { fieldCandidates } = await fetchFieldCandidates(client, request.body);
|
||||
let fieldCandidates: Awaited<ReturnType<typeof fetchFieldCandidates>>;
|
||||
try {
|
||||
fieldCandidates = await fetchFieldCandidates(client, request.body);
|
||||
} catch (e) {
|
||||
push(errorAction(e.toString()));
|
||||
end();
|
||||
return;
|
||||
}
|
||||
|
||||
if (fieldCandidates.length > 0) {
|
||||
loaded += LOADED_FIELD_CANDIDATES;
|
||||
|
@ -96,11 +113,14 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
|
||||
|
||||
for (const fieldCandidatesChunk of fieldCandidatesChunks) {
|
||||
const { changePoints: pValues } = await fetchChangePointPValues(
|
||||
client,
|
||||
request.body,
|
||||
fieldCandidatesChunk
|
||||
);
|
||||
let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
|
||||
try {
|
||||
pValues = await fetchChangePointPValues(client, request.body, fieldCandidatesChunk);
|
||||
} catch (e) {
|
||||
push(errorAction(e.toString()));
|
||||
end();
|
||||
return;
|
||||
}
|
||||
|
||||
if (pValues.length > 0) {
|
||||
pValues.forEach((d) => {
|
||||
|
@ -111,7 +131,7 @@ export const defineExplainLogRateSpikesRoute = (
|
|||
|
||||
loaded += (1 / fieldCandidatesChunks.length) * PROGRESS_STEP_P_VALUES;
|
||||
if (pValues.length > 0) {
|
||||
push(addChangePoints(pValues));
|
||||
push(addChangePointsAction(pValues));
|
||||
}
|
||||
push(
|
||||
updateLoadingStateAction({
|
||||
|
|
|
@ -90,7 +90,7 @@ export const fetchChangePointPValues = async (
|
|||
esClient: ElasticsearchClient,
|
||||
params: AiopsExplainLogRateSpikesSchema,
|
||||
fieldNames: string[]
|
||||
) => {
|
||||
): Promise<ChangePoint[]> => {
|
||||
const result: ChangePoint[] = [];
|
||||
|
||||
for (const fieldName of fieldNames) {
|
||||
|
@ -119,7 +119,5 @@ export const fetchChangePointPValues = async (
|
|||
}
|
||||
}
|
||||
|
||||
return {
|
||||
changePoints: uniqBy(result, (d) => `${d.fieldName},${d.fieldValue}`),
|
||||
};
|
||||
return uniqBy(result, (d) => `${d.fieldName},${d.fieldValue}`);
|
||||
};
|
||||
|
|
|
@ -95,9 +95,7 @@ describe('query_field_candidates', () => {
|
|||
|
||||
const resp = await fetchFieldCandidates(esClientMock, params);
|
||||
|
||||
expect(resp).toEqual({
|
||||
fieldCandidates: ['myIpFieldName', 'myKeywordFieldName'],
|
||||
});
|
||||
expect(resp).toEqual(['myIpFieldName', 'myKeywordFieldName']);
|
||||
expect(esClientFieldCapsMock).toHaveBeenCalledTimes(1);
|
||||
expect(esClientSearchMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
|
|
@ -45,7 +45,7 @@ export const getRandomDocsRequest = (
|
|||
export const fetchFieldCandidates = async (
|
||||
esClient: ElasticsearchClient,
|
||||
params: AiopsExplainLogRateSpikesSchema
|
||||
): Promise<{ fieldCandidates: string[] }> => {
|
||||
): Promise<string[]> => {
|
||||
const { index } = params;
|
||||
// Get all supported fields
|
||||
const respMapping = await esClient.fieldCaps({
|
||||
|
@ -78,7 +78,5 @@ export const fetchFieldCandidates = async (
|
|||
}
|
||||
});
|
||||
|
||||
return {
|
||||
fieldCandidates: [...finalFieldCandidates],
|
||||
};
|
||||
return [...finalFieldCandidates];
|
||||
};
|
||||
|
|
|
@ -5,10 +5,12 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { PluginSetup, PluginStart } from '@kbn/data-plugin/server';
|
||||
import type { PluginSetup, PluginStart } from '@kbn/data-plugin/server';
|
||||
import type { LicensingPluginStart } from '@kbn/licensing-plugin/server';
|
||||
|
||||
export interface AiopsPluginSetupDeps {
|
||||
data: PluginSetup;
|
||||
licensing: LicensingPluginStart;
|
||||
}
|
||||
|
||||
export interface AiopsPluginStartDeps {
|
||||
|
@ -26,3 +28,7 @@ export interface AiopsPluginSetup {}
|
|||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-interface
|
||||
export interface AiopsPluginStart {}
|
||||
|
||||
export interface AiopsLicense {
|
||||
isActivePlatinumLicense: boolean;
|
||||
}
|
||||
|
|
|
@ -36,7 +36,10 @@ export default ({ getService }: FtrProviderContext) => {
|
|||
const expected = {
|
||||
chunksLength: 7,
|
||||
actionsLength: 6,
|
||||
noIndexChunksLength: 3,
|
||||
noIndexActionsLength: 2,
|
||||
actionFilter: 'add_change_points',
|
||||
errorFilter: 'error',
|
||||
changePoints: [
|
||||
{
|
||||
fieldName: 'day_of_week',
|
||||
|
@ -129,6 +132,9 @@ export default ({ getService }: FtrProviderContext) => {
|
|||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
|
||||
expect(response.ok).to.be(true);
|
||||
expect(response.status).to.be(200);
|
||||
|
||||
const stream = response.body;
|
||||
|
||||
expect(stream).not.to.be(null);
|
||||
|
@ -164,5 +170,41 @@ export default ({ getService }: FtrProviderContext) => {
|
|||
});
|
||||
}
|
||||
});
|
||||
|
||||
it('should return an error for non existing index without streaming', async () => {
|
||||
const resp = await supertest
|
||||
.post(`/internal/aiops/explain_log_rate_spikes`)
|
||||
.set('kbn-xsrf', 'kibana')
|
||||
.send({
|
||||
...requestBody,
|
||||
index: 'does_not_exist',
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
const chunks: string[] = resp.body.toString().split('\n');
|
||||
|
||||
expect(chunks.length).to.be(expected.noIndexChunksLength);
|
||||
|
||||
const lastChunk = chunks.pop();
|
||||
expect(lastChunk).to.be('');
|
||||
|
||||
let data: any[] = [];
|
||||
|
||||
expect(() => {
|
||||
data = chunks.map((c) => JSON.parse(c));
|
||||
}).not.to.throwError();
|
||||
|
||||
expect(data.length).to.be(expected.noIndexActionsLength);
|
||||
data.forEach((d) => {
|
||||
expect(typeof d.type).to.be('string');
|
||||
});
|
||||
|
||||
const errorActions = data.filter((d) => d.type === expected.errorFilter);
|
||||
expect(errorActions.length).to.be(1);
|
||||
|
||||
expect(errorActions[0].payload).to.be(
|
||||
'ResponseError: index_not_found_exception: [index_not_found_exception] Reason: no such index [does_not_exist]'
|
||||
);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
|
|
@ -11,7 +11,7 @@ import { FtrProviderContext } from '../../ftr_provider_context';
|
|||
|
||||
export default function ({ loadTestFile }: FtrProviderContext) {
|
||||
describe('AIOps', function () {
|
||||
this.tags(['ml']);
|
||||
this.tags(['aiops']);
|
||||
|
||||
if (AIOPS_ENABLED) {
|
||||
loadTestFile(require.resolve('./explain_log_rate_spikes'));
|
||||
|
|
17
x-pack/test/api_integration_basic/apis/aiops/index.ts
Normal file
17
x-pack/test/api_integration_basic/apis/aiops/index.ts
Normal file
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
export default function ({ loadTestFile }: FtrProviderContext) {
|
||||
describe('aiops basic license', function () {
|
||||
this.tags(['aiops']);
|
||||
|
||||
// The aiops API should return forbidden when called without a trial/platinum license.
|
||||
loadTestFile(require.resolve('./permissions'));
|
||||
});
|
||||
}
|
57
x-pack/test/api_integration_basic/apis/aiops/permissions.ts
Normal file
57
x-pack/test/api_integration_basic/apis/aiops/permissions.ts
Normal file
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import fetch from 'node-fetch';
|
||||
import { format as formatUrl } from 'url';
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
|
||||
import type { ApiExplainLogRateSpikes } from '@kbn/aiops-plugin/common/api';
|
||||
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
export default ({ getService }: FtrProviderContext) => {
|
||||
const supertest = getService('supertest');
|
||||
const config = getService('config');
|
||||
const kibanaServerUrl = formatUrl(config.get('servers.kibana'));
|
||||
|
||||
const requestBody: ApiExplainLogRateSpikes['body'] = {
|
||||
baselineMax: 1561719083292,
|
||||
baselineMin: 1560954147006,
|
||||
deviationMax: 1562254538692,
|
||||
deviationMin: 1561986810992,
|
||||
end: 2147483647000,
|
||||
index: 'ft_ecommerce',
|
||||
kuery: '',
|
||||
start: 0,
|
||||
timeFieldName: 'order_date',
|
||||
};
|
||||
|
||||
describe('POST /internal/aiops/explain_log_rate_spikes', () => {
|
||||
it('should return permission denied without streaming', async () => {
|
||||
await supertest
|
||||
.post(`/internal/aiops/explain_log_rate_spikes`)
|
||||
.set('kbn-xsrf', 'kibana')
|
||||
.send(requestBody)
|
||||
.expect(403);
|
||||
});
|
||||
|
||||
it('should return permission denied with streaming', async () => {
|
||||
const response = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'kbn-xsrf': 'stream',
|
||||
},
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
|
||||
expect(response.ok).to.be(false);
|
||||
expect(response.status).to.be(403);
|
||||
});
|
||||
});
|
||||
};
|
|
@ -9,6 +9,7 @@ import { FtrProviderContext } from '../ftr_provider_context';
|
|||
|
||||
export default function ({ loadTestFile }: FtrProviderContext) {
|
||||
describe('apis', function () {
|
||||
loadTestFile(require.resolve('./aiops'));
|
||||
loadTestFile(require.resolve('./transform'));
|
||||
loadTestFile(require.resolve('./security_solution'));
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue