[Synthetics] stream results back for project monitors (#138069) (#138848)

* stream results back for private locations

* update types

* consolidate apis

* adjust tests

* Fix API tests.

* Define types more clearly.

* Reintroduce default method.

* Add error handling to observable.

Co-authored-by: Justin Kambic <jk@elastic.co>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
(cherry picked from commit 4990478dea)

Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co>
This commit is contained in:
Kibana Machine 2022-08-15 17:24:43 -04:00 committed by GitHub
parent 9463618539
commit 69dfad2689
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 567 additions and 277 deletions

View file

@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
import type {
import {
CoreStart,
PluginInitializerContext,
CoreSetup,
@ -14,6 +14,9 @@ import type {
Logger,
KibanaRequest,
StartServicesAccessor,
RequestHandlerContext,
RequestHandler,
KibanaResponseFactory,
} from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { map$ } from '@kbn/std';
@ -46,7 +49,12 @@ export interface BfetchServerSetup {
) => void;
addStreamingResponseRoute: <Payload, Response>(
path: string,
params: (request: KibanaRequest) => StreamingResponseHandler<Payload, Response>
params: (
request: KibanaRequest,
context: RequestHandlerContext
) => StreamingResponseHandler<Payload, Response>,
method?: 'GET' | 'POST' | 'PUT' | 'DELETE',
pluginRouter?: ReturnType<CoreSetup['http']['createRouter']>
) => void;
}
@ -59,6 +67,9 @@ const streamingHeaders = {
'Transfer-Encoding': 'chunked',
};
interface Query {
compress: boolean;
}
export class BfetchServerPlugin
implements
Plugin<
@ -105,25 +116,45 @@ export class BfetchServerPlugin
router: ReturnType<CoreSetup['http']['createRouter']>;
logger: Logger;
}): BfetchServerSetup['addStreamingResponseRoute'] =>
(path, handler) => {
router.post(
{
path: `/${removeLeadingSlash(path)}`,
validate: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
},
(path, handler, method = 'POST', pluginRouter) => {
const httpRouter = pluginRouter || router;
const routeDefinition = {
path: `/${removeLeadingSlash(path)}`,
validate: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
},
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const compress = request.query.compress;
return response.ok({
headers: streamingHeaders,
body: createStream(handlerInstance.getResponseStream(data), logger, compress),
});
}
);
};
const routeHandler: RequestHandler<unknown, Query> = async (
context: RequestHandlerContext,
request: KibanaRequest<unknown, Query, any>,
response: KibanaResponseFactory
) => {
const handlerInstance = handler(request, context);
const data = request.body;
const compress = request.query.compress;
return response.ok({
headers: streamingHeaders,
body: createStream(handlerInstance.getResponseStream(data), logger, compress),
});
};
switch (method) {
case 'GET':
httpRouter.get(routeDefinition, routeHandler);
break;
case 'POST':
httpRouter.post(routeDefinition, routeHandler);
break;
case 'PUT':
httpRouter.put(routeDefinition, routeHandler);
break;
case 'DELETE':
httpRouter.delete(routeDefinition, routeHandler);
break;
default:
throw new Error(`Handler for method ${method} is not defined`);
}
};
private addBatchProcessingRoute =

View file

@ -21,7 +21,8 @@
"triggersActionsUi",
"usageCollection",
"unifiedSearch",
"spaces"
"spaces",
"bfetch"
],
"server": true,
"ui": true,

View file

@ -29,6 +29,7 @@ import { SecurityPluginStart } from '@kbn/security-plugin/server';
import { CloudSetup } from '@kbn/cloud-plugin/server';
import { SpacesPluginSetup } from '@kbn/spaces-plugin/server';
import { FleetStartContract } from '@kbn/fleet-plugin/server';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import { UptimeESClient } from '../../lib';
import type { TelemetryEventsSender } from '../../telemetry/sender';
import type { UptimeRouter } from '../../../../types';
@ -76,6 +77,7 @@ export interface UptimeCorePluginsSetup {
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
telemetry: TelemetryPluginSetup;
bfetch: BfetchServerSetup;
}
export interface UptimeCorePluginsStart {

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import type { Subject } from 'rxjs';
import { ObjectType } from '@kbn/config-schema';
import {
RequestHandler,
@ -20,13 +21,24 @@ import { UMServerLibs, UptimeESClient } from '../lib/lib';
import type { UptimeRequestHandlerContext } from '../../types';
import { UptimeServerSetup } from '../lib/adapters';
export type SyntheticsRequest = KibanaRequest<
Record<string, any>,
Record<string, any>,
Record<string, any>
>;
/**
* Defines the basic properties employed by Uptime routes.
*/
export interface UMServerRoute<T> {
method: string;
method: 'GET' | 'PUT' | 'POST' | 'DELETE';
writeAccess?: boolean;
handler: T;
streamHandler?: (
context: UptimeRequestHandlerContext,
request: SyntheticsRequest,
subject: Subject<unknown>
) => IKibanaResponse<any> | Promise<IKibanaResponse<any>>;
}
/**
@ -56,6 +68,7 @@ export type UptimeRoute = UMRouteDefinition<UMRouteHandler>;
*/
export type UMRestApiRouteFactory = (libs: UMServerLibs) => UptimeRoute;
export type SyntheticsRestApiRouteFactory = (libs: UMServerLibs) => SyntheticsRoute;
export type SyntheticsStreamingRouteFactory = (libs: UMServerLibs) => SyntheticsStreamingRoute;
/**
* Functions of this type accept our internal route format and output a route
@ -67,9 +80,10 @@ export type UMKibanaRouteWrapper = (
) => UMKibanaRoute;
export type SyntheticsRoute = UMRouteDefinition<SyntheticsRouteHandler>;
export type SyntheticsStreamingRoute = UMRouteDefinition<SyntheticsStreamingRouteHandler>;
export type SyntheticsRouteWrapper = (
uptimeRoute: SyntheticsRoute,
uptimeRoute: SyntheticsRoute | SyntheticsStreamingRoute,
server: UptimeServerSetup,
syntheticsMonitorClient: SyntheticsMonitorClient
) => UMKibanaRoute;
@ -84,13 +98,15 @@ export type UMRouteHandler = ({
response,
server,
savedObjectsClient,
subject,
}: {
uptimeEsClient: UptimeESClient;
context: UptimeRequestHandlerContext;
request: KibanaRequest<Record<string, any>, Record<string, any>, Record<string, any>>;
request: SyntheticsRequest;
response: KibanaResponseFactory;
savedObjectsClient: SavedObjectsClientContract;
server: UptimeServerSetup;
subject?: Subject<unknown>;
}) => IKibanaResponse<any> | Promise<IKibanaResponse<any>>;
export type SyntheticsRouteHandler = ({
@ -100,12 +116,31 @@ export type SyntheticsRouteHandler = ({
response,
server,
savedObjectsClient,
subject: Subject,
}: {
uptimeEsClient: UptimeESClient;
context: UptimeRequestHandlerContext;
request: KibanaRequest<Record<string, any>, Record<string, any>, Record<string, any>>;
request: SyntheticsRequest;
response: KibanaResponseFactory;
savedObjectsClient: SavedObjectsClientContract;
server: UptimeServerSetup;
syntheticsMonitorClient: SyntheticsMonitorClient;
subject?: Subject<unknown>;
}) => IKibanaResponse<any> | Promise<IKibanaResponse<any>>;
export type SyntheticsStreamingRouteHandler = ({
uptimeEsClient,
context,
request,
server,
savedObjectsClient,
subject: Subject,
}: {
uptimeEsClient: UptimeESClient;
context: UptimeRequestHandlerContext;
request: SyntheticsRequest;
savedObjectsClient: SavedObjectsClientContract;
server: UptimeServerSetup;
syntheticsMonitorClient: SyntheticsMonitorClient;
subject?: Subject<unknown>;
}) => IKibanaResponse<any> | Promise<IKibanaResponse<any>>;

View file

@ -101,7 +101,7 @@ export class Plugin implements PluginType {
initUptimeServer(this.server, plugins, ruleDataClient, this.logger);
initSyntheticsServer(this.server, this.syntheticsMonitorClient);
initSyntheticsServer(this.server, this.syntheticsMonitorClient, plugins);
registerUptimeSavedObjects(core.savedObjects, plugins.encryptedSavedObjects);

View file

@ -8,13 +8,14 @@
import { UMServerLibs } from '../legacy_uptime/lib/lib';
import {
SyntheticsRestApiRouteFactory,
SyntheticsStreamingRouteFactory,
SyntheticsRoute,
SyntheticsRouteHandler,
} from '../legacy_uptime/routes';
export const createSyntheticsRouteWithAuth = (
libs: UMServerLibs,
routeCreator: SyntheticsRestApiRouteFactory
routeCreator: SyntheticsRestApiRouteFactory | SyntheticsStreamingRouteFactory
): SyntheticsRoute => {
const restRoute = routeCreator(libs);
const { handler, method, path, options, ...rest } = restRoute;

View file

@ -26,10 +26,12 @@ import { installIndexTemplatesRoute } from './synthetics_service/install_index_t
import { editSyntheticsMonitorRoute } from './monitor_cruds/edit_monitor';
import { addSyntheticsMonitorRoute } from './monitor_cruds/add_monitor';
import { addSyntheticsProjectMonitorRoute } from './monitor_cruds/add_monitor_project';
import { SyntheticsRestApiRouteFactory } from '../legacy_uptime/routes';
import {
SyntheticsRestApiRouteFactory,
SyntheticsStreamingRouteFactory,
} from '../legacy_uptime/routes';
export const syntheticsAppRestApiRoutes: SyntheticsRestApiRouteFactory[] = [
addSyntheticsProjectMonitorRoute,
addSyntheticsMonitorRoute,
getSyntheticsEnablementRoute,
deleteSyntheticsMonitorRoute,
@ -47,3 +49,7 @@ export const syntheticsAppRestApiRoutes: SyntheticsRestApiRouteFactory[] = [
getAPIKeySyntheticsRoute,
createGetMonitorStatusRoute,
];
export const syntheticsAppStreamingApiRoutes: SyntheticsStreamingRouteFactory[] = [
addSyntheticsProjectMonitorRoute,
];

View file

@ -8,12 +8,12 @@ import { schema } from '@kbn/config-schema';
import { UMServerLibs } from '../../legacy_uptime/lib/lib';
import { ProjectBrowserMonitor } from '../../../common/runtime_types';
import { SyntheticsRestApiRouteFactory } from '../../legacy_uptime/routes/types';
import { SyntheticsStreamingRouteFactory } from '../../legacy_uptime/routes/types';
import { API_URLS } from '../../../common/constants';
import { getAllLocations } from '../../synthetics_service/get_all_locations';
import { ProjectMonitorFormatter } from '../../synthetics_service/project_monitor_formatter';
export const addSyntheticsProjectMonitorRoute: SyntheticsRestApiRouteFactory = (
export const addSyntheticsProjectMonitorRoute: SyntheticsStreamingRouteFactory = (
libs: UMServerLibs
) => ({
method: 'PUT',
@ -27,46 +27,51 @@ export const addSyntheticsProjectMonitorRoute: SyntheticsRestApiRouteFactory = (
},
handler: async ({
request,
response,
savedObjectsClient,
server,
syntheticsMonitorClient,
subject,
}): Promise<any> => {
const monitors = (request.body?.monitors as ProjectBrowserMonitor[]) || [];
const spaceId = server.spaces.spacesService.getSpaceId(request);
const { keep_stale: keepStale, project: projectId } = request.body || {};
const { publicLocations, privateLocations } = await getAllLocations(
server,
syntheticsMonitorClient,
savedObjectsClient
);
const encryptedSavedObjectsClient = server.encryptedSavedObjects.getClient();
try {
const monitors = (request.body?.monitors as ProjectBrowserMonitor[]) || [];
const spaceId = server.spaces.spacesService.getSpaceId(request);
const { keep_stale: keepStale, project: projectId } = request.body || {};
const { publicLocations, privateLocations } = await getAllLocations(
server,
syntheticsMonitorClient,
savedObjectsClient
);
const encryptedSavedObjectsClient = server.encryptedSavedObjects.getClient();
const pushMonitorFormatter = new ProjectMonitorFormatter({
projectId,
spaceId,
keepStale,
locations: publicLocations,
privateLocations,
encryptedSavedObjectsClient,
savedObjectsClient,
monitors,
server,
syntheticsMonitorClient,
request,
});
const pushMonitorFormatter = new ProjectMonitorFormatter({
projectId,
spaceId,
keepStale,
locations: publicLocations,
privateLocations,
encryptedSavedObjectsClient,
savedObjectsClient,
monitors,
server,
syntheticsMonitorClient,
request,
subject,
});
await pushMonitorFormatter.configureAllProjectMonitors();
await pushMonitorFormatter.configureAllProjectMonitors();
return response.ok({
body: {
subject?.next({
createdMonitors: pushMonitorFormatter.createdMonitors,
updatedMonitors: pushMonitorFormatter.updatedMonitors,
staleMonitors: pushMonitorFormatter.staleMonitors,
deletedMonitors: pushMonitorFormatter.deletedMonitors,
failedMonitors: pushMonitorFormatter.failedMonitors,
failedStaleMonitors: pushMonitorFormatter.failedStaleMonitors,
},
});
});
} catch (error) {
subject?.error(error);
} finally {
subject?.complete();
}
},
});

View file

@ -4,18 +4,21 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject } from 'rxjs';
import { UptimeRequestHandlerContext } from './types';
import { createSyntheticsRouteWithAuth } from './routes/create_route_with_auth';
import { SyntheticsMonitorClient } from './synthetics_service/synthetics_monitor/synthetics_monitor_client';
import { syntheticsRouteWrapper } from './synthetics_route_wrapper';
import { uptimeRequests } from './legacy_uptime/lib/requests';
import { syntheticsAppRestApiRoutes } from './routes';
import { UptimeServerSetup } from './legacy_uptime/lib/adapters';
import { syntheticsAppRestApiRoutes, syntheticsAppStreamingApiRoutes } from './routes';
import { UptimeServerSetup, UptimeCorePluginsSetup } from './legacy_uptime/lib/adapters';
import { licenseCheck } from './legacy_uptime/lib/domains';
import type { SyntheticsRequest } from './legacy_uptime/routes/types';
export const initSyntheticsServer = (
server: UptimeServerSetup,
syntheticsMonitorClient: SyntheticsMonitorClient
syntheticsMonitorClient: SyntheticsMonitorClient,
plugins: UptimeCorePluginsSetup
) => {
const libs = {
requests: uptimeRequests,
@ -52,4 +55,34 @@ export const initSyntheticsServer = (
throw new Error(`Handler for method ${method} is not defined`);
}
});
syntheticsAppStreamingApiRoutes.forEach((route) => {
const { method, streamHandler, path } = syntheticsRouteWrapper(
createSyntheticsRouteWithAuth(libs, route),
server,
syntheticsMonitorClient
);
plugins.bfetch.addStreamingResponseRoute<string, unknown>(
path,
(request, context) => {
return {
getResponseStream: ({ data }: any) => {
const subject = new Subject<unknown>();
if (streamHandler) {
streamHandler(
context as UptimeRequestHandlerContext,
request as SyntheticsRequest,
subject
);
}
return subject;
},
};
},
method,
server.router
);
});
};

View file

@ -4,12 +4,11 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { KibanaResponse } from '@kbn/core-http-router-server-internal';
import { enableInspectEsQueries } from '@kbn/observability-plugin/common';
import { createUptimeESClient, inspectableEsQueriesMap } from './legacy_uptime/lib/lib';
import { syntheticsServiceApiKey } from './legacy_uptime/lib/saved_objects/service_api_key';
import { SyntheticsRouteWrapper } from './legacy_uptime/routes';
import { SyntheticsRouteWrapper, SyntheticsStreamingRouteHandler } from './legacy_uptime/routes';
import { API_URLS } from '../common/constants';
export const syntheticsRouteWrapper: SyntheticsRouteWrapper = (
@ -21,6 +20,48 @@ export const syntheticsRouteWrapper: SyntheticsRouteWrapper = (
options: {
tags: ['access:uptime-read', ...(uptimeRoute?.writeAccess ? ['access:uptime-write'] : [])],
},
streamHandler: async (context, request, subject) => {
const coreContext = await context.core;
const { client: esClient } = coreContext.elasticsearch;
const savedObjectsClient = coreContext.savedObjects.getClient({
includedHiddenTypes: [syntheticsServiceApiKey.name],
});
// specifically needed for the synthetics service api key generation
server.authSavedObjectsClient = savedObjectsClient;
const isInspectorEnabled = await coreContext.uiSettings.client.get<boolean>(
enableInspectEsQueries
);
const uptimeEsClient = createUptimeESClient({
request,
savedObjectsClient,
isInspectorEnabled,
esClient: esClient.asCurrentUser,
});
server.uptimeEsClient = uptimeEsClient;
if (
(isInspectorEnabled || server.isDev) &&
server.config.service?.username !== 'localKibanaIntegrationTestsUser'
) {
inspectableEsQueriesMap.set(request, []);
}
const res = await (uptimeRoute.handler as SyntheticsStreamingRouteHandler)({
uptimeEsClient,
savedObjectsClient,
context,
request,
server,
syntheticsMonitorClient,
subject,
});
return res;
},
handler: async (context, request, response) => {
const coreContext = await context.core;
const { client: esClient } = coreContext.elasticsearch;

View file

@ -4,6 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Subject } from 'rxjs';
import { isEqual } from 'lodash';
import { KibanaRequest } from '@kbn/core/server';
import {
@ -63,6 +64,7 @@ export class ProjectMonitorFormatter {
private projectFilter: string;
private syntheticsMonitorClient: SyntheticsMonitorClient;
private request: KibanaRequest;
private subject?: Subject<unknown>;
constructor({
locations,
@ -76,6 +78,7 @@ export class ProjectMonitorFormatter {
server,
syntheticsMonitorClient,
request,
subject,
}: {
locations: Locations;
privateLocations: Locations;
@ -88,6 +91,7 @@ export class ProjectMonitorFormatter {
server: UptimeServerSetup;
syntheticsMonitorClient: SyntheticsMonitorClient;
request: KibanaRequest;
subject?: Subject<unknown>;
}) {
this.projectId = projectId;
this.spaceId = spaceId;
@ -101,6 +105,7 @@ export class ProjectMonitorFormatter {
this.server = server;
this.projectFilter = `${syntheticsMonitorType}.attributes.${ConfigKey.PROJECT_ID}: "${this.projectId}"`;
this.request = request;
this.subject = subject;
}
public configureAllProjectMonitors = async () => {
@ -158,23 +163,11 @@ export class ProjectMonitorFormatter {
if (this.staleMonitorsMap[monitor.id]) {
this.staleMonitorsMap[monitor.id].stale = false;
}
this.handleStreamingMessage({ message: `${monitor.id}: monitor updated successfully` });
} else {
const newMonitor = await this.savedObjectsClient.create<EncryptedSyntheticsMonitor>(
syntheticsMonitorType,
formatSecrets({
...normalizedMonitor,
revision: 1,
})
);
await syncNewMonitor({
server: this.server,
monitor: normalizedMonitor,
monitorSavedObject: newMonitor,
syntheticsMonitorClient: this.syntheticsMonitorClient,
savedObjectsClient: this.savedObjectsClient,
request: this.request,
});
await this.createMonitor(normalizedMonitor);
this.createdMonitors.push(monitor.id);
this.handleStreamingMessage({ message: `${monitor.id}: monitor created successfully` });
}
} catch (e) {
this.server.logger.error(e);
@ -184,6 +177,7 @@ export class ProjectMonitorFormatter {
details: e.message,
payload: monitor,
});
this.handleStreamingMessage({ message: `${monitor.id}: failed to create or update monitor` });
if (this.staleMonitorsMap[monitor.id]) {
this.staleMonitorsMap[monitor.id].stale = false;
}
@ -235,6 +229,24 @@ export class ProjectMonitorFormatter {
return savedObjects?.[0];
};
private createMonitor = async (normalizedMonitor: BrowserFields) => {
const newMonitor = await this.savedObjectsClient.create<EncryptedSyntheticsMonitor>(
syntheticsMonitorType,
formatSecrets({
...normalizedMonitor,
revision: 1,
})
);
await syncNewMonitor({
server: this.server,
monitor: normalizedMonitor,
monitorSavedObject: newMonitor,
syntheticsMonitorClient: this.syntheticsMonitorClient,
savedObjectsClient: this.savedObjectsClient,
request: this.request,
});
};
private updateMonitor = async (
previousMonitor: SavedObjectsFindResult<EncryptedSyntheticsMonitor>,
normalizedMonitor: BrowserFields
@ -325,7 +337,9 @@ export class ProjectMonitorFormatter {
request: this.request,
});
this.deletedMonitors.push(journeyId);
this.handleStreamingMessage({ message: `Monitor ${monitorId} deleted successfully` });
} catch (e) {
this.handleStreamingMessage({ message: `Monitor ${monitorId} could not be deleted` });
this.failedStaleMonitors.push({
id: monitorId,
reason: 'Failed to delete stale monitor',
@ -333,4 +347,10 @@ export class ProjectMonitorFormatter {
});
}
};
private handleStreamingMessage = async ({ message }: { message: string }) => {
if (this.subject) {
this.subject?.next(message);
}
};
}

View file

@ -4,8 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import fetch, { BodyInit, HeadersInit, Response } from 'node-fetch';
import uuid from 'uuid';
import expect from '@kbn/expect';
import { format as formatUrl } from 'url';
import { ConfigKey, ProjectMonitorsRequest } from '@kbn/synthetics-plugin/common/runtime_types';
import { API_URLS } from '@kbn/synthetics-plugin/common/constants';
import { syntheticsMonitorType } from '@kbn/synthetics-plugin/server/legacy_uptime/lib/saved_objects/synthetics_monitor';
@ -20,9 +22,12 @@ export default function ({ getService }: FtrProviderContext) {
this.tags('skipCloud');
const supertest = getService('supertest');
const config = getService('config');
const kibanaServerUrl = formatUrl(config.get('servers.kibana'));
const supertestWithoutAuth = getService('supertestWithoutAuth');
const security = getService('security');
const kibanaServer = getService('kibanaServer');
const projectMonitorEndpoint = kibanaServerUrl + API_URLS.SYNTHETICS_MONITORS_PROJECT;
let projectMonitors: ProjectMonitorsRequest;
@ -86,14 +91,15 @@ export default function ({ getService }: FtrProviderContext) {
it('project monitors - returns a list of successfully created monitors', async () => {
try {
const apiResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send(projectMonitors);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify(projectMonitors)
);
expect(apiResponse.body.updatedMonitors).eql([]);
expect(apiResponse.body.failedMonitors).eql([]);
expect(apiResponse.body.createdMonitors).eql(
expect(messages).to.have.length(2);
expect(messages[1].updatedMonitors).eql([]);
expect(messages[1].failedMonitors).eql([]);
expect(messages[1].createdMonitors).eql(
projectMonitors.monitors.map((monitor) => monitor.id)
);
} finally {
@ -112,14 +118,15 @@ export default function ({ getService }: FtrProviderContext) {
.set('kbn-xsrf', 'true')
.send(projectMonitors);
const apiResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send(projectMonitors);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify(projectMonitors)
);
expect(apiResponse.body.createdMonitors).eql([]);
expect(apiResponse.body.failedMonitors).eql([]);
expect(apiResponse.body.updatedMonitors).eql(
expect(messages).to.have.length(2);
expect(messages[1].createdMonitors).eql([]);
expect(messages[1].failedMonitors).eql([]);
expect(messages[1].updatedMonitors).eql(
projectMonitors.monitors.map((monitor) => monitor.id)
);
} finally {
@ -218,15 +225,19 @@ export default function ({ getService }: FtrProviderContext) {
.send({
...projectMonitors,
monitors: testMonitors,
})
.expect(200);
});
const apiResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send(projectMonitors)
.expect(200);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify(projectMonitors)
);
expect(messages).to.have.length(2);
expect(messages[1].createdMonitors).eql([]);
expect(messages[1].failedMonitors).eql([]);
expect(messages[1].deletedMonitors).eql([]);
expect(messages[1].updatedMonitors).eql([projectMonitors.monitors[0].id]);
expect(messages[1].staleMonitors).eql([secondMonitor.id]);
// does not delete the stale monitor
const getResponse = await supertest
.get(API_URLS.SYNTHETICS_MONITORS)
@ -239,12 +250,6 @@ export default function ({ getService }: FtrProviderContext) {
const { monitors } = getResponse.body;
expect(monitors.length).eql(1);
expect(apiResponse.body.createdMonitors).eql([]);
expect(apiResponse.body.failedMonitors).eql([]);
expect(apiResponse.body.deletedMonitors).eql([]);
expect(apiResponse.body.updatedMonitors).eql([projectMonitors.monitors[0].id]);
expect(apiResponse.body.staleMonitors).eql([secondMonitor.id]);
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -266,14 +271,16 @@ export default function ({ getService }: FtrProviderContext) {
...projectMonitors,
keep_stale: false,
monitors: testMonitors,
})
.expect(200);
});
const projectResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({ ...projectMonitors, keep_stale: false })
.expect(200);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({
...projectMonitors,
keep_stale: false,
})
);
expect(messages).to.have.length(3);
// expect monitor to have been deleted
const getResponse = await supertest
@ -288,11 +295,11 @@ export default function ({ getService }: FtrProviderContext) {
expect(monitors[0]).eql(undefined);
expect(projectResponse.body.createdMonitors).eql([]);
expect(projectResponse.body.failedMonitors).eql([]);
expect(projectResponse.body.updatedMonitors).eql([projectMonitors.monitors[0].id]);
expect(projectResponse.body.deletedMonitors).eql([secondMonitor.id]);
expect(projectResponse.body.staleMonitors).eql([]);
expect(messages[2].createdMonitors).eql([]);
expect(messages[2].failedMonitors).eql([]);
expect(messages[2].updatedMonitors).eql([projectMonitors.monitors[0].id]);
expect(messages[2].deletedMonitors).eql([secondMonitor.id]);
expect(messages[2].staleMonitors).eql([]);
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -307,21 +314,30 @@ export default function ({ getService }: FtrProviderContext) {
const testMonitors = [projectMonitors.monitors[0], secondMonitor];
const testprojectId = 'test-suite-2';
try {
await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({
await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({
...projectMonitors,
keep_stale: false,
monitors: testMonitors,
})
.expect(200);
);
const projectResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({ ...projectMonitors, keep_stale: false, project: testprojectId })
.expect(200);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({
...projectMonitors,
keep_stale: false,
project: testprojectId,
})
);
expect(messages).to.have.length(2);
expect(messages[1].createdMonitors).eql([projectMonitors.monitors[0].id]);
expect(messages[1].failedMonitors).eql([]);
expect(messages[1].deletedMonitors).eql([]);
expect(messages[1].updatedMonitors).eql([]);
expect(messages[1].staleMonitors).eql([]);
// expect monitor not to have been deleted
const getResponse = await supertest
@ -335,12 +351,6 @@ export default function ({ getService }: FtrProviderContext) {
const { monitors } = getResponse.body;
expect(monitors.length).eql(1);
expect(projectResponse.body.createdMonitors).eql([projectMonitors.monitors[0].id]);
expect(projectResponse.body.failedMonitors).eql([]);
expect(projectResponse.body.deletedMonitors).eql([]);
expect(projectResponse.body.updatedMonitors).eql([]);
expect(projectResponse.body.staleMonitors).eql([]);
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -381,23 +391,32 @@ export default function ({ getService }: FtrProviderContext) {
roles: [roleName],
full_name: 'a kibana user',
});
await supertestWithoutAuth
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.auth(username, password)
.set('kbn-xsrf', 'true')
.send({
...projectMonitors,
keep_stale: false,
monitors: testMonitors,
})
.expect(200);
const projectResponse = await supertestWithoutAuth
.put(`/s/${SPACE_ID}${API_URLS.SYNTHETICS_MONITORS_PROJECT}`)
.auth(username, password)
.set('kbn-xsrf', 'true')
.send({ ...projectMonitors, keep_stale: false })
.expect(200);
// expect monitor not to have been deleted
await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({ ...projectMonitors, keep_stale: false, monitors: testMonitors }),
{
Authorization:
'Basic ' + Buffer.from(`${username}:${password}`, 'binary').toString('base64'),
}
);
const spaceUrl = kibanaServerUrl + `/s/${SPACE_ID}${API_URLS.SYNTHETICS_MONITORS_PROJECT}`;
const messages = await parseStreamApiResponse(
spaceUrl,
JSON.stringify({ ...projectMonitors, keep_stale: false }),
{
Authorization:
'Basic ' + Buffer.from(`${username}:${password}`, 'binary').toString('base64'),
}
);
expect(messages).to.have.length(2);
expect(messages[1].createdMonitors).eql([projectMonitors.monitors[0].id]);
expect(messages[1].failedMonitors).eql([]);
expect(messages[1].deletedMonitors).eql([]);
expect(messages[1].updatedMonitors).eql([]);
expect(messages[1].staleMonitors).eql([]);
const getResponse = await supertestWithoutAuth
.get(API_URLS.SYNTHETICS_MONITORS)
.auth(username, password)
@ -408,11 +427,6 @@ export default function ({ getService }: FtrProviderContext) {
.expect(200);
const { monitors } = getResponse.body;
expect(monitors.length).eql(1);
expect(projectResponse.body.createdMonitors).eql([projectMonitors.monitors[0].id]);
expect(projectResponse.body.failedMonitors).eql([]);
expect(projectResponse.body.deletedMonitors).eql([]);
expect(projectResponse.body.updatedMonitors).eql([]);
expect(projectResponse.body.staleMonitors).eql([]);
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -439,16 +453,17 @@ export default function ({ getService }: FtrProviderContext) {
it('project monitors - validates monitor type', async () => {
try {
const apiResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({
...projectMonitors,
monitors: [{ ...projectMonitors.monitors[0], schedule: '3m', tags: '' }],
});
})
);
expect(apiResponse.body.updatedMonitors).eql([]);
expect(apiResponse.body.failedMonitors).eql([
expect(messages).to.have.length(1);
expect(messages[0].updatedMonitors).eql([]);
expect(messages[0].failedMonitors).eql([
{
details:
'Invalid value "3m" supplied to "schedule" | Invalid value "" supplied to "tags"',
@ -478,7 +493,7 @@ export default function ({ getService }: FtrProviderContext) {
reason: 'Failed to save or update monitor. Configuration is not valid',
},
]);
expect(apiResponse.body.createdMonitors).eql([]);
expect(messages[0].createdMonitors).eql([]);
} finally {
await Promise.all([
projectMonitors.monitors.map((monitor) => {
@ -630,12 +645,12 @@ export default function ({ getService }: FtrProviderContext) {
.expect(200);
// update project monitor via push api
const apiResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send(projectMonitors)
.expect(200);
expect(apiResponse.body.updatedMonitors).eql([projectMonitors.monitors[0].id]);
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify(projectMonitors)
);
expect(messages).to.have.length(2);
expect(messages[1].updatedMonitors).eql([projectMonitors.monitors[0].id]);
// ensure that monitor can still be decrypted
await supertest
@ -689,62 +704,6 @@ export default function ({ getService }: FtrProviderContext) {
}
});
it('handles location formatting for both private and public locations', async () => {
try {
await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({
...projectMonitors,
monitors: [
{ ...projectMonitors.monitors[0], privateLocations: ['Test private location 0'] },
],
});
const updatedMonitorsResponse = await Promise.all(
projectMonitors.monitors.map((monitor) => {
return supertest
.get(API_URLS.SYNTHETICS_MONITORS)
.query({ filter: `${syntheticsMonitorType}.attributes.journey_id: ${monitor.id}` })
.set('kbn-xsrf', 'true')
.expect(200);
})
);
updatedMonitorsResponse.forEach((response) => {
expect(response.body.monitors[0].attributes.locations).eql([
{
id: 'localhost',
label: 'Local Synthetics Service',
geo: { lat: 0, lon: 0 },
url: 'mockDevUrl',
isServiceManaged: true,
status: 'experimental',
isInvalid: false,
},
{
label: 'Test private location 0',
isServiceManaged: false,
isInvalid: false,
agentPolicyId: testPolicyId,
id: testPolicyId,
geo: {
lat: '',
lon: '',
},
concurrentMonitors: 1,
},
]);
});
} finally {
await Promise.all([
projectMonitors.monitors.map((monitor) => {
return deleteMonitor(monitor.id, projectMonitors.project);
}),
]);
}
});
it('project monitors - returns a failed monitor when user defines a private location without fleet permissions', async () => {
const secondMonitor = {
...projectMonitors.monitors[0],
@ -771,51 +730,61 @@ export default function ({ getService }: FtrProviderContext) {
roles: [roleName],
full_name: 'a kibana user',
});
const projectResponse = await supertestWithoutAuth
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.auth(username, password)
.set('kbn-xsrf', 'true')
.send({
const messages = await parseStreamApiResponse(
kibanaServerUrl + API_URLS.SYNTHETICS_MONITORS_PROJECT,
JSON.stringify({
...projectMonitors,
keep_stale: false,
monitors: testMonitors,
});
expect(projectResponse.body.createdMonitors).eql([testMonitors[0].id]);
expect(projectResponse.body.failedMonitors).eql([
}),
{
details:
'Insufficient permissions. In order to configure private locations, you must have Fleet and Integrations write permissions. To resolve, please generate a new API key with a user who has Fleet and Integrations write permissions.',
id: 'test-id-2',
payload: {
content:
'UEsDBBQACAAIAON5qVQAAAAAAAAAAAAAAAAfAAAAZXhhbXBsZXMvdG9kb3MvYmFzaWMuam91cm5leS50c22Q0WrDMAxF3/sVF7MHB0LMXlc6RvcN+wDPVWNviW0sdUsp/fe5SSiD7UFCWFfHujIGlpnkybwxFTZfoY/E3hsaLEtwhs9RPNWKDU12zAOxkXRIbN4tB9d9pFOJdO6EN2HMqQguWN9asFBuQVMmJ7jiWNII9fIXrbabdUYr58l9IhwhQQZCYORCTFFUC31Btj21NRc7Mq4Nds+4bDD/pNVgT9F52Jyr2Fa+g75LAPttg8yErk+S9ELpTmVotlVwnfNCuh2lepl3+JflUmSBJ3uggt1v9INW/lHNLKze9dJe1J3QJK8pSvWkm6aTtCet5puq+x63+AFQSwcIAPQ3VfcAAACcAQAAUEsBAi0DFAAIAAgA43mpVAD0N1X3AAAAnAEAAB8AAAAAAAAAAAAgAKSBAAAAAGV4YW1wbGVzL3RvZG9zL2Jhc2ljLmpvdXJuZXkudHNQSwUGAAAAAAEAAQBNAAAARAEAAAAA',
filter: {
match: 'check if title is present',
},
Authorization:
'Basic ' + Buffer.from(`${username}:${password}`, 'binary').toString('base64'),
}
);
expect(messages).to.have.length(3);
expect(messages[0]).to.eql(`${testMonitors[0].id}: monitor created successfully`);
expect(messages[1]).to.eql('test-id-2: failed to create or update monitor');
expect(messages[2]).to.eql({
createdMonitors: [testMonitors[0].id],
updatedMonitors: [],
staleMonitors: [],
deletedMonitors: [],
failedMonitors: [
{
details:
'Insufficient permissions. In order to configure private locations, you must have Fleet and Integrations write permissions. To resolve, please generate a new API key with a user who has Fleet and Integrations write permissions.',
id: 'test-id-2',
locations: ['localhost'],
name: 'check if title is present',
params: {},
playwrightOptions: {
chromiumSandbox: false,
headless: true,
},
privateLocations: ['Test private location 0'],
schedule: 10,
tags: [],
throttling: {
download: 5,
latency: 20,
upload: 3,
payload: {
content:
'UEsDBBQACAAIAON5qVQAAAAAAAAAAAAAAAAfAAAAZXhhbXBsZXMvdG9kb3MvYmFzaWMuam91cm5leS50c22Q0WrDMAxF3/sVF7MHB0LMXlc6RvcN+wDPVWNviW0sdUsp/fe5SSiD7UFCWFfHujIGlpnkybwxFTZfoY/E3hsaLEtwhs9RPNWKDU12zAOxkXRIbN4tB9d9pFOJdO6EN2HMqQguWN9asFBuQVMmJ7jiWNII9fIXrbabdUYr58l9IhwhQQZCYORCTFFUC31Btj21NRc7Mq4Nds+4bDD/pNVgT9F52Jyr2Fa+g75LAPttg8yErk+S9ELpTmVotlVwnfNCuh2lepl3+JflUmSBJ3uggt1v9INW/lHNLKze9dJe1J3QJK8pSvWkm6aTtCet5puq+x63+AFQSwcIAPQ3VfcAAACcAQAAUEsBAi0DFAAIAAgA43mpVAD0N1X3AAAAnAEAAB8AAAAAAAAAAAAgAKSBAAAAAGV4YW1wbGVzL3RvZG9zL2Jhc2ljLmpvdXJuZXkudHNQSwUGAAAAAAEAAQBNAAAARAEAAAAA',
filter: {
match: 'check if title is present',
},
id: 'test-id-2',
locations: ['localhost'],
name: 'check if title is present',
params: {},
playwrightOptions: {
chromiumSandbox: false,
headless: true,
},
privateLocations: ['Test private location 0'],
schedule: 10,
tags: [],
throttling: {
download: 5,
latency: 20,
upload: 3,
},
},
reason: 'Failed to create or update monitor',
},
reason: 'Failed to create or update monitor',
},
]);
expect(projectResponse.body.deletedMonitors).eql([]);
expect(projectResponse.body.updatedMonitors).eql([]);
expect(projectResponse.body.staleMonitors).eql([]);
],
failedStaleMonitors: [],
});
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -861,21 +830,27 @@ export default function ({ getService }: FtrProviderContext) {
roles: [roleName],
full_name: 'a kibana user',
});
const projectResponse = await supertestWithoutAuth
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.auth(username, password)
.set('kbn-xsrf', 'true')
.send({
const messages = await parseStreamApiResponse(
projectMonitorEndpoint,
JSON.stringify({
...projectMonitors,
keep_stale: false,
monitors: testMonitors,
});
expect(projectResponse.body.createdMonitors).eql([testMonitors[0].id, testMonitors[1].id]);
expect(projectResponse.body.failedMonitors).eql([]);
expect(projectResponse.body.deletedMonitors).eql([]);
expect(projectResponse.body.updatedMonitors).eql([]);
expect(projectResponse.body.staleMonitors).eql([]);
})
);
expect(messages).to.have.length(3);
expect(messages).to.eql([
`${testMonitors[0].id}: monitor created successfully`,
'test-id-2: monitor created successfully',
{
createdMonitors: [testMonitors[0].id, 'test-id-2'],
updatedMonitors: [],
staleMonitors: [],
deletedMonitors: [],
failedMonitors: [],
failedStaleMonitors: [],
},
]);
} finally {
await Promise.all([
testMonitors.map((monitor) => {
@ -1170,5 +1145,145 @@ export default function ({ getService }: FtrProviderContext) {
expect(apiResponsePolicy2.body.items.length).eql(0);
}
});
it('handles location formatting for both private and public locations', async () => {
try {
await supertest
.put(API_URLS.SYNTHETICS_MONITORS_PROJECT)
.set('kbn-xsrf', 'true')
.send({
...projectMonitors,
monitors: [
{ ...projectMonitors.monitors[0], privateLocations: ['Test private location 0'] },
],
});
const updatedMonitorsResponse = await Promise.all(
projectMonitors.monitors.map((monitor) => {
return supertest
.get(API_URLS.SYNTHETICS_MONITORS)
.query({ filter: `${syntheticsMonitorType}.attributes.journey_id: ${monitor.id}` })
.set('kbn-xsrf', 'true')
.expect(200);
})
);
updatedMonitorsResponse.forEach((response) => {
expect(response.body.monitors[0].attributes.locations).eql([
{
id: 'localhost',
label: 'Local Synthetics Service',
geo: { lat: 0, lon: 0 },
url: 'mockDevUrl',
isServiceManaged: true,
status: 'experimental',
isInvalid: false,
},
{
label: 'Test private location 0',
isServiceManaged: false,
isInvalid: false,
agentPolicyId: testPolicyId,
id: testPolicyId,
geo: {
lat: '',
lon: '',
},
concurrentMonitors: 1,
},
]);
});
} finally {
await Promise.all([
projectMonitors.monitors.map((monitor) => {
return deleteMonitor(monitor.id, projectMonitors.project);
}),
]);
}
});
});
}
/**
* Borrowed from AIOPS test code: https://github.com/elastic/kibana/blob/23a7ac2c2e2b1f64daa17b914e86989b1fde750c/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts
* Receives a stream and parses the messages until the stream closes.
*/
async function* parseStream(stream: NodeJS.ReadableStream) {
let partial = '';
try {
for await (const value of stream) {
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
partial = last ?? '';
const event = parts.map((p) => JSON.parse(p));
for (const events of event) {
yield events;
}
}
} catch (error) {
yield { type: 'error', payload: error.toString() };
}
}
/**
* Helper function to process the results of the module's stream parsing helper function.
*/
async function getMessages(stream: NodeJS.ReadableStream | null) {
if (stream === null) return [];
const data: any[] = [];
for await (const action of parseStream(stream)) {
data.push(action);
}
return data;
}
/**
* This type is intended to highlight any break between shared parameter contracts defined in
* the module's streaming endpoint helper functions.
*/
type StreamApiFunction<T = unknown> = (
url: string,
body?: BodyInit,
extraHeaders?: HeadersInit,
method?: string
) => T;
/**
* This helps the test file have DRY code when it comes to calling
* the same streaming endpoint over and over by defining some selective defaults.
*/
const parseStreamApiResponse: StreamApiFunction<Promise<any[]>> = async (
url: string,
body?: BodyInit,
extraHeaders?: HeadersInit,
method = 'PUT'
) => {
const streamResponse = await callStreamApi(url, body, extraHeaders, method);
return getMessages(streamResponse.body);
};
/**
* This helps the test file have DRY code when it comes to calling
* the same streaming endpoint over and over by defining some selective defaults.
*/
const callStreamApi: StreamApiFunction<Promise<Response>> = async (
url: string,
body?: BodyInit,
extraHeaders?: HeadersInit,
method = 'PUT'
) => {
return fetch(url, {
method,
headers: {
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
...extraHeaders,
},
body,
});
};