[Uptime] implement monitor management telemetry (#124473)

* uptime - implement monitor management telemetry

* adjust log level

* adjust types

* add telemetry dependency

* remove extra log

* update locationsCount

* add current channel

* adjust types

* update location errors

* enhance uptime telemetry adapter for synthetics use case

* fix test

* update xpack_plugins.json

* fix tests

* fix tests

* update snapshots

* remove telemetry collectors implementation

* update tests and mappings

* Update x-pack/plugins/uptime/server/lib/adapters/telemetry/kibana_telemetry_adapter.ts

* Update x-pack/plugins/uptime/server/lib/telemetry/sender.ts

* move telemetry setup to service conditional

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Dominique Clarke 2022-02-16 12:38:50 -05:00 committed by GitHub
parent 4137dd8de3
commit bcfed9fed3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 960 additions and 54 deletions

View file

@ -37,6 +37,7 @@ export enum ConfigKey {
REQUEST_HEADERS_CHECK = 'check.request.headers',
REQUEST_METHOD_CHECK = 'check.request.method',
REQUEST_SEND_CHECK = 'check.send',
REVISION = 'revision',
SCHEDULE = 'schedule',
SCREENSHOTS = 'screenshots',
SOURCE_INLINE = 'source.inline.script',

View file

@ -30,27 +30,34 @@ export const ServiceLocationCodec = t.interface({
});
export const ServiceLocationErrors = t.array(
t.intersection([
t.interface({
locationId: t.string,
error: t.interface({
t.interface({
locationId: t.string,
error: t.intersection([
t.interface({
reason: t.string,
status: t.number,
}),
}),
t.partial({
failed_monitors: t.array(
t.interface({
id: t.string,
message: t.string,
})
),
}),
])
t.partial({
failed_monitors: t.array(
t.interface({
id: t.string,
message: t.string,
})
),
}),
]),
})
);
export const ServiceLocationsCodec = t.array(ServiceLocationCodec);
export const LocationCodec = t.intersection([
ServiceLocationCodec,
t.partial({ isServiceManaged: t.boolean }),
]);
export const LocationsCodec = t.array(LocationCodec);
export const isServiceLocationInvalid = (location: ServiceLocation) =>
isLeft(ServiceLocationCodec.decode(location));
@ -63,3 +70,4 @@ export type ServiceLocation = t.TypeOf<typeof ServiceLocationCodec>;
export type ServiceLocations = t.TypeOf<typeof ServiceLocationsCodec>;
export type ServiceLocationsApiResponse = t.TypeOf<typeof ServiceLocationsApiResponseCodec>;
export type ServiceLocationErrors = t.TypeOf<typeof ServiceLocationErrors>;
export type Locations = t.TypeOf<typeof LocationsCodec>;

View file

@ -7,7 +7,7 @@
import * as t from 'io-ts';
import { ConfigKey } from './config_key';
import { ServiceLocationsCodec } from './locations';
import { LocationsCodec } from './locations';
import {
DataStreamCodec,
ModeCodec,
@ -55,13 +55,13 @@ export const CommonFieldsCodec = t.intersection([
[ConfigKey.MONITOR_TYPE]: DataStreamCodec,
[ConfigKey.ENABLED]: t.boolean,
[ConfigKey.SCHEDULE]: Schedule,
[ConfigKey.LOCATIONS]: t.array(t.string),
[ConfigKey.APM_SERVICE_NAME]: t.string,
[ConfigKey.TAGS]: t.array(t.string),
[ConfigKey.LOCATIONS]: ServiceLocationsCodec,
[ConfigKey.LOCATIONS]: LocationsCodec,
}),
t.partial({
[ConfigKey.TIMEOUT]: t.union([t.string, t.null]),
[ConfigKey.REVISION]: t.number,
}),
]);

View file

@ -2,7 +2,7 @@
"configPath": ["xpack", "uptime"],
"id": "uptime",
"kibanaVersion": "kibana",
"optionalPlugins": ["cloud", "data", "fleet", "home", "ml"],
"optionalPlugins": ["cloud", "data", "fleet", "home", "ml", "telemetry"],
"requiredPlugins": [
"alerting",
"cases",

View file

@ -24,6 +24,7 @@ export const commonFormatters: CommonFormatMap = {
[ConfigKey.TAGS]: (fields) => arrayToJsonFormatter(fields[ConfigKey.TAGS]),
[ConfigKey.TIMEOUT]: (fields) => secondsToCronFormatter(fields[ConfigKey.TIMEOUT] || undefined),
[ConfigKey.NAMESPACE]: null,
[ConfigKey.REVISION]: null,
};
export const arrayToJsonFormatter = (value: string[] = []) =>

View file

@ -82,4 +82,5 @@ export const commonNormalizers: CommonNormalizerMap = {
[ConfigKey.TIMEOUT]: getCommonCronToSecondsNormalizer(ConfigKey.TIMEOUT),
[ConfigKey.NAMESPACE]: (fields) =>
fields?.[ConfigKey.NAMESPACE]?.value ?? DEFAULT_NAMESPACE_STRING,
[ConfigKey.REVISION]: getCommonNormalizer(ConfigKey.REVISION),
};

View file

@ -7,6 +7,7 @@
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import type { SavedObjectsClientContract, IScopedClusterClient, Logger } from 'src/core/server';
import type { TelemetryPluginSetup, TelemetryPluginStart } from 'src/plugins/telemetry/server';
import { ObservabilityPluginSetup } from '../../../../../observability/server';
import {
EncryptedSavedObjectsPluginSetup,
@ -21,6 +22,7 @@ import { PluginSetupContract } from '../../../../../features/server';
import { MlPluginSetup as MlSetup } from '../../../../../ml/server';
import { RuleRegistryPluginSetupContract } from '../../../../../rule_registry/server';
import { UptimeESClient } from '../../lib';
import type { TelemetryEventsSender } from '../../telemetry/sender';
import type { UptimeRouter } from '../../../types';
import { SecurityPluginStart } from '../../../../../security/server';
import { CloudSetup } from '../../../../../cloud/server';
@ -52,6 +54,7 @@ export interface UptimeServerSetup {
syntheticsService: SyntheticsService;
kibanaVersion: string;
logger: Logger;
telemetry: TelemetryEventsSender;
uptimeEsClient: UptimeESClient;
}
@ -65,6 +68,7 @@ export interface UptimeCorePluginsSetup {
ruleRegistry: RuleRegistryPluginSetupContract;
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
telemetry: TelemetryPluginSetup;
}
export interface UptimeCorePluginsStart {
@ -72,6 +76,7 @@ export interface UptimeCorePluginsStart {
fleet: FleetStartContract;
encryptedSavedObjects: EncryptedSavedObjectsPluginStart;
taskManager: TaskManagerStartContract;
telemetry: TelemetryPluginStart;
}
export interface UMBackendFrameworkAdapter {

View file

@ -25,6 +25,7 @@ export const commonFormatters: CommonFormatMap = {
[ConfigKey.TAGS]: (fields) => arrayFormatter(fields[ConfigKey.TAGS]),
[ConfigKey.TIMEOUT]: (fields) => secondsToCronFormatter(fields[ConfigKey.TIMEOUT] || undefined),
[ConfigKey.NAMESPACE]: null,
[ConfigKey.REVISION]: null,
};
export const arrayFormatter = (value: string[] = []) => (value.length ? value : null);

View file

@ -49,6 +49,7 @@ describe('getServiceLocations', function () {
id: 'us_central',
label: 'US Central',
url: 'https://local.dev',
isServiceManaged: true,
},
],
});

View file

@ -8,13 +8,13 @@
import axios from 'axios';
import {
ManifestLocation,
ServiceLocations,
Locations,
ServiceLocationsApiResponse,
} from '../../../common/runtime_types';
import { UptimeServerSetup } from '../adapters/framework';
export async function getServiceLocations(server: UptimeServerSetup) {
const locations: ServiceLocations = [];
const locations: Locations = [];
if (!server.config.service?.manifestUrl) {
return { locations };
@ -31,6 +31,7 @@ export async function getServiceLocations(server: UptimeServerSetup) {
label: location.geo.name,
geo: location.geo.location,
url: location.url,
isServiceManaged: true,
});
});

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { TelemetryEventsSender } from '../sender';
/**
* Creates a mocked Telemetry Events Sender
*/
export const createMockTelemetryEventsSender = (
enableTelemetry?: boolean
): jest.Mocked<TelemetryEventsSender> => {
return {
setup: jest.fn(),
start: jest.fn(),
stop: jest.fn(),
fetchTelemetryUrl: jest.fn(),
queueTelemetryEvents: jest.fn(),
isTelemetryOptedIn: jest.fn().mockReturnValue(enableTelemetry ?? jest.fn()),
sendIfDue: jest.fn(),
sendEvents: jest.fn(),
} as unknown as jest.Mocked<TelemetryEventsSender>;
};

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const MONITOR_UPDATE_CHANNEL = 'synthetics-monitor-update';
export const MONITOR_CURRENT_CHANNEL = 'synthetics-monitor-current';

View file

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/* eslint-disable dot-notation */
import { TelemetryQueue } from './queue';
describe('TelemetryQueue', () => {
describe('queueTelemetryEvents', () => {
it('queues two events', () => {
const queue = new TelemetryQueue();
queue.addEvents([{ 'event.kind': '1' }, { 'event.kind': '2' }]);
expect(queue['queue'].length).toBe(2);
});
it('queues more than maxQueueSize events', () => {
const queue = new TelemetryQueue();
queue.addEvents([{ 'event.kind': '1' }, { 'event.kind': '2' }]);
queue['maxQueueSize'] = 5;
queue.addEvents([{ 'event.kind': '3' }, { 'event.kind': '4' }]);
queue.addEvents([{ 'event.kind': '5' }, { 'event.kind': '6' }]);
queue.addEvents([{ 'event.kind': '7' }, { 'event.kind': '8' }]);
expect(queue['queue'].length).toBe(5);
});
it('get and clear events', async () => {
const queue = new TelemetryQueue();
queue.addEvents([{ 'event.kind': '1' }, { 'event.kind': '2' }]);
expect(queue.getEvents().length).toBe(2);
queue.clearEvents();
expect(queue['queue'].length).toBe(0);
});
});
});

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const TELEMETRY_MAX_QUEUE_SIZE = 100;
export class TelemetryQueue<T> {
private maxQueueSize = TELEMETRY_MAX_QUEUE_SIZE;
private queue: T[] = [];
public addEvents(events: T[]) {
const qlength = this.queue.length;
if (events.length === 0) {
return;
}
if (qlength >= this.maxQueueSize) {
// we're full already
return;
}
if (events.length > this.maxQueueSize - qlength) {
this.queue.push(...events.slice(0, this.maxQueueSize - qlength));
} else {
this.queue.push(...events);
}
}
public clearEvents() {
this.queue = [];
}
public getEvents(): T[] {
return this.queue;
}
}

View file

@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/* eslint-disable dot-notation */
import { URL } from 'url';
import axios from 'axios';
import type { InfoResponse } from '@elastic/elasticsearch/lib/api/types';
import { loggingSystemMock } from 'src/core/server/mocks';
import { MONITOR_UPDATE_CHANNEL } from './constants';
import { TelemetryEventsSender } from './sender';
jest.mock('axios', () => {
return {
post: jest.fn(),
};
});
describe('TelemetryEventsSender', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let sender: TelemetryEventsSender;
const sampleEvent = {
configId: '12345',
stackVersion: '8.1.0',
type: 'http',
locations: ['us_central'],
locationsCount: 1,
monitorNameLength: 8,
monitorInterval: 180000,
revision: 1,
};
beforeEach(() => {
logger = loggingSystemMock.createLogger();
sender = new TelemetryEventsSender(logger);
sender['fetchClusterInfo'] = jest.fn(async () => {
return {
cluster_uuid: '1',
cluster_name: 'name',
version: {
number: '8.0.0',
},
} as InfoResponse;
});
sender.start(undefined, {
elasticsearch: { client: { asInternalUser: { info: jest.fn(async () => ({})) } } },
} as any);
});
describe('queueTelemetryEvents', () => {
it('queues two events', () => {
sender.queueTelemetryEvents(MONITOR_UPDATE_CHANNEL, [sampleEvent]);
expect(sender['queuesPerChannel'][MONITOR_UPDATE_CHANNEL]).toBeDefined();
});
it('should send events when due', async () => {
sender['telemetryStart'] = {
getIsOptedIn: jest.fn(async () => true),
};
sender['telemetrySetup'] = {
getTelemetryUrl: jest.fn(
async () => new URL('https://telemetry-staging.elastic.co/v3/send/snapshot')
),
};
sender.queueTelemetryEvents(MONITOR_UPDATE_CHANNEL, [sampleEvent]);
sender['sendEvents'] = jest.fn();
await sender['sendIfDue']();
expect(sender['sendEvents']).toHaveBeenCalledWith(
`https://telemetry-staging.elastic.co/v3-dev/send/${MONITOR_UPDATE_CHANNEL}`,
{ cluster_name: 'name', cluster_uuid: '1', version: { number: '8.0.0' } },
expect.anything()
);
});
it("shouldn't send when telemetry is disabled", async () => {
const telemetryStart = {
getIsOptedIn: jest.fn(async () => false),
};
sender['telemetryStart'] = telemetryStart;
sender.queueTelemetryEvents(MONITOR_UPDATE_CHANNEL, [sampleEvent]);
sender['sendEvents'] = jest.fn();
await sender['sendIfDue']();
expect(sender['sendEvents']).toBeCalledTimes(0);
});
it('should send events to separate channels', async () => {
sender['telemetryStart'] = {
getIsOptedIn: jest.fn(async () => true),
};
sender['telemetrySetup'] = {
getTelemetryUrl: jest.fn(
async () => new URL('https://telemetry.elastic.co/v3/send/snapshot')
),
};
const myChannelEvents = [{ 'event.kind': '1' }, { 'event.kind': '2' }];
// @ts-ignore
sender.queueTelemetryEvents('my-channel', myChannelEvents);
sender['queuesPerChannel']['my-channel']['getEvents'] = jest.fn(() => myChannelEvents);
expect(sender['queuesPerChannel']['my-channel']['queue'].length).toBe(2);
const myChannel2Events = [{ 'event.kind': '3' }];
// @ts-ignore
sender.queueTelemetryEvents('my-channel2', myChannel2Events);
sender['queuesPerChannel']['my-channel2']['getEvents'] = jest.fn(() => myChannel2Events);
expect(sender['queuesPerChannel']['my-channel2']['queue'].length).toBe(1);
await sender['sendIfDue']();
expect(sender['queuesPerChannel']['my-channel']['getEvents']).toBeCalledTimes(1);
expect(sender['queuesPerChannel']['my-channel2']['getEvents']).toBeCalledTimes(1);
const headers = {
headers: {
'Content-Type': 'application/x-ndjson',
'X-Elastic-Cluster-ID': '1',
'X-Elastic-Stack-Version': '8.0.0',
},
};
expect(axios.post).toHaveBeenCalledWith(
'https://telemetry.elastic.co/v3/send/my-channel',
'{"event.kind":"1"}\n{"event.kind":"2"}\n',
headers
);
expect(axios.post).toHaveBeenCalledWith(
'https://telemetry.elastic.co/v3/send/my-channel2',
'{"event.kind":"3"}\n',
headers
);
});
});
});

View file

@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { CoreStart, ElasticsearchClient, Logger } from 'src/core/server';
import type { TelemetryPluginStart, TelemetryPluginSetup } from 'src/plugins/telemetry/server';
import { cloneDeep } from 'lodash';
import axios from 'axios';
import type { InfoResponse } from '@elastic/elasticsearch/lib/api/types';
import { TelemetryQueue } from './queue';
import type { MonitorUpdateTelemetryChannel, MonitorUpdateTelemetryChannelEvents } from './types';
/**
* Simplified version of https://github.com/elastic/kibana/blob/master/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts
* Sends batched events to telemetry v3 api
*/
export class TelemetryEventsSender {
private readonly initialCheckDelayMs = 10 * 1000;
private readonly checkIntervalMs = 30 * 1000;
private readonly logger: Logger;
private telemetryStart?: TelemetryPluginStart;
private telemetrySetup?: TelemetryPluginSetup;
private intervalId?: NodeJS.Timeout;
private isSending = false;
private queuesPerChannel: { [channel: string]: TelemetryQueue<any> } = {};
private isOptedIn?: boolean = true; // Assume true until the first check
private esClient?: ElasticsearchClient;
private clusterInfo?: InfoResponse;
constructor(logger: Logger) {
this.logger = logger;
}
public setup(telemetrySetup?: TelemetryPluginSetup) {
this.telemetrySetup = telemetrySetup;
}
public async start(telemetryStart?: TelemetryPluginStart, core?: CoreStart) {
this.telemetryStart = telemetryStart;
this.esClient = core?.elasticsearch.client.asInternalUser;
this.clusterInfo = await this.fetchClusterInfo();
this.logger.debug(`Starting local task`);
setTimeout(() => {
this.sendIfDue();
this.intervalId = setInterval(() => this.sendIfDue(), this.checkIntervalMs);
}, this.initialCheckDelayMs);
}
public stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
}
}
public queueTelemetryEvents<T extends MonitorUpdateTelemetryChannel>(
channel: T,
events: Array<MonitorUpdateTelemetryChannelEvents[T]>
) {
if (!this.queuesPerChannel[channel]) {
this.queuesPerChannel[channel] = new TelemetryQueue<MonitorUpdateTelemetryChannelEvents[T]>();
}
this.queuesPerChannel[channel].addEvents(cloneDeep(events));
}
public async isTelemetryOptedIn() {
this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
return this.isOptedIn === true;
}
private async sendIfDue() {
if (this.isSending) {
return;
}
this.isSending = true;
this.isOptedIn = await this.isTelemetryOptedIn();
if (!this.isOptedIn) {
this.logger.debug(`Telemetry is not opted-in.`);
for (const channel of Object.keys(this.queuesPerChannel)) {
this.queuesPerChannel[channel].clearEvents();
}
this.isSending = false;
return;
}
for (const channel of Object.keys(this.queuesPerChannel)) {
await this.sendEvents(
await this.fetchTelemetryUrl(channel),
this.clusterInfo,
this.queuesPerChannel[channel]
);
}
this.isSending = false;
}
private async fetchClusterInfo(): Promise<InfoResponse> {
if (this.esClient === undefined || this.esClient === null) {
throw Error('elasticsearch client is unavailable: cannot retrieve cluster infomation');
}
return await this.esClient.info();
}
public async sendEvents(
telemetryUrl: string,
clusterInfo: InfoResponse | undefined,
queue: TelemetryQueue<any>
) {
const events = queue.getEvents();
if (events.length === 0) {
return;
}
try {
this.logger.debug(`Telemetry URL: ${telemetryUrl}`);
queue.clearEvents();
this.logger.debug(JSON.stringify(events));
await this.send(
events,
telemetryUrl,
clusterInfo?.cluster_uuid,
clusterInfo?.version?.number
);
} catch (err) {
this.logger.debug(`Error sending telemetry events data: ${err}`);
queue.clearEvents();
}
}
// Forms URLs like:
// https://telemetry.elastic.co/v3/send/my-channel-name or
// https://telemetry-staging.elastic.co/v3/send/my-channel-name
private async fetchTelemetryUrl(channel: string): Promise<string> {
const telemetryUrl = await this.telemetrySetup?.getTelemetryUrl();
if (!telemetryUrl) {
throw Error("Couldn't get telemetry URL");
}
if (!telemetryUrl.hostname.includes('staging')) {
telemetryUrl.pathname = `/v3/send/${channel}`;
} else {
telemetryUrl.pathname = `/v3-dev/send/${channel}`;
}
return telemetryUrl.toString();
}
private async send(
events: unknown[],
telemetryUrl: string,
clusterUuid: string | undefined,
clusterVersionNumber: string | undefined
) {
// using ndjson so that each line will be wrapped in json envelope on server side
// see https://github.com/elastic/infra/blob/master/docs/telemetry/telemetry-next-dataflow.md#json-envelope
const ndjson = this.transformDataToNdjson(events);
try {
const resp = await axios.post(telemetryUrl, ndjson, {
headers: {
'Content-Type': 'application/x-ndjson',
'X-Elastic-Cluster-ID': clusterUuid,
'X-Elastic-Stack-Version': clusterVersionNumber ? clusterVersionNumber : '8.2.0',
},
});
this.logger.debug(`Events sent!. Response: ${resp.status} ${JSON.stringify(resp.data)}`);
} catch (err) {
this.logger.debug(
`Error sending events: ${err.response.status} ${JSON.stringify(err.response.data)}`
);
}
}
private transformDataToNdjson = (data: unknown[]): string => {
if (data.length !== 0) {
const dataString = data.map((dataItem) => JSON.stringify(dataItem)).join('\n');
return `${dataString}\n`;
} else {
return '';
}
};
}

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ServiceLocationErrors } from '../../../common/runtime_types/monitor_management';
export interface MonitorUpdateEvent {
updatedAt?: string;
lastUpdatedAt?: string;
durationSinceLastUpdated?: number;
deletedAt?: string;
type: string;
stackVersion: string;
monitorNameLength: number;
monitorInterval: number;
locations: string[];
locationsCount: number;
scriptType?: 'inline' | 'recorder' | 'zip';
revision?: number;
errors?: ServiceLocationErrors;
configId: string;
}
export interface MonitorUpdateTelemetryChannelEvents {
// channel name => event type
'synthetics-monitor-update': MonitorUpdateEvent;
'synthetics-monitor-current': MonitorUpdateEvent;
}
export type MonitorUpdateTelemetryChannel = keyof MonitorUpdateTelemetryChannelEvents;

View file

@ -21,6 +21,7 @@ import {
UptimeCorePluginsStart,
UptimeServerSetup,
} from './lib/adapters';
import { TelemetryEventsSender } from './lib/telemetry/sender';
import { registerUptimeSavedObjects, savedObjectsAdapter } from './lib/saved_objects/saved_objects';
import { mappingFromFieldMap } from '../../rule_registry/common/mapping_from_field_map';
import { experimentalRuleFieldMap } from '../../rule_registry/common/assets/field_maps/experimental_rule_field_map';
@ -37,11 +38,13 @@ export class Plugin implements PluginType {
private logger: Logger;
private server?: UptimeServerSetup;
private syntheticService?: SyntheticsService;
private readonly telemetryEventsSender: TelemetryEventsSender;
private readonly isServiceEnabled?: boolean;
constructor(initializerContext: PluginInitializerContext<UptimeConfig>) {
this.initContext = initializerContext;
this.logger = initializerContext.logger.get();
this.telemetryEventsSender = new TelemetryEventsSender(this.logger);
const config = this.initContext.config.get<UptimeConfig>();
this.isServiceEnabled = config?.ui?.monitorManagement?.enabled && Boolean(config.service);
}
@ -76,6 +79,7 @@ export class Plugin implements PluginType {
cloud: plugins.cloud,
kibanaVersion: this.initContext.env.packageInfo.version,
logger: this.logger,
telemetry: this.telemetryEventsSender,
} as UptimeServerSetup;
if (this.isServiceEnabled && this.server.config.service) {
@ -86,6 +90,7 @@ export class Plugin implements PluginType {
);
this.syntheticService.registerSyncTask(plugins.taskManager);
this.telemetryEventsSender.setup(plugins.telemetry);
}
initServerWithKibana(this.server, plugins, ruleDataClient, this.logger);
@ -130,6 +135,7 @@ export class Plugin implements PluginType {
if (this.server && this.syntheticService) {
this.server.syntheticsService = this.syntheticService;
}
this.telemetryEventsSender.start(plugins.telemetry, coreStart);
}
}

View file

@ -5,11 +5,13 @@
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { SavedObject } from 'kibana/server';
import { MonitorFields, SyntheticsMonitor } from '../../../common/runtime_types';
import { UMRestApiRouteFactory } from '../types';
import { API_URLS } from '../../../common/constants';
import { syntheticsMonitorType } from '../../lib/saved_objects/synthetics_monitor';
import { validateMonitor } from './monitor_validation';
import { sendTelemetryEvents, formatTelemetryEvent } from './telemetry/monitor_upgrade_sender';
export const addSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
method: 'POST',
@ -27,10 +29,11 @@ export const addSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
return response.badRequest({ body: { message, attributes: { details, ...payload } } });
}
const newMonitor = await savedObjectsClient.create<SyntheticsMonitor>(
syntheticsMonitorType,
monitor
);
const newMonitor: SavedObject<SyntheticsMonitor> =
await savedObjectsClient.create<SyntheticsMonitor>(syntheticsMonitorType, {
...monitor,
revision: 1,
});
const { syntheticsService } = server;
@ -45,6 +48,12 @@ export const addSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
},
]);
sendTelemetryEvents(
server.logger,
server.telemetry,
formatTelemetryEvent({ monitor: newMonitor, errors, kibanaVersion: server.kibanaVersion })
);
if (errors) {
return errors;
}

View file

@ -11,6 +11,10 @@ import { UMRestApiRouteFactory } from '../types';
import { API_URLS } from '../../../common/constants';
import { syntheticsMonitorType } from '../../lib/saved_objects/synthetics_monitor';
import { getMonitorNotFoundResponse } from './service_errors';
import {
sendTelemetryEvents,
formatTelemetryDeleteEvent,
} from './telemetry/monitor_upgrade_sender';
export const deleteSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
method: 'DELETE',
@ -36,6 +40,12 @@ export const deleteSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
{ ...monitor.attributes, id: monitorId },
]);
sendTelemetryEvents(
server.logger,
server.telemetry,
formatTelemetryDeleteEvent(monitor, server.kibanaVersion, new Date().toISOString(), errors)
);
if (errors) {
return errors;
}

View file

@ -6,14 +6,18 @@
*/
import { schema } from '@kbn/config-schema';
import { SavedObjectsUpdateResponse } from 'kibana/server';
import { SavedObjectsUpdateResponse, SavedObject } from 'kibana/server';
import { SavedObjectsErrorHelpers } from '../../../../../../src/core/server';
import { MonitorFields, SyntheticsMonitor } from '../../../common/runtime_types';
import { MonitorFields, SyntheticsMonitor, ConfigKey } from '../../../common/runtime_types';
import { UMRestApiRouteFactory } from '../types';
import { API_URLS } from '../../../common/constants';
import { syntheticsMonitorType } from '../../lib/saved_objects/synthetics_monitor';
import { validateMonitor } from './monitor_validation';
import { getMonitorNotFoundResponse } from './service_errors';
import {
sendTelemetryEvents,
formatTelemetryUpdateEvent,
} from './telemetry/monitor_upgrade_sender';
// Simplify return promise type and type it with runtime_types
export const editSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
@ -40,11 +44,20 @@ export const editSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
const { syntheticsService } = server;
try {
const previousMonitor: SavedObject<MonitorFields> = await savedObjectsClient.get(
syntheticsMonitorType,
monitorId
);
const monitorWithRevision = {
...monitor,
revision: (previousMonitor.attributes[ConfigKey.REVISION] || 0) + 1,
};
const editMonitor: SavedObjectsUpdateResponse<MonitorFields> =
await savedObjectsClient.update<MonitorFields>(
syntheticsMonitorType,
monitorId,
monitor.type === 'browser' ? { ...monitor, urls: '' } : monitor
monitor.type === 'browser' ? { ...monitorWithRevision, urls: '' } : monitorWithRevision
);
const errors = await syntheticsService.pushConfigs(request, [
@ -58,6 +71,12 @@ export const editSyntheticsMonitorRoute: UMRestApiRouteFactory = () => ({
},
]);
sendTelemetryEvents(
server.logger,
server.telemetry,
formatTelemetryUpdateEvent(editMonitor, previousMonitor, server.kibanaVersion, errors)
);
// Return service sync errors in OK response
if (errors) {
return errors;

View file

@ -0,0 +1,215 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { sha256 } from 'js-sha256';
import type { Logger } from 'src/core/server';
import { loggingSystemMock } from 'src/core/server/mocks';
import { SavedObject } from 'kibana/server';
import {
SyntheticsMonitor,
ConfigKey,
DataStream,
ScheduleUnit,
} from '../../../../common/runtime_types/monitor_management';
import type { TelemetryEventsSender } from '../../../lib/telemetry/sender';
import { createMockTelemetryEventsSender } from '../../../lib/telemetry/__mocks__';
import { MONITOR_UPDATE_CHANNEL, MONITOR_CURRENT_CHANNEL } from '../../../lib/telemetry/constants';
import {
formatTelemetryEvent,
formatTelemetryUpdateEvent,
formatTelemetryDeleteEvent,
sendTelemetryEvents,
} from './monitor_upgrade_sender';
const kibanaVersion = '8.2.0';
const id = '123456';
const errors = [
{
locationId: 'us_central',
error: {
reason: 'my reason',
status: 400,
},
},
];
const testConfig: SavedObject<SyntheticsMonitor> = {
updated_at: '2011-10-05T14:48:00.000Z',
id,
attributes: {
[ConfigKey.MONITOR_TYPE]: DataStream.HTTP,
[ConfigKey.LOCATIONS]: [
{
id: 'us_central',
label: 'US Central',
url: 'testurl.com',
geo: {
lat: 0,
lon: 0,
},
isServiceManaged: true,
},
{
id: 'custom',
label: 'Custom US Central',
url: 'testurl.com',
geo: {
lat: 0,
lon: 0,
},
},
],
[ConfigKey.SCHEDULE]: { number: '3', unit: ScheduleUnit.MINUTES },
[ConfigKey.URLS]: 'https://elastic.co',
[ConfigKey.NAME]: 'Test',
[ConfigKey.REVISION]: 1,
} as SyntheticsMonitor,
} as SavedObject<SyntheticsMonitor>;
const createTestConfig = (extraConfigs: Record<string, any>, updatedAt?: string) => {
return {
...testConfig,
updated_at: updatedAt || testConfig.updated_at,
attributes: {
...testConfig.attributes,
...extraConfigs,
},
} as SavedObject<SyntheticsMonitor>;
};
describe('monitor upgrade telemetry helpers', () => {
it('formats telemetry events', () => {
const actual = formatTelemetryEvent({ monitor: testConfig, kibanaVersion, errors });
expect(actual).toEqual({
stackVersion: kibanaVersion,
configId: sha256.create().update(testConfig.id).hex(),
locations: ['us_central', 'other'],
locationsCount: 2,
monitorNameLength: testConfig.attributes[ConfigKey.NAME].length,
updatedAt: testConfig.updated_at,
type: testConfig.attributes[ConfigKey.MONITOR_TYPE],
scriptType: undefined,
monitorInterval: 180000,
lastUpdatedAt: undefined,
deletedAt: undefined,
errors,
durationSinceLastUpdated: undefined,
revision: 1,
});
});
it.each([
[ConfigKey.SOURCE_INLINE, 'recorder', true],
[ConfigKey.SOURCE_INLINE, 'inline', false],
[ConfigKey.SOURCE_ZIP_URL, 'zip', false],
])('handles formatting scriptType for browser monitors', (config, scriptType, isRecorder) => {
const actual = formatTelemetryEvent({
monitor: createTestConfig({
[config]: 'test',
[ConfigKey.METADATA]: {
script_source: {
is_generated_script: isRecorder,
},
},
}),
kibanaVersion,
errors,
});
expect(actual).toEqual({
stackVersion: kibanaVersion,
configId: sha256.create().update(testConfig.id).hex(),
locations: ['us_central', 'other'],
locationsCount: 2,
monitorNameLength: testConfig.attributes[ConfigKey.NAME].length,
updatedAt: testConfig.updated_at,
type: testConfig.attributes[ConfigKey.MONITOR_TYPE],
scriptType,
monitorInterval: 180000,
lastUpdatedAt: undefined,
deletedAt: undefined,
errors,
durationSinceLastUpdated: undefined,
revision: 1,
});
});
it('handles formatting update events', () => {
const actual = formatTelemetryUpdateEvent(
createTestConfig({}, '2011-10-05T16:48:00.000Z'),
testConfig,
kibanaVersion,
errors
);
expect(actual).toEqual({
stackVersion: kibanaVersion,
configId: sha256.create().update(testConfig.id).hex(),
locations: ['us_central', 'other'],
locationsCount: 2,
monitorNameLength: testConfig.attributes[ConfigKey.NAME].length,
updatedAt: '2011-10-05T16:48:00.000Z',
type: testConfig.attributes[ConfigKey.MONITOR_TYPE],
scriptType: undefined,
monitorInterval: 180000,
lastUpdatedAt: testConfig.updated_at,
deletedAt: undefined,
errors,
durationSinceLastUpdated: 7200000,
revision: 1,
});
});
it('handles formatting delete events', () => {
const actual = formatTelemetryDeleteEvent(
testConfig,
kibanaVersion,
'2011-10-05T16:48:00.000Z',
errors
);
expect(actual).toEqual({
stackVersion: kibanaVersion,
configId: sha256.create().update(testConfig.id).hex(),
locations: ['us_central', 'other'],
locationsCount: 2,
monitorNameLength: testConfig.attributes[ConfigKey.NAME].length,
updatedAt: '2011-10-05T16:48:00.000Z',
type: testConfig.attributes[ConfigKey.MONITOR_TYPE],
scriptType: undefined,
monitorInterval: 180000,
lastUpdatedAt: testConfig.updated_at,
deletedAt: '2011-10-05T16:48:00.000Z',
errors,
durationSinceLastUpdated: 7200000,
revision: 1,
});
});
});
describe('sendTelemetryEvents', () => {
let eventsTelemetryMock: jest.Mocked<TelemetryEventsSender>;
let loggerMock: jest.Mocked<Logger>;
beforeEach(() => {
eventsTelemetryMock = createMockTelemetryEventsSender();
loggerMock = loggingSystemMock.createLogger();
});
it('should queue telemetry events with generic error', () => {
const event = formatTelemetryEvent({ monitor: testConfig, kibanaVersion, errors });
sendTelemetryEvents(
loggerMock,
eventsTelemetryMock,
formatTelemetryEvent({ monitor: testConfig, kibanaVersion, errors })
);
expect(eventsTelemetryMock.queueTelemetryEvents).toHaveBeenCalledWith(MONITOR_UPDATE_CHANNEL, [
event,
]);
expect(eventsTelemetryMock.queueTelemetryEvents).toHaveBeenCalledWith(MONITOR_CURRENT_CHANNEL, [
event,
]);
});
});

View file

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { sha256 } from 'js-sha256';
import type { Logger } from 'src/core/server';
import { SavedObjectsUpdateResponse, SavedObject } from 'kibana/server';
import {
MonitorFields,
SyntheticsMonitor,
ConfigKey,
ServiceLocationErrors,
} from '../../../../common/runtime_types';
import type { MonitorUpdateEvent } from '../../../lib/telemetry/types';
import { TelemetryEventsSender } from '../../../lib/telemetry/sender';
import { MONITOR_UPDATE_CHANNEL, MONITOR_CURRENT_CHANNEL } from '../../../lib/telemetry/constants';
export interface UpgradeError {
key?: string;
message: string | string[];
}
export function sendTelemetryEvents(
logger: Logger,
eventsTelemetry: TelemetryEventsSender | undefined,
updateEvent: MonitorUpdateEvent
) {
if (eventsTelemetry === undefined) {
return;
}
try {
eventsTelemetry.queueTelemetryEvents(MONITOR_UPDATE_CHANNEL, [updateEvent]);
eventsTelemetry.queueTelemetryEvents(MONITOR_CURRENT_CHANNEL, [updateEvent]);
} catch (exc) {
logger.error(`queing telemetry events failed ${exc}`);
}
}
export function formatTelemetryEvent({
monitor,
kibanaVersion,
lastUpdatedAt,
durationSinceLastUpdated,
deletedAt,
errors,
}: {
monitor: SavedObject<SyntheticsMonitor>;
kibanaVersion: string;
lastUpdatedAt?: string;
durationSinceLastUpdated?: number;
deletedAt?: string;
errors?: ServiceLocationErrors;
}) {
const { attributes } = monitor;
return {
updatedAt: deletedAt || monitor.updated_at,
lastUpdatedAt,
durationSinceLastUpdated,
deletedAt,
type: attributes[ConfigKey.MONITOR_TYPE],
locations: attributes[ConfigKey.LOCATIONS].map((location) =>
location.isServiceManaged ? location.id : 'other'
), // mark self-managed locations as other
locationsCount: attributes[ConfigKey.LOCATIONS].length,
monitorNameLength: attributes[ConfigKey.NAME].length,
monitorInterval: parseInt(attributes[ConfigKey.SCHEDULE].number, 10) * 60 * 1000,
stackVersion: kibanaVersion,
scriptType: getScriptType(attributes as MonitorFields),
errors:
errors && errors?.length
? errors.map((e) => ({
locationId: e.locationId,
error: {
// don't expose failed_monitors on error object
status: e.error?.status,
reason: e.error?.reason,
},
}))
: undefined,
configId: sha256.create().update(monitor.id).hex(),
revision: attributes[ConfigKey.REVISION],
};
}
export function formatTelemetryUpdateEvent(
currentMonitor: SavedObjectsUpdateResponse<SyntheticsMonitor>,
previousMonitor: SavedObject<SyntheticsMonitor>,
kibanaVersion: string,
errors?: ServiceLocationErrors
) {
let durationSinceLastUpdated: number = 0;
if (currentMonitor.updated_at && previousMonitor.updated_at) {
durationSinceLastUpdated =
new Date(currentMonitor.updated_at).getTime() -
new Date(previousMonitor.updated_at).getTime();
}
return formatTelemetryEvent({
monitor: currentMonitor as SavedObject<SyntheticsMonitor>,
kibanaVersion,
durationSinceLastUpdated,
lastUpdatedAt: previousMonitor.updated_at,
errors,
});
}
export function formatTelemetryDeleteEvent(
previousMonitor: SavedObject<SyntheticsMonitor>,
kibanaVersion: string,
deletedAt: string,
errors?: ServiceLocationErrors
) {
let durationSinceLastUpdated: number = 0;
if (deletedAt && previousMonitor.updated_at) {
durationSinceLastUpdated =
new Date(deletedAt).getTime() - new Date(previousMonitor.updated_at).getTime();
}
return formatTelemetryEvent({
monitor: previousMonitor as SavedObject<SyntheticsMonitor>,
kibanaVersion,
durationSinceLastUpdated,
lastUpdatedAt: previousMonitor.updated_at,
deletedAt,
errors,
});
}
function getScriptType(attributes: MonitorFields): 'inline' | 'recorder' | 'zip' | undefined {
if (attributes[ConfigKey.SOURCE_ZIP_URL]) {
return 'zip';
} else if (
attributes[ConfigKey.SOURCE_INLINE] &&
attributes[ConfigKey.METADATA].script_source?.is_generated_script
) {
return 'recorder';
} else if (attributes[ConfigKey.SOURCE_INLINE]) {
return 'inline';
}
return undefined;
}

View file

@ -53,7 +53,7 @@ export default function ({ getService }: FtrProviderContext) {
[ConfigKey.NAME]: 'Modified name',
};
const modifiedMonitor = { ...savedMonitor, ...updates };
const modifiedMonitor = { ...savedMonitor, ...updates, revision: 2 };
const editResponse = await supertest
.put(API_URLS.SYNTHETICS_MONITORS + '/' + monitorId)

View file

@ -58,5 +58,6 @@
},
"url": "https://example-url.com"
}],
"namespace": "testnamespace"
"namespace": "testnamespace",
"revision": 1
}

View file

@ -19,7 +19,7 @@ interface CheckProps {
mogrify?: (doc: any) => any;
refresh?: boolean;
tls?: boolean | TlsProps;
isFleetManaged?: boolean;
customIndex?: string;
}
const getRandomMonitorId = () => {
@ -33,11 +33,11 @@ export const makeCheck = async ({
mogrify = (d) => d,
refresh = true,
tls = false,
isFleetManaged = false,
customIndex,
}: CheckProps): Promise<{ monitorId: string; docs: any }> => {
const cgFields = {
monitor: {
check_group: uuid.v4(),
check_group: fields.monitor?.check_group || uuid.v4(),
},
};
@ -52,18 +52,10 @@ export const makeCheck = async ({
ip: `127.0.0.${i}`,
},
});
if (i === numIps - 1) {
if (i === numIps - 1 && fields.summary !== null) {
pingFields.summary = summary;
}
const doc = await makePing(
es,
monitorId,
pingFields,
mogrify,
false,
tls as any,
isFleetManaged
);
const doc = await makePing(es, monitorId, pingFields, mogrify, false, tls as any, customIndex);
docs.push(doc);
// @ts-ignore
summary[doc.monitor.status]++;
@ -85,7 +77,7 @@ export const makeChecks = async (
fields: { [key: string]: any } = {},
mogrify: (doc: any) => any = (d) => d,
refresh: boolean = true,
isFleetManaged: boolean = false
customIndex?: string
) => {
const checks = [];
const oldestTime = new Date().getTime() - numChecks * every;
@ -109,7 +101,7 @@ export const makeChecks = async (
fields,
mogrify,
refresh: false,
isFleetManaged,
customIndex,
});
checks.push(docs);
}
@ -131,7 +123,7 @@ export const makeChecksWithStatus = async (
status: 'up' | 'down',
mogrify: (doc: any) => any = (d) => d,
refresh: boolean = true,
isFleetManaged: boolean = false
customIndex?: string
) => {
const oppositeStatus = status === 'up' ? 'down' : 'up';
@ -152,7 +144,7 @@ export const makeChecksWithStatus = async (
return mogrify(d);
},
refresh,
isFleetManaged
customIndex
);
};

View file

@ -11,7 +11,6 @@ import type { Client } from '@elastic/elasticsearch';
import { makeTls, TlsProps } from './make_tls';
const DEFAULT_INDEX_NAME = 'heartbeat-8-generated-test';
const DATA_STREAM_INDEX_NAME = 'synthetics-http-default';
export const makePing = async (
es: Client,
@ -20,7 +19,7 @@ export const makePing = async (
mogrify: (doc: any) => any,
refresh: boolean = true,
tls: boolean | TlsProps = false,
isFleetManaged: boolean | undefined = false
customIndex?: string
) => {
const timestamp = new Date();
const baseDoc: any = {
@ -118,7 +117,7 @@ export const makePing = async (
const doc = mogrify(merge(baseDoc, fields));
await es.index({
index: isFleetManaged ? DATA_STREAM_INDEX_NAME : DEFAULT_INDEX_NAME,
index: customIndex || DEFAULT_INDEX_NAME,
refresh,
body: doc,
});

View file

@ -81,7 +81,7 @@ export default function ({ getService }: FtrProviderContext) {
'up',
undefined,
undefined,
true
testDataStreamName
);
await makeChecksWithStatus(
@ -100,7 +100,7 @@ export default function ({ getService }: FtrProviderContext) {
'down',
undefined,
undefined,
true
testDataStreamName
);
await makeChecksWithStatus(
@ -118,7 +118,7 @@ export default function ({ getService }: FtrProviderContext) {
'down',
undefined,
undefined,
true
testDataStreamName
);
await makeChecksWithStatus(
@ -137,7 +137,7 @@ export default function ({ getService }: FtrProviderContext) {
'down',
undefined,
undefined,
true
testDataStreamName
);
await makeChecksWithStatus(
@ -150,7 +150,7 @@ export default function ({ getService }: FtrProviderContext) {
'down',
undefined,
undefined,
true
testDataStreamName
);
await client.indices.refresh();
});