[EBT] Validate event&context payloads in dev-mode (#135061)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2022-07-04 15:42:08 +02:00 committed by GitHub
parent 364ab545a4
commit dd43c2d129
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 1177 additions and 327 deletions

View file

@ -36,6 +36,8 @@ NPM_MODULE_EXTRA_FILES = [
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"@npm//fp-ts",
"@npm//io-ts",
"@npm//rxjs",
]
@ -51,6 +53,8 @@ RUNTIME_DEPS = [
TYPES_DEPS = [
"@npm//@types/node",
"@npm//@types/jest",
"@npm//fp-ts",
"@npm//io-ts",
"@npm//rxjs",
"//packages/kbn-logging:npm_module_types",
"//packages/kbn-logging-mocks:npm_module_types",

View file

@ -329,3 +329,11 @@ The `_meta` adds the invaluable information of a `description` and whether a fie
It can be attached to any schema definition as seen in the examples above. For high-order types, like arrays or objects, the `_meta` field is optional. For first-order types, like numbers, strings, booleans or `pass_through`, the `_meta` key is mandatory.
The field `_meta.optional` is not required unless the schema is describing an optional field. In that case, `_meta.optional: true` is required. However, it's highly encouraged to be explicit about declaring it even when the described field is not optional.
### Schema Validation
Apart from documentation, the schema is used to validate the payload during the dev cycle. This adds an extra layer of confidence over the data to be sent.
The validation, however, is disabled in production because users cannot do anything to fix the bug after it is released. Additionally, receiving _buggy_ events can be considered an additional insight into how our users use our products. For example, the buggy event can be caused by a user following an unexpected path in the UI like clicking an "Upload" button when the file has not been selected [#125013](https://github.com/elastic/kibana/issues/125013). In those cases, receiving the _incomplete_ event tells us the user didn't select a file, but they still hit the "Upload" button.
The validation is performed with the `io-ts` library. In order to do that, the schema is firstly parsed into the `io-ts` equivalent, and then used to validate the event & context payloads.

View file

@ -7,16 +7,14 @@
*/
// eslint-disable-next-line max-classes-per-file
import type { Observable } from 'rxjs';
import { BehaviorSubject, firstValueFrom, lastValueFrom, Subject } from 'rxjs';
import { Subject, lastValueFrom, take, toArray } from 'rxjs';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import type { MockedLogger } from '@kbn/logging-mocks';
import { loggerMock } from '@kbn/logging-mocks';
import { AnalyticsClient } from './analytics_client';
import { take, toArray } from 'rxjs/operators';
import { shippersMock } from '../shippers/mocks';
import type { EventContext, TelemetryCounter } from '../events';
import { TelemetryCounterType } from '../events';
import type { TelemetryCounter } from '../events';
import { ContextService } from './context_service';
describe('AnalyticsClient', () => {
let analyticsClient: AnalyticsClient;
@ -105,6 +103,28 @@ describe('AnalyticsClient', () => {
);
});
test('fails to validate the event and throws', () => {
analyticsClient.registerEventType({
eventType: 'testEvent',
schema: {
a_field: {
type: 'keyword',
_meta: {
description: 'description of a_field',
},
},
},
});
analyticsClient.optIn({ global: { enabled: true } });
expect(
() => analyticsClient.reportEvent('testEvent', { a_field: 100 }) // a_field is expected to be a string
).toThrowErrorMatchingInlineSnapshot(`
"Failed to validate payload coming from \\"Event Type 'testEvent'\\":
- [a_field]: {\\"expected\\":\\"string\\",\\"actual\\":\\"number\\",\\"value\\":100}"
`);
});
test('enqueues multiple events before specifying the optIn consent and registering a shipper', async () => {
analyticsClient.registerEventType({
eventType: 'testEvent',
@ -267,7 +287,7 @@ describe('AnalyticsClient', () => {
const counterEventPromise = lastValueFrom(analyticsClient.telemetryCounter$.pipe(take(1)));
const counter: TelemetryCounter = {
type: TelemetryCounterType.succeeded,
type: 'succeeded',
source: 'a random value',
event_type: 'eventTypeA',
code: '200',
@ -390,15 +410,16 @@ describe('AnalyticsClient', () => {
);
});
describe('registerContextProvider', () => {
let globalContext$: Observable<Partial<EventContext>>;
describe('ContextProvider APIs', () => {
let contextService: ContextService;
beforeEach(() => {
// eslint-disable-next-line dot-notation
globalContext$ = analyticsClient['context$'];
contextService = analyticsClient['contextService'];
});
test('Registers a context provider', async () => {
test('Registers a context provider', () => {
const registerContextProviderSpy = jest.spyOn(contextService, 'registerContextProvider');
const context$ = new Subject<{ a_field: boolean }>();
analyticsClient.registerContextProvider({
name: 'contextProviderA',
@ -413,17 +434,8 @@ describe('AnalyticsClient', () => {
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(2), toArray()));
context$.next({ a_field: true });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
]);
});
test('It does not break if context emits `undefined`', async () => {
const context$ = new Subject<{ a_field: boolean } | undefined | void>();
analyticsClient.registerContextProvider({
expect(registerContextProviderSpy).toHaveBeenCalledTimes(1);
expect(registerContextProviderSpy).toHaveBeenCalledWith({
name: 'contextProviderA',
schema: {
a_field: {
@ -435,241 +447,13 @@ describe('AnalyticsClient', () => {
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(3), toArray()));
context$.next();
context$.next(undefined);
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{},
{},
]);
});
test('It does not break for BehaviourSubjects (emitting as soon as they connect)', async () => {
const context$ = new BehaviorSubject<{ a_field: boolean }>({ a_field: true });
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(1), toArray()));
await expect(globalContextPromise).resolves.toEqual([
{ a_field: true }, // No original empty state
]);
});
test('Merges all the contexts together', async () => {
const contextA$ = new Subject<{ a_field: boolean }>();
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: contextA$,
});
const contextB$ = new Subject<{ a_field?: boolean; b_field: number }>();
analyticsClient.registerContextProvider({
name: 'contextProviderB',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$: contextB$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
contextA$.next({ a_field: true });
contextB$.next({ b_field: 1 });
contextB$.next({ a_field: false, b_field: 1 });
contextA$.next({ a_field: true });
contextB$.next({ b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
{ a_field: true, b_field: 1 }, // Merged A & B
{ a_field: false, b_field: 1 }, // a_field updated from B
{ a_field: false, b_field: 1 }, // a_field still from B because it was registered later.
// We may want to change this last behaviour in the future but, for now, it's fine.
{ a_field: true, b_field: 2 }, // a_field is now taken from A because B is not providing it yet.
]);
});
test('The global context is not polluted by context providers removing reported fields', async () => {
const context$ = new Subject<{ a_field?: boolean; b_field: number }>();
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
context$.next({ b_field: 1 });
context$.next({ a_field: false, b_field: 1 });
context$.next({ a_field: true, b_field: 1 });
context$.next({ b_field: 1 });
context$.next({ a_field: true, b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ b_field: 1 },
{ a_field: false, b_field: 1 },
{ a_field: true, b_field: 1 },
{ b_field: 1 }, // a_field is removed because the context provider removed it.
{ a_field: true, b_field: 2 },
]);
});
test('The undefined values are not forwarded to the global context', async () => {
const context$ = new Subject<{ a_field?: boolean; b_field: number }>();
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$,
});
const globalContextPromise = firstValueFrom(globalContext$.pipe(take(6), toArray()));
context$.next({ b_field: 1 });
context$.next({ a_field: false, b_field: 1 });
context$.next({ a_field: true, b_field: 1 });
context$.next({ b_field: 1 });
context$.next({ a_field: undefined, b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ b_field: 1 },
{ a_field: false, b_field: 1 },
{ a_field: true, b_field: 1 },
{ b_field: 1 }, // a_field is removed because the context provider removed it.
{ b_field: 2 }, // a_field is not forwarded because it is `undefined`
]);
});
test('Fails to register 2 context providers with the same name', () => {
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: new Subject<{ a_field: boolean }>(),
});
expect(() => {
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: new Subject<{ a_field: boolean }>(),
});
}).toThrowErrorMatchingInlineSnapshot(
`"Context provider with name 'contextProviderA' already registered"`
);
});
test('Does not remove the context provider after it completes', async () => {
const context$ = new Subject<{ a_field: boolean }>();
const contextProvidersRegistry =
// eslint-disable-next-line dot-notation
analyticsClient['contextService']['contextProvidersRegistry'];
// The context registry is empty
expect(contextProvidersRegistry.size).toBe(0);
analyticsClient.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(4), toArray()));
context$.next({ a_field: true });
// The size of the registry grows on the first emission
expect(contextProvidersRegistry.size).toBe(1);
context$.next({ a_field: true });
// Still in the registry
expect(contextProvidersRegistry.size).toBe(1);
context$.complete();
// Still in the registry
expect(contextProvidersRegistry.size).toBe(1);
test('Removes a context provider', () => {
const removeContextProviderSpy = jest.spyOn(contextService, 'removeContextProvider');
analyticsClient.removeContextProvider('contextProviderA');
// The context provider is removed from the registry
expect(contextProvidersRegistry.size).toBe(0);
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
{ a_field: true },
{},
]);
expect(removeContextProviderSpy).toHaveBeenCalledTimes(1);
expect(removeContextProviderSpy).toHaveBeenCalledWith('contextProviderA');
});
});

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import type { Type } from 'io-ts';
import type { Observable } from 'rxjs';
import { BehaviorSubject, Subject, combineLatest, from, merge } from 'rxjs';
import {
@ -23,6 +24,7 @@ import {
takeUntil,
tap,
} from 'rxjs/operators';
import type { LogMeta } from '@kbn/logging';
import type { IShipper } from '../shippers';
import type {
AnalyticsClientInitContext,
@ -35,10 +37,14 @@ import type {
ShipperClassConstructor,
} from './types';
import type { Event, EventContext, EventType, TelemetryCounter } from '../events';
import { TelemetryCounterType } from '../events';
import { ShippersRegistry } from './shippers_registry';
import { OptInConfigService } from './opt_in_config';
import { ContextService } from './context_service';
import { schemaToIoTs, validateSchema } from '../schema/validation';
interface EventDebugLogMeta extends LogMeta {
ebt_event: Event;
}
export class AnalyticsClient implements IAnalyticsClient {
private readonly internalTelemetryCounter$ = new Subject<TelemetryCounter>();
@ -57,7 +63,10 @@ export class AnalyticsClient implements IAnalyticsClient {
* @private
*/
private readonly shipperRegistered$ = new Subject<void>();
private readonly eventTypeRegistry = new Map<EventType, EventTypeOpts<unknown>>();
private readonly eventTypeRegistry = new Map<
EventType,
EventTypeOpts<unknown> & { validator?: Type<Record<string, unknown>> }
>();
private readonly contextService: ContextService;
private readonly context$ = new BehaviorSubject<Partial<EventContext>>({});
private readonly optInConfig$ = new BehaviorSubject<OptInConfigService | undefined>(undefined);
@ -71,7 +80,11 @@ export class AnalyticsClient implements IAnalyticsClient {
);
constructor(private readonly initContext: AnalyticsClientInitContext) {
this.contextService = new ContextService(this.context$, this.initContext.isDev);
this.contextService = new ContextService(
this.context$,
this.initContext.isDev,
this.initContext.logger.get('context-service')
);
this.reportEnqueuedEventsWhenClientIsReady();
}
@ -83,16 +96,17 @@ export class AnalyticsClient implements IAnalyticsClient {
const timestamp = new Date().toISOString();
this.internalTelemetryCounter$.next({
type: TelemetryCounterType.enqueued,
type: 'enqueued',
source: 'client',
event_type: eventType,
code: 'enqueued',
count: 1,
});
if (!this.eventTypeRegistry.get(eventType)) {
const eventTypeOpts = this.eventTypeRegistry.get(eventType);
if (!eventTypeOpts) {
this.internalTelemetryCounter$.next({
type: TelemetryCounterType.dropped,
type: 'dropped',
source: 'client',
event_type: eventType,
code: 'UnregisteredType',
@ -103,15 +117,9 @@ export class AnalyticsClient implements IAnalyticsClient {
);
}
if (this.initContext.isDev) {
// TODO: In the future we may need to validate the eventData based on the eventType's registered schema (only if isDev)
}
const optInConfig = this.optInConfig$.value;
if (optInConfig?.isEventTypeOptedIn(eventType) === false) {
// If opted out, skip early
return;
// If the validator is registered (dev-mode only), perform the validation.
if (eventTypeOpts.validator) {
validateSchema(`Event Type '${eventType}'`, eventTypeOpts.validator, eventData);
}
const event: Event = {
@ -121,6 +129,20 @@ export class AnalyticsClient implements IAnalyticsClient {
properties: eventData,
};
// debug-logging before checking the opt-in status to help during development
if (this.initContext.isDev) {
this.initContext.logger.debug<EventDebugLogMeta>(`Report event "${eventType}"`, {
ebt_event: event,
});
}
const optInConfig = this.optInConfig$.value;
if (optInConfig?.isEventTypeOptedIn(eventType) === false) {
// If opted out, skip early
return;
}
if (typeof optInConfig === 'undefined') {
// If the opt-in config is not provided yet, we need to enqueue the event to an internal queue
this.internalEventQueue$.next(event);
@ -133,7 +155,11 @@ export class AnalyticsClient implements IAnalyticsClient {
if (this.eventTypeRegistry.get(eventTypeOps.eventType)) {
throw new Error(`Event Type "${eventTypeOps.eventType}" is already registered.`);
}
this.eventTypeRegistry.set(eventTypeOps.eventType, eventTypeOps);
this.eventTypeRegistry.set(eventTypeOps.eventType, {
...eventTypeOps,
validator: this.initContext.isDev ? schemaToIoTs(eventTypeOps.schema) : undefined,
});
};
public optIn = (optInConfig: OptInConfig) => {
@ -249,7 +275,7 @@ export class AnalyticsClient implements IAnalyticsClient {
});
if (sentToShipper) {
this.internalTelemetryCounter$.next({
type: TelemetryCounterType.sent_to_shipper,
type: 'sent_to_shipper',
source: 'client',
event_type: eventType,
code: 'OK',

View file

@ -0,0 +1,353 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { loggerMock, type MockedLogger } from '@kbn/logging-mocks';
import { BehaviorSubject, firstValueFrom, lastValueFrom, Subject, take, toArray } from 'rxjs';
import type { EventContext } from '../events';
import { ContextService } from './context_service';
describe('ContextService', () => {
let globalContext$: Subject<Partial<EventContext>>;
let contextService: ContextService;
let logger: MockedLogger;
beforeEach(() => {
globalContext$ = new BehaviorSubject<Partial<EventContext>>({});
logger = loggerMock.create();
contextService = new ContextService(globalContext$, true, logger);
});
test('Registers a context provider', async () => {
const context$ = new Subject<{ a_field: boolean }>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(2), toArray()));
context$.next({ a_field: true });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
]);
});
test('It does not break if context emits `undefined`', async () => {
contextService = new ContextService(
globalContext$,
false, // setting to `false` so the validation piece of logic does not kick in.
logger
);
const context$ = new Subject<{ a_field: boolean } | undefined | void>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(3), toArray()));
context$.next();
context$.next(undefined);
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{},
{},
]);
});
test('It does not break for BehaviourSubjects (emitting as soon as they connect)', async () => {
const context$ = new BehaviorSubject<{ a_field: boolean }>({ a_field: true });
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(1), toArray()));
await expect(globalContextPromise).resolves.toEqual([
{ a_field: true }, // No original empty state
]);
});
test('Merges all the contexts together', async () => {
const contextA$ = new Subject<{ a_field: boolean }>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: contextA$,
});
const contextB$ = new Subject<{ a_field?: boolean; b_field: number }>();
contextService.registerContextProvider({
name: 'contextProviderB',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$: contextB$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
contextA$.next({ a_field: true });
contextB$.next({ b_field: 1 });
contextB$.next({ a_field: false, b_field: 1 });
contextA$.next({ a_field: true });
contextB$.next({ b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
{ a_field: true, b_field: 1 }, // Merged A & B
{ a_field: false, b_field: 1 }, // a_field updated from B
{ a_field: false, b_field: 1 }, // a_field still from B because it was registered later.
// We may want to change this last behaviour in the future but, for now, it's fine.
{ a_field: true, b_field: 2 }, // a_field is now taken from A because B is not providing it yet.
]);
});
test('The global context is not polluted by context providers removing reported fields', async () => {
const context$ = new Subject<{ a_field?: boolean; b_field: number }>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(6), toArray()));
context$.next({ b_field: 1 });
context$.next({ a_field: false, b_field: 1 });
context$.next({ a_field: true, b_field: 1 });
context$.next({ b_field: 1 });
context$.next({ a_field: true, b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ b_field: 1 },
{ a_field: false, b_field: 1 },
{ a_field: true, b_field: 1 },
{ b_field: 1 }, // a_field is removed because the context provider removed it.
{ a_field: true, b_field: 2 },
]);
});
test('The undefined values are not forwarded to the global context', async () => {
const context$ = new Subject<{ a_field?: boolean; b_field: number }>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
optional: true,
},
},
b_field: {
type: 'long',
_meta: {
description: 'b_field description',
},
},
},
context$,
});
const globalContextPromise = firstValueFrom(globalContext$.pipe(take(6), toArray()));
context$.next({ b_field: 1 });
context$.next({ a_field: false, b_field: 1 });
context$.next({ a_field: true, b_field: 1 });
context$.next({ b_field: 1 });
context$.next({ a_field: undefined, b_field: 2 });
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ b_field: 1 },
{ a_field: false, b_field: 1 },
{ a_field: true, b_field: 1 },
{ b_field: 1 }, // a_field is removed because the context provider removed it.
{ b_field: 2 }, // a_field is not forwarded because it is `undefined`
]);
});
test('Fails to register 2 context providers with the same name', () => {
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: new Subject<{ a_field: boolean }>(),
});
expect(() => {
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$: new Subject<{ a_field: boolean }>(),
});
}).toThrowErrorMatchingInlineSnapshot(
`"Context provider with name 'contextProviderA' already registered"`
);
});
test('Does not remove the context provider after it completes', async () => {
const context$ = new Subject<{ a_field: boolean }>();
// eslint-disable-next-line dot-notation
const contextProvidersRegistry = contextService['contextProvidersRegistry'];
// The context registry is empty
expect(contextProvidersRegistry.size).toBe(0);
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(4), toArray()));
context$.next({ a_field: true });
// The size of the registry grows on the first emission
expect(contextProvidersRegistry.size).toBe(1);
context$.next({ a_field: true });
// Still in the registry
expect(contextProvidersRegistry.size).toBe(1);
context$.complete();
// Still in the registry
expect(contextProvidersRegistry.size).toBe(1);
contextService.removeContextProvider('contextProviderA');
// The context provider is removed from the registry
expect(contextProvidersRegistry.size).toBe(0);
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true },
{ a_field: true },
{},
]);
});
test('validates the input and logs the error if invalid', () => {
const context$ = new Subject<{ a_field: boolean } | undefined>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
context$.next(undefined);
expect(logger.error).toHaveBeenCalledTimes(1);
expect((logger.error.mock.calls[0][0] as Error).message).toContain(
`Failed to validate payload coming from "Context Provider 'contextProviderA'"`
);
});
test('it does not stop the subscription after an error', async () => {
const context$ = new Subject<{ a_field: boolean } | undefined>();
contextService.registerContextProvider({
name: 'contextProviderA',
schema: {
a_field: {
type: 'boolean',
_meta: {
description: 'a_field description',
},
},
},
context$,
});
const globalContextPromise = lastValueFrom(globalContext$.pipe(take(2), toArray()));
context$.next({ a_field: '123' as unknown as boolean }); // cause the error
expect(logger.error).toHaveBeenCalledTimes(1);
expect((logger.error.mock.calls[0][0] as Error).message).toContain(
`Failed to validate payload coming from "Context Provider 'contextProviderA'"`
);
context$.next({ a_field: true }); // send a good one
await expect(globalContextPromise).resolves.toEqual([
{}, // Original empty state
{ a_field: true }, // 2nd emission (the errored one does not spread)
]);
});
});

View file

@ -7,9 +7,11 @@
*/
import type { Subject, Subscription } from 'rxjs';
import { tap } from 'rxjs/operators';
import { filter } from 'rxjs';
import type { Logger } from '@kbn/logging';
import type { EventContext } from '../events';
import type { ContextProviderName, ContextProviderOpts } from './types';
import { schemaToIoTs, validateSchema } from '../schema/validation';
export class ContextService {
private readonly contextProvidersRegistry = new Map<ContextProviderName, Partial<EventContext>>();
@ -17,24 +19,42 @@ export class ContextService {
constructor(
private readonly context$: Subject<Partial<EventContext>>,
private readonly isDevMode: boolean
private readonly isDevMode: boolean,
private readonly logger: Logger
) {}
/**
* Registers a context provider, and subscribes to any updates from it.
* @param contextProviderOpts The options to register the context provider {@link ContextProviderOpts}
*/
public registerContextProvider<Context>({ name, context$ }: ContextProviderOpts<Context>) {
public registerContextProvider<Context>({
name,
context$,
schema,
}: ContextProviderOpts<Context>) {
if (this.contextProvidersSubscriptions.has(name)) {
throw new Error(`Context provider with name '${name}' already registered`);
}
// Declare the validator only in dev-mode
const validator = this.isDevMode ? schemaToIoTs(schema) : undefined;
const subscription = context$
.pipe(
tap((ctx) => {
if (this.isDevMode) {
// TODO: In the future we may need to validate the input of the context based on the schema (only if isDev)
filter((context) => {
if (validator) {
try {
validateSchema(
`Context Provider '${name}'`,
validator,
context as Record<string, unknown>
);
} catch (validationError) {
this.logger.error(validationError);
return false;
}
}
return true;
})
)
.subscribe((context) => {

View file

@ -6,5 +6,10 @@
* Side Public License, v 1.
*/
export type { Event, EventType, EventContext, TelemetryCounter } from './types';
export { TelemetryCounterType } from './types';
export type {
Event,
EventType,
EventContext,
TelemetryCounter,
TelemetryCounterType,
} from './types';

View file

@ -65,39 +65,26 @@ export interface EventContext {
export type EventType = string;
/**
* Types of the Telemetry Counter: It allows to differentiate what happened to the events
* Indicates if the event contains data about succeeded, failed or dropped events:
* - enqueued: The event was accepted and will be sent to the shippers when they become available (and opt-in === true).
* - sent_to_shipper: The event was sent to at least one shipper.
* - succeeded: The event was successfully sent by the shipper.
* - failed: There was an error when processing/shipping the event. Refer to the Telemetry Counter's code for the reason.
* - dropped: The event was dropped from the queue. Refer to the Telemetry Counter's code for the reason.
*/
export enum TelemetryCounterType {
/**
* The event was accepted and will be sent to the shippers when they become available (and opt-in === true).
*/
enqueued = 'enqueued',
/**
* The event was sent to at least one shipper.
*/
sent_to_shipper = 'sent_to_shipper',
/**
* The event was successfully sent by the shipper.
*/
succeeded = 'succeeded',
/**
* There was an error when processing/shipping the event.
* Refer to the Telemetry Counter's code for the reason.
*/
failed = 'failed',
/**
* The event was dropped from the queue.
* Refer to the Telemetry Counter's code for the reason.
*/
dropped = 'dropped',
}
export type TelemetryCounterType =
| 'enqueued'
| 'sent_to_shipper'
| 'succeeded'
| 'failed'
| 'dropped';
/**
* Shape of the events emitted by the telemetryCounter$ observable
*/
export interface TelemetryCounter {
/**
* Indicates if the event contains data about succeeded, failed or dropped events.
* {@link TelemetryCounterType}
*/
type: TelemetryCounterType;
/**

View file

@ -37,8 +37,13 @@ export type {
EventTypeOpts,
} from './analytics_client';
export type { Event, EventContext, EventType, TelemetryCounter } from './events';
export { TelemetryCounterType } from './events';
export type {
Event,
EventContext,
EventType,
TelemetryCounter,
TelemetryCounterType,
} from './events';
export type {
RootSchema,

View file

@ -54,7 +54,7 @@ export interface SchemaChildValue<Value> {
}
/**
* Type that defines all the possible values that the Schema accepts.
* Type that defines all the possible values that the Schema accepts.
* These types definitions are helping to identify earlier the possible missing `properties` nesting when
* manually defining the schemas.
*/

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as t from 'io-ts';
import { either, isLeft } from 'fp-ts/lib/Either';
import { excess } from './excess';
describe('excess', () => {
test('should pass validation when not found extra properties', () => {
const validator = excess(t.interface({ a_string: t.string, a_number: t.number }));
const invalidObj = { a_string: 'test', a_number: 1 };
expect(validator.is(invalidObj)).toBe(true);
const result = validator.decode(invalidObj);
expect(isLeft(result)).toBe(false);
});
test('should not pass validation when found extra properties', () => {
const validator = excess(t.interface({ a_string: t.string, a_number: t.number }));
const invalidObj = { a_string: 'test', a_number: 1, another_string: 'test' };
expect(validator.is(invalidObj)).toBe(false);
const result = validator.decode(invalidObj);
expect(isLeft(result)).toBe(true);
either.mapLeft(result, (validationError) =>
expect(validationError[0].message).toBe(`excess key 'another_string' found`)
);
});
test('should not pass validation when found a non-declared property in an all-optional object', () => {
const validator = excess(t.partial({ a_string: t.string, a_number: t.number }));
const invalidObj = { another_string: 'test' };
expect(validator.is(invalidObj)).toBe(false);
const result = validator.decode(invalidObj);
expect(isLeft(result)).toBe(true);
either.mapLeft(result, (validationErrors) =>
expect(validationErrors.map((err) => err.message)).toStrictEqual([
`excess key 'another_string' found`,
])
);
});
});

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// Extra IO-TS type to not allow more keys than the defined ones.
// Extracted from https://github.com/gcanti/io-ts/issues/322
import * as t from 'io-ts';
import { either, Either, isRight, left, right, Right } from 'fp-ts/lib/Either';
const getIsCodec =
<T extends t.Any>(tag: string) =>
(codec: t.Any): codec is T =>
(codec as t.Any & { _tag: string })._tag === tag;
const isInterfaceCodec = getIsCodec<t.InterfaceType<t.Props>>('InterfaceType');
const isPartialCodec = getIsCodec<t.PartialType<t.Props>>('PartialType');
const isIntersectionType = getIsCodec<t.IntersectionType<t.Mixed[]>>('IntersectionType');
const getProps = (codec: t.HasProps): t.Props => {
switch (codec._tag) {
case 'RefinementType':
case 'ReadonlyType':
return getProps(codec.type);
case 'InterfaceType':
case 'StrictType':
case 'PartialType':
return codec.props;
case 'IntersectionType':
return codec.types.reduce<t.Props>((props, type) => Object.assign(props, getProps(type)), {});
}
};
const getNameFromProps = (props: t.Props, isPartial: boolean): string =>
Object.keys(props)
.map((k) => `${k}${isPartial ? '?' : ''}: ${props[k].name}`)
.join(', ');
/**
* Provides a human-readable definition of the io-ts validator.
* @param codec The io-ts declaration passed as an argument to the Excess method.
* @remarks Since we currently use it only with objects, we'll cover the IntersectionType and PartialType
*/
const getExcessTypeName = (codec: t.Any): string => {
if (isIntersectionType(codec)) {
return `{ ${codec.types
.map((subCodec) => {
if (isInterfaceCodec(subCodec)) {
return getNameFromProps(subCodec.props, false);
}
if (isPartialCodec(subCodec)) {
return getNameFromProps(subCodec.props, true);
}
return subCodec.name;
})
.filter(Boolean)
.join(', ')} }`;
}
return `Excess<${codec.name}>`;
};
const stripKeys = <T>(o: T, props: t.Props): Either<string[], T> => {
const keys = Object.getOwnPropertyNames(o);
const propsKeys = Object.getOwnPropertyNames(props);
propsKeys.forEach((pk) => {
const index = keys.indexOf(pk);
if (index !== -1) {
keys.splice(index, 1);
}
});
return keys.length ? left(keys) : right(o);
};
/**
* Validate if there are any keys that exist in the validated object, but they don't in the validation object.
* @param codec The io-ts schema to wrap with this validation
* @param name (optional) Replace the custom logic to name the validation error by providing a static name.
*/
export const excess = <C extends t.HasProps>(
codec: C,
name: string = getExcessTypeName(codec)
): ExcessType<C> => {
const props: t.Props = getProps(codec);
return new ExcessType<C>(
name,
(u): u is C => isRight(stripKeys(u, props)) && codec.is(u),
(u, c) =>
either.chain(t.UnknownRecord.validate(u, c), () =>
either.chain(codec.validate(u, c), (a) =>
either.mapLeft(stripKeys<C>(a, props), (keys) =>
keys.map((k) => ({
value: a[k],
context: c,
message: `excess key '${k}' found`,
}))
)
)
),
(a) => codec.encode((stripKeys(a, props) as Right<any>).right),
codec
);
};
class ExcessType<C extends t.Any, A = C['_A'], O = A, I = unknown> extends t.Type<A, O, I> {
public readonly _tag: 'ExcessType' = 'ExcessType';
constructor(
name: string,
is: ExcessType<C, A, O, I>['is'],
validate: ExcessType<C, A, O, I>['validate'],
encode: ExcessType<C, A, O, I>['encode'],
public readonly type: C
) {
super(name, is, validate, encode);
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { schemaToIoTs } from './schema_to_io_ts';
export { validateSchema } from './validate_schema';

View file

@ -0,0 +1,177 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { AllowedSchemaTypes, RootSchema } from '../types';
import { schemaToIoTs } from './schema_to_io_ts';
describe(`convertSchemaToIoTs`, () => {
test('fail with anything other than an object', () => {
// @ts-expect-error
expect(() => schemaToIoTs(null)).toThrow();
});
test('invalid type => errors with malformed schema', () => {
expect(() =>
schemaToIoTs({
// @ts-expect-error Non-valid type
an_invalid_field: { type: 'invalid', _meta: { description: 'Test description' } },
})
).toThrow(/Malformed schema/);
});
test('array type missing `items` => errors with malformed schema', () => {
expect(() =>
schemaToIoTs({
// @ts-expect-error Non-valid array-construct
an_invalid_field: { type: 'array' },
})
).toThrow(/Malformed schema/);
});
test('minimal schemas and empty value => pass', () => {
const validator = schemaToIoTs({});
expect(validator.is({})).toBe(true);
});
test('value has fields not defined in the schema => fail', () => {
const validator = schemaToIoTs({});
expect(validator.is({ version: 'some-version' })).toBe(false);
expect(validator.is({ an_array: [{ docs: { missing: 1 } }] })).toBe(false);
});
test('support optional fields', () => {
const validator = schemaToIoTs<unknown>({
an_optional_field: {
type: 'keyword',
_meta: {
description: 'An optional field',
optional: true,
},
},
an_optional_obj: {
_meta: { optional: true },
properties: {
other_field: { type: 'short', _meta: { description: 'Test description' } },
},
},
an_optional_array: {
type: 'array',
items: { type: 'short', _meta: { description: 'Test description' } },
_meta: { optional: true },
},
});
expect(validator.is({})).toBe(true);
});
test('value has nested-fields not defined in the schema => fail', () => {
const schemas: Array<RootSchema<unknown>> = [
{
an_array: {
type: 'array',
_meta: { description: 'Test description' },
items: {
properties: {},
},
},
},
{
an_array: {
type: 'array',
_meta: { description: 'Test description' },
items: {
properties: { docs: { properties: {} } },
},
},
},
];
schemas.forEach((schema) => {
const validator = schemaToIoTs(schema);
expect(validator.is({ an_array: [{ docs: { missing: 1 } }] })).toBe(false);
});
});
test('value has nested-fields defined in the schema, but with wrong type => fail', () => {
const validator = schemaToIoTs({
an_array: {
type: 'array',
items: {
properties: {
docs: {
properties: {
field: { type: 'short', _meta: { description: 'Test description' } },
},
},
},
},
},
});
expect(validator.is({ an_array: [{ docs: { field: 'abc' } }] })).toBe(false);
});
test.each([
'boolean',
'byte',
'double',
'float',
'integer',
'long',
'short',
] as AllowedSchemaTypes[])('Expected type %s, but got string', (type) => {
const validator = schemaToIoTs({
a_field: { type, _meta: { description: 'Test description' } },
});
expect(validator.is({ a_field: 'abc' })).toBe(false);
});
test.each(['keyword', 'text', 'date'] as AllowedSchemaTypes[])(
'Expected type %s, but got number',
(type) => {
const validator = schemaToIoTs({
a_field: { type, _meta: { description: 'Test description' } },
});
expect(validator.is({ a_field: 1234 })).toBe(false);
}
);
test('Support DYNAMIC_KEY', () => {
const validator = schemaToIoTs({
a_field: {
properties: { DYNAMIC_KEY: { type: 'short', _meta: { description: 'Test description' } } },
},
});
expect(validator.is({ a_field: { some_key: 1234 } })).toBe(true);
});
test('Support DYNAMIC_KEY + known props', () => {
const validator = schemaToIoTs({
a_field: {
properties: {
DYNAMIC_KEY: { type: 'short', _meta: { description: 'Test description' } },
known_prop: { type: 'short', _meta: { description: 'Test description' } },
},
},
});
expect(validator.is({ a_field: { some_key: 1234, known_prop: 1234 } })).toBe(true);
});
test('value has nested-fields defined in the schema => succeed', () => {
const validator = schemaToIoTs({
an_array: {
type: 'array',
items: {
properties: {
docs: {
properties: {
field: { type: 'short', _meta: { description: 'Test description' } },
},
},
},
},
},
});
expect(validator.is({ an_array: [{ docs: { field: 1 } }] })).toBe(true);
});
test('allow pass_through properties', () => {
const validator = schemaToIoTs({
im_only_passing_through_data: {
type: 'pass_through',
_meta: { description: 'Test description' },
},
});
expect(validator.is({ im_only_passing_through_data: [{ docs: { field: 1 } }] })).toBe(true);
});
});

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as t from 'io-ts';
import type { RootSchema, SchemaArray, SchemaObject, SchemaValue } from '../types';
import { excess } from './excess';
/**
* Is it a tuple of t.Mixed?
* @param schemas Array of io-ts schemas
*/
function isOneOfCandidate(schemas: t.Mixed[]): schemas is [t.Mixed, t.Mixed] {
return schemas.length === 2;
}
/**
* Converts each {@link SchemaValue} to the io-ts equivalent
* @param value The {@link SchemaValue} to parse
*/
function schemaValueToIoTs<Value>(value: SchemaValue<Value>): t.Mixed {
// We need to check the pass_through type on top of everything
if ((value as { type: 'pass_through' }).type === 'pass_through') {
return t.unknown;
}
if ('properties' in value) {
const { DYNAMIC_KEY, ...properties } = value.properties as SchemaObject<Value>['properties'] & {
DYNAMIC_KEY?: SchemaValue<unknown>;
};
const schemas: t.Mixed[] = [schemaObjectToIoTs<Record<string, unknown>>({ properties })];
if (DYNAMIC_KEY) {
schemas.push(t.record(t.string, schemaValueToIoTs(DYNAMIC_KEY)));
}
return isOneOfCandidate(schemas) ? t.union(schemas) : schemas[0];
} else {
const valueType = value.type; // Copied in here because of TS reasons, it's not available in the `default` case
switch (valueType) {
case 'boolean':
return t.boolean;
case 'keyword':
case 'text':
case 'date':
return t.string;
case 'byte':
case 'double':
case 'float':
case 'integer':
case 'long':
case 'short':
return t.number;
case 'array':
if ('items' in value) {
return t.array(schemaValueToIoTs((value as SchemaArray<unknown, unknown>).items));
}
throw new Error(`Schema type must include the "items" declaration.`);
default:
throw new Error(`Unsupported schema type ${valueType}.`);
}
}
}
/**
* Loops through a list of [key, SchemaValue] tuples to convert them into a valid io-ts parameter to define objects.
* @param entries Array of tuples [key, {@link SchemaValue}]. Typically, coming from Object.entries(SchemaObject).
*/
function entriesToObjectIoTs<Value>(
entries: Array<[string, SchemaValue<Value>]>
): Record<string, t.Mixed> {
return Object.fromEntries(
entries.map(([key, value]) => {
try {
return [key, schemaValueToIoTs(value)];
} catch (err) {
err.failedKey = [key, ...(err.failedKey || [])];
throw err;
}
})
);
}
/**
* Converts a {@link SchemaObject} to the io-ts equivalent.
* @param schemaObject The {@link SchemaObject} to parse.
*/
function schemaObjectToIoTs<Value>(
schemaObject: SchemaObject<Value>
): t.Type<Record<string, unknown>> {
const objectEntries: Array<[string, SchemaValue<unknown>]> = Object.entries(
schemaObject.properties
);
const requiredFields = objectEntries.filter(([key, { _meta }]) => _meta?.optional !== true);
const optionalFields = objectEntries.filter(([key, { _meta }]) => _meta?.optional === true);
return excess(
t.intersection([
t.interface(entriesToObjectIoTs(requiredFields)),
t.partial(entriesToObjectIoTs(optionalFields)),
])
);
}
/**
* Converts a {@link RootSchema} to an io-ts validation object.
* @param rootSchema The {@link RootSchema} to be parsed.
*/
export function schemaToIoTs<Base>(rootSchema: RootSchema<Base>): t.Type<Record<string, unknown>> {
try {
return schemaObjectToIoTs({ properties: rootSchema });
} catch (err) {
if (err.failedKey) {
err.message = `Malformed schema for key [${err.failedKey.join('.')}]: ${err.message}`;
}
throw err;
}
}

View file

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { validateSchema } from './validate_schema';
import { schemaToIoTs } from './schema_to_io_ts';
describe('validateSchema', () => {
describe('successful', () => {
test('valid object', () => {
expect(() =>
validateSchema(
'test source',
schemaToIoTs({
an_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
},
}),
{ an_object: { a_field: 'test' } }
)
).not.toThrow();
});
});
describe('failed', () => {
test('object is valid but it has some extra fields not declared in the schema', () => {
expect(() =>
validateSchema(
'test source',
schemaToIoTs({
an_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
},
}),
{ an_object: { a_field: 'test' }, another_object: { a_field: 'test' } }
)
).toThrowErrorMatchingInlineSnapshot(`
"Failed to validate payload coming from \\"test source\\":
- []: excess key 'another_object' found"
`);
});
test('object is valid but it has some extra nested fields not declared in the schema', () => {
expect(() =>
validateSchema(
'test source',
schemaToIoTs({
an_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
},
}),
{ an_object: { a_field: 'test', an_extra_field: 'test' } }
)
).toThrowErrorMatchingInlineSnapshot(`
"Failed to validate payload coming from \\"test source\\":
- [an_object]: excess key 'an_extra_field' found"
`);
});
test('the object is not valid because it is missing a key', () => {
expect(() =>
validateSchema(
'test source',
schemaToIoTs<unknown>({
an_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
},
an_optional_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
_meta: { optional: true },
},
}),
{ another_object: { a_field: 'test' } }
)
).toThrowErrorMatchingInlineSnapshot(`
"Failed to validate payload coming from \\"test source\\":
- [an_object]: {\\"expected\\":\\"{ a_field: string }\\",\\"actual\\":\\"undefined\\",\\"value\\":\\"undefined\\"}"
`);
});
test('lists multiple errors', () => {
expect(() =>
validateSchema(
'test source',
schemaToIoTs<unknown>({
an_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
},
an_optional_object: {
properties: { a_field: { type: 'keyword', _meta: { description: 'A test field' } } },
_meta: { optional: true },
},
}),
{
an_object: { a_field: 'test', an_extra_field: 'test' },
an_optional_object: {},
another_object: { a_field: 'test' },
}
)
).toThrowErrorMatchingInlineSnapshot(`
"Failed to validate payload coming from \\"test source\\":
- [an_object]: excess key 'an_extra_field' found
- [an_optional_object.a_field]: {\\"expected\\":\\"string\\",\\"actual\\":\\"undefined\\",\\"value\\":\\"undefined\\"}"
`);
});
});
});

View file

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Context, Type } from 'io-ts';
import { either } from 'fp-ts/lib/Either';
/**
* Validates the event according to the schema validator generated by {@link convertSchemaToIoTs}.
* @throws Error when the event does not comply with the schema.
* @param validator The io-ts validator for the event.
* @param payload The payload to validate.
*/
export function validateSchema<Payload>(
sourceName: string,
validator: Type<Payload>,
payload: Payload
): void {
// Run io-ts validation to the event
const result = validator.decode(payload);
either.mapLeft(result, (validationErrors) => {
const humanFriendlyErrors = validationErrors
.map(
(err) => `[${getFullPathKey(err.context)}]: ${err.message ?? readableContext(err.context)}`
)
.filter((errMsg, idx, listOfErrMsgs) => listOfErrMsgs.indexOf(errMsg, idx + 1) === -1);
throw new Error(
`Failed to validate payload coming from "${sourceName}":\n\t- ${humanFriendlyErrors.join(
'\n\t- '
)}`
);
});
}
/**
* Picks the relevant fields of the validation error's context
* @param context The {@link Context} coming from the validation error
*/
function readableContext(context: Context) {
// The information provided, the last context is good enough.
// Otherwise, repeating the values for every nested key is too noisy.
const last = context[context.length - 1];
return JSON.stringify({
expected: last.type.name,
// Explicitly printing `undefined` to make it more obvious in the message
actual: typeof last.actual,
value: last.actual === undefined ? 'undefined' : last.actual,
});
}
/**
* Prints the full path to the key that raised the validation error.
* @param context The {@link Context} coming from the validation error
*/
function getFullPathKey(context: Context): string {
return (
context
// Remove the context provided by InterfaceType and PartialType because their keys are simply numeric indices
.filter(
(ctx) =>
!['InterfaceType', 'PartialType'].includes(
(ctx.type as Type<unknown> & { _tag: string })._tag
)
)
.map(({ key }) => key)
.filter(Boolean)
.join('.')
);
}

View file

@ -37,7 +37,6 @@ NPM_MODULE_EXTRA_FILES = [
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"@npm//rxjs",
"//packages/analytics/client",
]
# In this array place dependencies necessary to build the types, which will include the

View file

@ -8,7 +8,6 @@
import { firstValueFrom, Subject, take, toArray } from 'rxjs';
import type { Event, TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import { createTelemetryCounterHelper } from './report_telemetry_counters';
describe('reportTelemetryCounters', () => {
@ -83,7 +82,7 @@ describe('reportTelemetryCounters', () => {
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events, {
type: TelemetryCounterType.dropped,
type: 'dropped',
code: 'my_code',
});

View file

@ -7,8 +7,7 @@
*/
import type { Subject } from 'rxjs';
import type { Event, TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import type { Event, TelemetryCounter, TelemetryCounterType } from '@kbn/analytics-client';
/**
* Creates a telemetry counter helper to make it easier to generate them
@ -28,13 +27,21 @@ export function createTelemetryCounterHelper(
*/
return (
events: Event[],
{ type, code, error }: { type?: TelemetryCounterType; code?: string; error?: Error } = {}
{
type,
code,
error,
}: {
type?: TelemetryCounterType;
code?: string;
error?: Error;
} = {}
) => {
const eventTypeCounts = countEventTypes(events);
Object.entries(eventTypeCounts).forEach(([eventType, count]) => {
telemetryCounter$.next({
source,
type: type ?? (error ? TelemetryCounterType.failed : TelemetryCounterType.succeeded),
type: type ?? (error ? 'failed' : 'succeeded'),
code: code ?? error?.message ?? 'OK',
count,
event_type: eventType,

View file

@ -29,7 +29,6 @@ import type {
IShipper,
TelemetryCounter,
} from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common';
import {
buildHeaders,
@ -142,7 +141,7 @@ export class ElasticV3ServerShipper implements IShipper {
const toDrop = events.length - freeSpace;
const droppedEvents = events.splice(-toDrop, toDrop);
this.reportTelemetryCounters(droppedEvents, {
type: TelemetryCounterType.dropped,
type: 'dropped',
code: 'queue_full',
});
}

View file

@ -95,8 +95,8 @@ export type {
OptInConfig,
ContextProviderOpts,
TelemetryCounter,
TelemetryCounterType,
} from '@kbn/analytics-client';
export { TelemetryCounterType } from '@kbn/analytics-client';
export { AppNavLinkStatus, AppStatus, ScopedHistory } from './application';
export type {

View file

@ -466,8 +466,8 @@ export type {
OptInConfig,
ShipperClassConstructor,
TelemetryCounter,
TelemetryCounterType,
} from '@kbn/analytics-client';
export { TelemetryCounterType } from '@kbn/analytics-client';
export type {
AnalyticsServiceSetup,
AnalyticsServicePreboot,

View file

@ -7,7 +7,6 @@
*/
import type { TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import { coreMock } from '@kbn/core/public/mocks';
import { usageCollectionPluginMock } from '@kbn/usage-collection-plugin/public/mocks';
import { registerEbtCounters } from './register_ebt_counters';
@ -37,7 +36,7 @@ describe('registerEbtCounters', () => {
registerEbtCounters(core.analytics, usageCollection);
expect(telemetryCounter$Spy).toHaveBeenCalledTimes(1);
internalListener({
type: TelemetryCounterType.succeeded,
type: 'succeeded',
source: 'test-shipper',
event_type: 'test-event',
code: 'test-code',

View file

@ -7,7 +7,6 @@
*/
import type { TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import { coreMock } from '@kbn/core/server/mocks';
import { createUsageCollectionSetupMock } from '@kbn/usage-collection-plugin/server/mocks';
import { registerEbtCounters } from './register_ebt_counters';
@ -37,7 +36,7 @@ describe('registerEbtCounters', () => {
registerEbtCounters(core.analytics, usageCollection);
expect(telemetryCounter$Spy).toHaveBeenCalledTimes(1);
internalListener({
type: TelemetryCounterType.succeeded,
type: 'succeeded',
source: 'test-shipper',
event_type: 'test-event',
code: 'test-code',
@ -57,7 +56,7 @@ describe('registerEbtCounters', () => {
registerEbtCounters(core.analytics, usageCollection);
expect(telemetryCounter$Spy).toHaveBeenCalledTimes(1);
internalListener({
type: TelemetryCounterType.succeeded,
type: 'succeeded',
source: 'test-shipper',
event_type: 'test-event',
code: 'test-code',

View file

@ -7,7 +7,6 @@
*/
import { Subject } from 'rxjs';
import { TelemetryCounterType } from '@kbn/analytics-client';
import type { Event, EventContext, IShipper, TelemetryCounter } from '@kbn/core/public';
export interface Action {
@ -26,7 +25,7 @@ export class CustomShipper implements IShipper {
this.actions$.next({ action: 'reportEvents', meta: events });
events.forEach((event) => {
this.telemetryCounter$.next({
type: TelemetryCounterType.succeeded,
type: `succeeded`,
event_type: event.event_type,
code: '200',
count: 1,

View file

@ -7,7 +7,6 @@
*/
import { Subject } from 'rxjs';
import { TelemetryCounterType } from '@kbn/analytics-client';
import type { IShipper, Event, EventContext, TelemetryCounter } from '@kbn/core/server';
export interface Action {
@ -26,7 +25,7 @@ export class CustomShipper implements IShipper {
this.actions$.next({ action: 'reportEvents', meta: events });
events.forEach((event) => {
this.telemetryCounter$.next({
type: TelemetryCounterType.succeeded,
type: `succeeded`,
event_type: event.event_type,
code: '200',
count: 1,