[EBT] Add Elastic V3 Shippers (#130696)

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2022-05-04 18:22:34 +02:00 committed by GitHub
parent 143fe7f6e7
commit 7226982759
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
69 changed files with 2498 additions and 43 deletions

View file

@ -9,5 +9,5 @@ Exposes the public APIs of the AnalyticsClient during the setup phase.
<b>Signature:</b>
```typescript
export declare type AnalyticsServiceSetup = AnalyticsClient;
export declare type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
```

View file

@ -9,5 +9,5 @@ Exposes the public APIs of the AnalyticsClient during the preboot phase
<b>Signature:</b>
```typescript
export declare type AnalyticsServicePreboot = AnalyticsClient;
export declare type AnalyticsServicePreboot = Omit<AnalyticsClient, 'shutdown'>;
```

View file

@ -9,5 +9,5 @@ Exposes the public APIs of the AnalyticsClient during the setup phase.
<b>Signature:</b>
```typescript
export declare type AnalyticsServiceSetup = AnalyticsClient;
export declare type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
```

View file

@ -134,6 +134,9 @@
"@kbn/ambient-ui-types": "link:bazel-bin/packages/kbn-ambient-ui-types",
"@kbn/analytics": "link:bazel-bin/packages/kbn-analytics",
"@kbn/analytics-client": "link:bazel-bin/packages/analytics/client",
"@kbn/analytics-shippers-elastic-v3-browser": "link:bazel-bin/packages/analytics/shippers/elastic_v3/browser",
"@kbn/analytics-shippers-elastic-v3-common": "link:bazel-bin/packages/analytics/shippers/elastic_v3/common",
"@kbn/analytics-shippers-elastic-v3-server": "link:bazel-bin/packages/analytics/shippers/elastic_v3/server",
"@kbn/analytics-shippers-fullstory": "link:bazel-bin/packages/analytics/shippers/fullstory",
"@kbn/apm-config-loader": "link:bazel-bin/packages/kbn-apm-config-loader",
"@kbn/apm-utils": "link:bazel-bin/packages/kbn-apm-utils",
@ -605,6 +608,9 @@
"@types/kbn__alerts": "link:bazel-bin/packages/kbn-alerts/npm_module_types",
"@types/kbn__analytics": "link:bazel-bin/packages/kbn-analytics/npm_module_types",
"@types/kbn__analytics-client": "link:bazel-bin/packages/analytics/client/npm_module_types",
"@types/kbn__analytics-shippers-elastic-v3-browser": "link:bazel-bin/packages/analytics/shippers/elastic_v3/browser/npm_module_types",
"@types/kbn__analytics-shippers-elastic-v3-common": "link:bazel-bin/packages/analytics/shippers/elastic_v3/common/npm_module_types",
"@types/kbn__analytics-shippers-elastic-v3-server": "link:bazel-bin/packages/analytics/shippers/elastic_v3/server/npm_module_types",
"@types/kbn__analytics-shippers-fullstory": "link:bazel-bin/packages/analytics/shippers/fullstory/npm_module_types",
"@types/kbn__apm-config-loader": "link:bazel-bin/packages/kbn-apm-config-loader/npm_module_types",
"@types/kbn__apm-utils": "link:bazel-bin/packages/kbn-apm-utils/npm_module_types",

View file

@ -10,6 +10,9 @@ filegroup(
name = "build_pkg_code",
srcs = [
"//packages/analytics/client:build",
"//packages/analytics/shippers/elastic_v3/browser:build",
"//packages/analytics/shippers/elastic_v3/common:build",
"//packages/analytics/shippers/elastic_v3/server:build",
"//packages/analytics/shippers/fullstory:build",
"//packages/elastic-apm-synthtrace:build",
"//packages/elastic-eslint-config-kibana:build",
@ -114,6 +117,9 @@ filegroup(
name = "build_pkg_types",
srcs = [
"//packages/analytics/client:build_types",
"//packages/analytics/shippers/elastic_v3/browser:build_types",
"//packages/analytics/shippers/elastic_v3/common:build_types",
"//packages/analytics/shippers/elastic_v3/server:build_types",
"//packages/analytics/shippers/fullstory:build_types",
"//packages/elastic-apm-synthtrace:build_types",
"//packages/elastic-safer-lodash-set:build_types",

View file

@ -33,6 +33,7 @@ describe('AnalyticsClient', () => {
});
afterEach(() => {
analyticsClient.shutdown();
jest.useRealTimers();
});
@ -285,13 +286,16 @@ describe('AnalyticsClient', () => {
constructor({
optInMock,
extendContextMock,
shutdownMock,
}: {
optInMock?: jest.Mock;
extendContextMock?: jest.Mock;
shutdownMock?: jest.Mock;
}) {
super();
if (optInMock) this.optIn = optInMock;
if (extendContextMock) this.extendContext = extendContextMock;
if (shutdownMock) this.shutdown = shutdownMock;
}
}
@ -358,17 +362,30 @@ describe('AnalyticsClient', () => {
test(
'Handles errors in the shipper',
fakeSchedulers((advance) => {
const optInMock = jest.fn().mockImplementation(() => {
throw new Error('Something went terribly wrong');
});
const extendContextMock = jest.fn().mockImplementation(() => {
throw new Error('Something went terribly wrong');
});
analyticsClient.registerShipper(MockedShipper, { extendContextMock });
analyticsClient.optIn({ global: { enabled: true } });
const shutdownMock = jest.fn().mockImplementation(() => {
throw new Error('Something went terribly wrong');
});
analyticsClient.registerShipper(MockedShipper, {
optInMock,
extendContextMock,
shutdownMock,
});
expect(() => analyticsClient.optIn({ global: { enabled: true } })).not.toThrow();
advance(10);
expect(optInMock).toHaveBeenCalledWith(true);
expect(extendContextMock).toHaveBeenCalledWith({}); // The initial context
expect(logger.warn).toHaveBeenCalledWith(
`Shipper "${MockedShipper.shipperName}" failed to extend the context`,
expect.any(Error)
);
expect(() => analyticsClient.shutdown()).not.toThrow();
expect(shutdownMock).toHaveBeenCalled();
})
);
});
@ -731,6 +748,7 @@ describe('AnalyticsClient', () => {
expect(optInMock1).toHaveBeenCalledWith(false); // Using global and shipper-specific
expect(optInMock2).toHaveBeenCalledWith(false); // Using only global
});
test('Catches error in the shipper.optIn method', () => {
optInMock1.mockImplementation(() => {
throw new Error('Something went terribly wrong');

View file

@ -208,6 +208,21 @@ export class AnalyticsClient implements IAnalyticsClient {
this.shipperRegistered$.next();
};
public shutdown = () => {
this.shippersRegistry.allShippers.forEach((shipper, shipperName) => {
try {
shipper.shutdown();
} catch (err) {
this.initContext.logger.warn(`Failed to shutdown shipper "${shipperName}"`, err);
}
});
this.internalEventQueue$.complete();
this.internalTelemetryCounter$.complete();
this.shipperRegistered$.complete();
this.optInConfig$.complete();
this.context$.complete();
};
/**
* Forwards the `events` to the registered shippers, bearing in mind if the shipper is opted-in for that eventType.
* @param eventType The event type's name

View file

@ -18,6 +18,7 @@ function createMockedAnalyticsClient(): jest.Mocked<IAnalyticsClient> {
removeContextProvider: jest.fn(),
registerShipper: jest.fn(),
telemetryCounter$: new Subject(),
shutdown: jest.fn(),
};
}

View file

@ -211,4 +211,8 @@ export interface IAnalyticsClient {
* Observable to emit the stats of the processed events.
*/
readonly telemetryCounter$: Observable<TelemetryCounter>;
/**
* Stops the client.
*/
shutdown: () => void;
}

View file

@ -12,6 +12,18 @@ import type { ShipperName } from '../analytics_client';
* Definition of the context that can be appended to the events through the {@link IAnalyticsClient.registerContextProvider}.
*/
export interface EventContext {
/**
* The UUID of the cluster
*/
cluster_uuid?: string;
/**
* The name of the cluster.
*/
cluster_name?: string;
/**
* The license ID.
*/
license_id?: string;
/**
* The unique user ID.
*/

View file

@ -23,6 +23,7 @@ class MockedShipper implements IShipper {
public reportEvents = jest.fn();
public extendContext = jest.fn();
public telemetryCounter$ = new Subject<TelemetryCounter>();
public shutdown = jest.fn();
}
export const shippersMock = {

View file

@ -32,4 +32,8 @@ export interface IShipper {
* Observable to emit the stats of the processed events.
*/
telemetryCounter$?: Observable<TelemetryCounter>;
/**
* Shutdown the shipper.
*/
shutdown: () => void;
}

View file

@ -3,3 +3,5 @@
This directory holds the implementation of the _built-in_ shippers provided by the Analytics client. At the moment, the shippers are:
* [FullStory](./fullstory/README.md)
* [Elastic V3 (Browser shipper)](./elastic_v3/browser/README.md)
* [Elastic V3 (Server-side shipper)](./elastic_v3/server/README.md)

View file

@ -0,0 +1,128 @@
load("@npm//@bazel/typescript:index.bzl", "ts_config")
load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
load("//src/dev/bazel:index.bzl", "jsts_transpiler", "pkg_npm", "pkg_npm_types", "ts_project")
PKG_DIRNAME = "browser"
PKG_REQUIRE_NAME = "@kbn/analytics-shippers-elastic-v3-browser"
SOURCE_FILES = glob(
[
"src/**/*.ts",
],
exclude = [
"**/*.test.*",
],
)
SRCS = SOURCE_FILES
filegroup(
name = "srcs",
srcs = SRCS,
)
NPM_MODULE_EXTRA_FILES = [
"package.json",
]
# In this array place runtime dependencies, including other packages and NPM packages
# which must be available for this code to run.
#
# To reference other packages use:
# "//repo/relative/path/to/package"
# eg. "//packages/kbn-utils"
#
# To reference a NPM package use:
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"@npm//rxjs",
"//packages/analytics/client",
"//packages/analytics/shippers/elastic_v3/common",
]
# In this array place dependencies necessary to build the types, which will include the
# :npm_module_types target of other packages and packages from NPM, including @types/*
# packages.
#
# To reference the types for another package use:
# "//repo/relative/path/to/package:npm_module_types"
# eg. "//packages/kbn-utils:npm_module_types"
#
# References to NPM packages work the same as RUNTIME_DEPS
TYPES_DEPS = [
"@npm//@types/node",
"@npm//@types/jest",
"@npm//rxjs",
"//packages/analytics/client:npm_module_types",
"//packages/analytics/shippers/elastic_v3/common:npm_module_types",
"//packages/kbn-logging-mocks:npm_module_types",
]
jsts_transpiler(
name = "target_web",
srcs = SRCS,
build_pkg_name = package_name(),
web = True,
)
jsts_transpiler(
name = "target_node",
srcs = SRCS,
build_pkg_name = package_name(),
)
ts_config(
name = "tsconfig",
src = "tsconfig.json",
deps = [
"//:tsconfig.base.json",
"//:tsconfig.bazel.json",
],
)
ts_project(
name = "tsc_types",
args = ['--pretty'],
srcs = SRCS,
deps = TYPES_DEPS,
declaration = True,
emit_declaration_only = True,
out_dir = "target_types",
root_dir = "src",
tsconfig = ":tsconfig",
)
js_library(
name = PKG_DIRNAME,
srcs = NPM_MODULE_EXTRA_FILES,
deps = RUNTIME_DEPS + [":target_node", ":target_web"],
package_name = PKG_REQUIRE_NAME,
visibility = ["//visibility:public"],
)
pkg_npm(
name = "npm_module",
deps = [":" + PKG_DIRNAME],
)
filegroup(
name = "build",
srcs = [":npm_module"],
visibility = ["//visibility:public"],
)
pkg_npm_types(
name = "npm_module_types",
srcs = SRCS,
deps = [":tsc_types"],
package_name = PKG_REQUIRE_NAME,
tsconfig = ":tsconfig",
visibility = ["//visibility:public"],
)
filegroup(
name = "build_types",
srcs = [":npm_module_types"],
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,25 @@
# @kbn/analytics-shippers-elastic-v3-browser
UI-side implementation of the Elastic V3 shipper for the `@kbn/analytics-client`.
## How to use it
This module is intended to be used **on the browser only**. Due to the nature of the UI events, they are usually more scattered in time, and we can assume a much lower load than the server. For that reason, it doesn't apply the necessary backpressure mechanisms to prevent the server from getting overloaded with too many events neither identifies if the server sits behind a firewall to discard any incoming events. Refer to `@kbn/analytics-shippers-elastic-v3-server` for the server-side implementation.
```typescript
import { ElasticV3BrowserShipper } from "@kbn/analytics-shippers-elastic-v3-browser";
analytics.registerShipper(ElasticV3BrowserShipper, { channelName: 'myChannel', version: '1.0.0' });
```
## Configuration
| Name | Description |
|:-------------:|:-------------------------------------------------------------------------------------------|
| `channelName` | The name of the channel to send the events. |
| `version` | The version of the application generating the events. |
| `debug` | When `true`, it logs the responses from the remote Telemetry Service. Defaults to `false`. |
## Transmission protocol
This shipper sends the events to the Elastic Internal Telemetry Service. The incoming events are buffered for up to 1 second to attempt to send them in a single request.

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
preset: '@kbn/test/jest_node',
rootDir: '../../../../../',
roots: ['<rootDir>/packages/analytics/shippers/elastic_v3/browser'],
};

View file

@ -0,0 +1,8 @@
{
"name": "@kbn/analytics-shippers-elastic-v3-browser",
"private": true,
"version": "1.0.0",
"browser": "./target_web/index.js",
"main": "./target_node/index.js",
"license": "SSPL-1.0 OR Elastic License 2.0"
}

View file

@ -0,0 +1,291 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { loggerMock } from '@kbn/logging-mocks';
import type { AnalyticsClientInitContext, Event } from '@kbn/analytics-client';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { firstValueFrom } from 'rxjs';
import { ElasticV3BrowserShipper } from './browser_shipper';
describe('ElasticV3BrowserShipper', () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'test-event-type',
context: {},
properties: {},
},
];
const initContext: AnalyticsClientInitContext = {
sendTo: 'staging',
isDev: true,
logger: loggerMock.create(),
};
let shipper: ElasticV3BrowserShipper;
let fetchMock: jest.Mock;
beforeEach(() => {
jest.useFakeTimers();
fetchMock = jest.fn().mockResolvedValue({
status: 200,
ok: true,
text: () => Promise.resolve('{"status": "ok"}'),
});
Object.defineProperty(global, 'fetch', {
value: fetchMock,
writable: true,
});
shipper = new ElasticV3BrowserShipper(
{ version: '1.2.3', channelName: 'test-channel', debug: true },
initContext
);
});
afterEach(() => {
shipper.shutdown();
jest.useRealTimers();
});
test("custom sendTo overrides Analytics client's", () => {
const prodShipper = new ElasticV3BrowserShipper(
{ version: '1.2.3', channelName: 'test-channel', debug: true, sendTo: 'production' },
initContext
);
// eslint-disable-next-line dot-notation
expect(prodShipper['url']).not.toEqual(shipper['url']);
});
test('set optIn should update the isOptedIn$ observable', () => {
// eslint-disable-next-line dot-notation
const internalOptIn$ = shipper['isOptedIn$'];
// Initially undefined
expect(internalOptIn$.value).toBeUndefined();
shipper.optIn(true);
expect(internalOptIn$.value).toBe(true);
shipper.optIn(false);
expect(internalOptIn$.value).toBe(false);
});
test('set extendContext should store local values: clusterUuid and licenseId', () => {
// eslint-disable-next-line dot-notation
const getInternalClusterUuid = () => shipper['clusterUuid'];
// eslint-disable-next-line dot-notation
const getInternalLicenseId = () => shipper['licenseId'];
// Initial values
expect(getInternalClusterUuid()).toBe('UNKNOWN');
expect(getInternalLicenseId()).toBeUndefined();
shipper.extendContext({ cluster_uuid: 'test-cluster-uuid' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid');
expect(getInternalLicenseId()).toBeUndefined();
shipper.extendContext({ license_id: 'test-license-id' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid');
expect(getInternalLicenseId()).toBe('test-license-id');
shipper.extendContext({ cluster_uuid: 'test-cluster-uuid-2', license_id: 'test-license-id-2' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid-2');
expect(getInternalLicenseId()).toBe('test-license-id-2');
});
test('calls to reportEvents do not call `fetch` straight away (buffer of 1s)', () => {
shipper.reportEvents(events);
expect(fetchMock).not.toHaveBeenCalled();
});
test(
'calls to reportEvents do not call `fetch` after 1s because no optIn value is set yet',
fakeSchedulers((advance) => {
shipper.reportEvents(events);
advance(1000);
expect(fetchMock).not.toHaveBeenCalled();
})
);
test(
'calls to reportEvents call `fetch` after 1s when optIn value is set to true',
fakeSchedulers(async (advance) => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
advance(1000);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "succeeded",
}
`);
})
);
test(
'calls to reportEvents do not call `fetch` after 1s when optIn value is set to false',
fakeSchedulers((advance) => {
shipper.reportEvents(events);
shipper.optIn(false);
advance(1000);
expect(fetchMock).not.toHaveBeenCalled();
})
);
test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
shipper.shutdown();
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "succeeded",
}
`);
});
test(
'does not add the query.debug: true property to the request if the shipper is not set with the debug flag',
fakeSchedulers((advance) => {
shipper = new ElasticV3BrowserShipper(
{ version: '1.2.3', channelName: 'test-channel' },
initContext
);
shipper.reportEvents(events);
shipper.optIn(true);
advance(1000);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
}
);
})
);
test(
'handles when the fetch request fails',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
advance(1000);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "Failed to fetch",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "failed",
}
`);
})
);
test(
'handles when the fetch request fails (request completes but not OK response)',
fakeSchedulers(async (advance) => {
fetchMock.mockResolvedValue({
ok: false,
status: 400,
text: () => Promise.resolve('{"status": "not ok"}'),
});
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
advance(1000);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "400",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "failed",
}
`);
})
);
});

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs';
import type {
AnalyticsClientInitContext,
Event,
EventContext,
IShipper,
TelemetryCounter,
} from '@kbn/analytics-client';
import { ElasticV3ShipperOptions, ErrorWithCode } from '@kbn/analytics-shippers-elastic-v3-common';
import {
buildHeaders,
buildUrl,
createTelemetryCounterHelper,
eventsToNDJSON,
} from '@kbn/analytics-shippers-elastic-v3-common';
export class ElasticV3BrowserShipper implements IShipper {
public static shipperName = 'elastic_v3_browser';
public readonly telemetryCounter$ = new Subject<TelemetryCounter>();
private readonly reportTelemetryCounters = createTelemetryCounterHelper(
this.telemetryCounter$,
ElasticV3BrowserShipper.shipperName
);
private readonly url: string;
private readonly internalQueue$ = new Subject<Event>();
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private clusterUuid: string = 'UNKNOWN';
private licenseId: string | undefined;
constructor(
private readonly options: ElasticV3ShipperOptions,
private readonly initContext: AnalyticsClientInitContext
) {
this.setUpInternalQueueSubscriber();
this.url = buildUrl({
sendTo: options.sendTo ?? initContext.sendTo,
channelName: options.channelName,
});
}
public extendContext(newContext: EventContext) {
if (newContext.cluster_uuid) {
this.clusterUuid = newContext.cluster_uuid;
}
if (newContext.license_id) {
this.licenseId = newContext.license_id;
}
}
public optIn(isOptedIn: boolean) {
this.isOptedIn$.next(isOptedIn);
}
public reportEvents(events: Event[]) {
events.forEach((event) => {
this.internalQueue$.next(event);
});
}
public shutdown() {
this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events.
}
private setUpInternalQueueSubscriber() {
this.internalQueue$
.pipe(
// Buffer events for 1 second or until we have an optIn value
bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))),
// Discard any events if we are not opted in
skipWhile(() => this.isOptedIn$.value === false),
// Skip empty buffers
filter((events) => events.length > 0),
// Send events
concatMap(async (events) => this.sendEvents(events))
)
.subscribe();
}
private async sendEvents(events: Event[]) {
try {
const code = await this.makeRequest(events);
this.reportTelemetryCounters(events, { code });
} catch (error) {
this.reportTelemetryCounters(events, { code: error.code, error });
}
}
private async makeRequest(events: Event[]): Promise<string> {
const { status, text, ok } = await fetch(this.url, {
method: 'POST',
body: eventsToNDJSON(events),
headers: buildHeaders(this.clusterUuid, this.options.version, this.licenseId),
...(this.options.debug && { query: { debug: true } }),
// Allow the request to outlive the page in case the tab is closed
keepalive: true,
});
if (this.options.debug) {
this.initContext.logger.debug(
`[${ElasticV3BrowserShipper.shipperName}]: ${status} - ${await text()}`
);
}
if (!ok) {
throw new ErrorWithCode(`${status} - ${await text()}`, `${status}`);
}
return `${status}`;
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common';
export { ElasticV3BrowserShipper } from './browser_shipper';

View file

@ -0,0 +1,17 @@
{
"extends": "../../../../../tsconfig.bazel.json",
"compilerOptions": {
"declaration": true,
"emitDeclarationOnly": true,
"outDir": "target_types",
"rootDir": "src",
"stripInternal": false,
"types": [
"jest",
"node"
]
},
"include": [
"src/**/*"
]
}

View file

@ -0,0 +1,125 @@
load("@npm//@bazel/typescript:index.bzl", "ts_config")
load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
load("//src/dev/bazel:index.bzl", "jsts_transpiler", "pkg_npm", "pkg_npm_types", "ts_project")
PKG_DIRNAME = "common"
PKG_REQUIRE_NAME = "@kbn/analytics-shippers-elastic-v3-common"
SOURCE_FILES = glob(
[
"src/**/*.ts",
],
exclude = [
"**/*.test.*",
],
)
SRCS = SOURCE_FILES
filegroup(
name = "srcs",
srcs = SRCS,
)
NPM_MODULE_EXTRA_FILES = [
"package.json",
]
# In this array place runtime dependencies, including other packages and NPM packages
# which must be available for this code to run.
#
# To reference other packages use:
# "//repo/relative/path/to/package"
# eg. "//packages/kbn-utils"
#
# To reference a NPM package use:
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"@npm//rxjs",
"//packages/analytics/client",
]
# In this array place dependencies necessary to build the types, which will include the
# :npm_module_types target of other packages and packages from NPM, including @types/*
# packages.
#
# To reference the types for another package use:
# "//repo/relative/path/to/package:npm_module_types"
# eg. "//packages/kbn-utils:npm_module_types"
#
# References to NPM packages work the same as RUNTIME_DEPS
TYPES_DEPS = [
"@npm//@types/node",
"@npm//@types/jest",
"@npm//rxjs",
"//packages/analytics/client:npm_module_types",
]
jsts_transpiler(
name = "target_node",
srcs = SRCS,
build_pkg_name = package_name(),
)
jsts_transpiler(
name = "target_web",
srcs = SRCS,
build_pkg_name = package_name(),
web = True,
)
ts_config(
name = "tsconfig",
src = "tsconfig.json",
deps = [
"//:tsconfig.base.json",
"//:tsconfig.bazel.json",
],
)
ts_project(
name = "tsc_types",
args = ['--pretty'],
srcs = SRCS,
deps = TYPES_DEPS,
declaration = True,
emit_declaration_only = True,
out_dir = "target_types",
root_dir = "src",
tsconfig = ":tsconfig",
)
js_library(
name = PKG_DIRNAME,
srcs = NPM_MODULE_EXTRA_FILES,
deps = RUNTIME_DEPS + [":target_node", ":target_web"],
package_name = PKG_REQUIRE_NAME,
visibility = ["//visibility:public"],
)
pkg_npm(
name = "npm_module",
deps = [":" + PKG_DIRNAME],
)
filegroup(
name = "build",
srcs = [":npm_module"],
visibility = ["//visibility:public"],
)
pkg_npm_types(
name = "npm_module_types",
srcs = SRCS,
deps = [":tsc_types"],
package_name = PKG_REQUIRE_NAME,
tsconfig = ":tsconfig",
visibility = ["//visibility:public"],
)
filegroup(
name = "build_types",
srcs = [":npm_module_types"],
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,10 @@
# @kbn/analytics-shippers-elastic-v3-common
This package holds the common code for the Elastic V3 shippers:
- Types defining the Shipper configuration `ElasticV3ShipperOptions`
- `buildUrl` utility helps decide which URL to use depending on whether the shipper is configured to send to production or staging.
- `eventsToNdjson` utility converts any array of events to NDJSON format.
- `reportTelemetryCounters` helps with building the TelemetryCounter to emit after processing an event.
It should be considered an internal package and should not be used other than by the shipper implementations: `@kbn/analytics-shippers-elastic-v3-browser` and `@kbn/analytics-shippers-elastic-v3-server`

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
preset: '@kbn/test/jest_node',
rootDir: '../../../../../',
roots: ['<rootDir>/packages/analytics/shippers/elastic_v3/common'],
};

View file

@ -0,0 +1,8 @@
{
"name": "@kbn/analytics-shippers-elastic-v3-common",
"private": true,
"version": "1.0.0",
"browser": "./target_web/index.js",
"main": "./target_node/index.js",
"license": "SSPL-1.0 OR Elastic License 2.0"
}

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { buildHeaders } from './build_headers';
describe('buildHeaders', () => {
test('builds the headers as expected in the V3 endpoints', () => {
expect(buildHeaders('test-cluster', '1.2.3', 'test-license')).toMatchInlineSnapshot(`
Object {
"content-type": "application/x-njson",
"x-elastic-cluster-id": "test-cluster",
"x-elastic-license-id": "test-license",
"x-elastic-stack-version": "1.2.3",
}
`);
});
test('if license is not provided, it skips the license header', () => {
expect(buildHeaders('test-cluster', '1.2.3')).toMatchInlineSnapshot(`
Object {
"content-type": "application/x-njson",
"x-elastic-cluster-id": "test-cluster",
"x-elastic-stack-version": "1.2.3",
}
`);
});
});

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export function buildHeaders(clusterUuid: string, version: string, licenseId?: string) {
return {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': clusterUuid,
'x-elastic-stack-version': version,
...(licenseId && { 'x-elastic-license-id': licenseId }),
};
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { buildUrl } from './build_url';
describe('buildUrl', () => {
test('returns production URL', () => {
expect(buildUrl({ sendTo: 'production', channelName: 'test-channel' })).toBe(
'https://telemetry.elastic.co/v3/send/test-channel'
);
});
test('returns staging URL', () => {
expect(buildUrl({ sendTo: 'staging', channelName: 'test-channel' })).toBe(
'https://telemetry-staging.elastic.co/v3/send/test-channel'
);
});
});

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/**
* Builds the URL for the V3 API.
* @param sendTo Whether to send it to production or staging.
* @param channelName The name of the channel to send the data to.
*/
export function buildUrl({
sendTo,
channelName,
}: {
sendTo: 'production' | 'staging';
channelName: string;
}): string {
const baseUrl =
sendTo === 'production'
? 'https://telemetry.elastic.co'
: 'https://telemetry-staging.elastic.co';
return `${baseUrl}/v3/send/${channelName}`;
}

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ErrorWithCode } from './error_with_code';
describe('ErrorWithCode', () => {
const error = new ErrorWithCode('test', 'test_code');
test('message and code properties are publicly accessible', () => {
expect(error.message).toBe('test');
expect(error.code).toBe('test_code');
});
test('extends error', () => {
expect(error).toBeInstanceOf(Error);
});
});

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export class ErrorWithCode extends Error {
constructor(message: string, public readonly code: string) {
super(message);
}
}

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Event } from '@kbn/analytics-client';
import { eventsToNDJSON } from './events_to_ndjson';
describe('eventsToNDJSON', () => {
test('works with one event', () => {
const event: Event = {
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type',
context: {},
properties: {},
};
// Mind the extra line at the bottom
expect(eventsToNDJSON([event])).toMatchInlineSnapshot(`
"{\\"timestamp\\":\\"2020-01-01T00:00:00.000Z\\",\\"event_type\\":\\"event_type\\",\\"context\\":{},\\"properties\\":{}}
"
`);
});
test('works with many events', () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type',
context: {},
properties: {},
},
{
timestamp: '2020-01-02T00:00:00.000Z',
event_type: 'event_type',
context: {},
properties: {},
},
];
expect(eventsToNDJSON(events)).toMatchInlineSnapshot(`
"{\\"timestamp\\":\\"2020-01-01T00:00:00.000Z\\",\\"event_type\\":\\"event_type\\",\\"context\\":{},\\"properties\\":{}}
{\\"timestamp\\":\\"2020-01-02T00:00:00.000Z\\",\\"event_type\\":\\"event_type\\",\\"context\\":{},\\"properties\\":{}}
"
`);
});
});

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Event } from '@kbn/analytics-client';
export function eventsToNDJSON(events: Event[]): string {
return `${events.map((event) => JSON.stringify(event)).join('\n')}\n`;
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { buildHeaders } from './build_headers';
export { buildUrl } from './build_url';
export { ErrorWithCode } from './error_with_code';
export { eventsToNDJSON } from './events_to_ndjson';
export { createTelemetryCounterHelper } from './report_telemetry_counters';
export type { ElasticV3ShipperOptions } from './types';

View file

@ -0,0 +1,203 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { firstValueFrom, Subject, take, toArray } from 'rxjs';
import type { Event, TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import { createTelemetryCounterHelper } from './report_telemetry_counters';
describe('reportTelemetryCounters', () => {
let reportTelemetryCounters: ReturnType<typeof createTelemetryCounterHelper>;
let telemetryCounter$: Subject<TelemetryCounter>;
beforeEach(() => {
telemetryCounter$ = new Subject<TelemetryCounter>();
reportTelemetryCounters = createTelemetryCounterHelper(telemetryCounter$, 'my_shipper');
});
test('emits a success counter for one event', async () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events);
await expect(counters).resolves.toMatchInlineSnapshot(`
Object {
"code": "OK",
"count": 1,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "succeeded",
}
`);
});
test('emits a success counter for one event with custom code', async () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events, { code: 'my_code' });
await expect(counters).resolves.toMatchInlineSnapshot(`
Object {
"code": "my_code",
"count": 1,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "succeeded",
}
`);
});
test('emits a counter with custom type', async () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events, {
type: TelemetryCounterType.dropped,
code: 'my_code',
});
await expect(counters).resolves.toMatchInlineSnapshot(`
Object {
"code": "my_code",
"count": 1,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "dropped",
}
`);
});
test('emits a failure counter for one event with error message as a code', async () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events, {
error: new Error('Something went terribly wrong'),
});
await expect(counters).resolves.toMatchInlineSnapshot(`
Object {
"code": "Something went terribly wrong",
"count": 1,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "failed",
}
`);
});
test('emits a failure counter for one event with custom code', async () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$);
reportTelemetryCounters(events, {
code: 'my_code',
error: new Error('Something went terribly wrong'),
});
await expect(counters).resolves.toMatchInlineSnapshot(`
Object {
"code": "my_code",
"count": 1,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "failed",
}
`);
});
test('emits a success counter for multiple events of different types', async () => {
const events: Event[] = [
// 2 types a
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_a',
context: {},
properties: {},
},
// 1 type b
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'event_type_b',
context: {},
properties: {},
},
];
const counters = firstValueFrom(telemetryCounter$.pipe(take(2), toArray()));
reportTelemetryCounters(events);
await expect(counters).resolves.toMatchInlineSnapshot(`
Array [
Object {
"code": "OK",
"count": 2,
"event_type": "event_type_a",
"source": "my_shipper",
"type": "succeeded",
},
Object {
"code": "OK",
"count": 1,
"event_type": "event_type_b",
"source": "my_shipper",
"type": "succeeded",
},
]
`);
});
});

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Subject } from 'rxjs';
import type { Event, TelemetryCounter } from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
/**
* Creates a telemetry counter helper to make it easier to generate them
* @param telemetryCounter$ The observable that will be used to emit the telemetry counters
* @param source The name of the shipper that is sending the events.
*/
export function createTelemetryCounterHelper(
telemetryCounter$: Subject<TelemetryCounter>,
source: string
) {
/**
* Triggers a telemetry counter for each event type.
* @param events The events to trigger the telemetry counter for.
* @param type The type of telemetry counter to trigger.
* @param code The success or error code for additional detail about the result.
* @param error The error that occurred, if any.
*/
return (
events: Event[],
{ type, code, error }: { type?: TelemetryCounterType; code?: string; error?: Error } = {}
) => {
const eventTypeCounts = countEventTypes(events);
Object.entries(eventTypeCounts).forEach(([eventType, count]) => {
telemetryCounter$.next({
source,
type: type ?? (error ? TelemetryCounterType.failed : TelemetryCounterType.succeeded),
code: code ?? error?.message ?? 'OK',
count,
event_type: eventType,
});
});
};
}
function countEventTypes(events: Event[]) {
return events.reduce((acc, event) => {
if (acc[event.event_type]) {
acc[event.event_type] += 1;
} else {
acc[event.event_type] = 1;
}
return acc;
}, {} as Record<string, number>);
}

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/**
* Options for the Elastic V3 shipper
*/
export interface ElasticV3ShipperOptions {
/**
* The name of the channel to stream all the events to.
*/
channelName: string;
/**
* The product's version.
*/
version: string;
/**
* Provide it to override the Analytics client's default configuration.
*/
sendTo?: 'staging' | 'production';
/**
* Should show debug information about the requests it makes to the V3 API.
*/
debug?: boolean;
}

View file

@ -0,0 +1,17 @@
{
"extends": "../../../../../tsconfig.bazel.json",
"compilerOptions": {
"declaration": true,
"emitDeclarationOnly": true,
"outDir": "target_types",
"rootDir": "src",
"stripInternal": false,
"types": [
"jest",
"node"
]
},
"include": [
"src/**/*"
]
}

View file

@ -0,0 +1,123 @@
load("@npm//@bazel/typescript:index.bzl", "ts_config")
load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
load("//src/dev/bazel:index.bzl", "jsts_transpiler", "pkg_npm", "pkg_npm_types", "ts_project")
PKG_DIRNAME = "server"
PKG_REQUIRE_NAME = "@kbn/analytics-shippers-elastic-v3-server"
SOURCE_FILES = glob(
[
"src/**/*.ts",
],
exclude = [
"**/*.test.*",
],
)
SRCS = SOURCE_FILES
filegroup(
name = "srcs",
srcs = SRCS,
)
NPM_MODULE_EXTRA_FILES = [
"package.json",
]
# In this array place runtime dependencies, including other packages and NPM packages
# which must be available for this code to run.
#
# To reference other packages use:
# "//repo/relative/path/to/package"
# eg. "//packages/kbn-utils"
#
# To reference a NPM package use:
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"@npm//node-fetch",
"@npm//rxjs",
"//packages/analytics/client",
"//packages/analytics/shippers/elastic_v3/common",
]
# In this array place dependencies necessary to build the types, which will include the
# :npm_module_types target of other packages and packages from NPM, including @types/*
# packages.
#
# To reference the types for another package use:
# "//repo/relative/path/to/package:npm_module_types"
# eg. "//packages/kbn-utils:npm_module_types"
#
# References to NPM packages work the same as RUNTIME_DEPS
TYPES_DEPS = [
"@npm//@types/node",
"@npm//@types/node-fetch",
"@npm//@types/jest",
"@npm//rxjs",
"//packages/analytics/client:npm_module_types",
"//packages/analytics/shippers/elastic_v3/common:npm_module_types",
"//packages/kbn-logging-mocks:npm_module_types",
]
jsts_transpiler(
name = "target_node",
srcs = SRCS,
build_pkg_name = package_name(),
)
ts_config(
name = "tsconfig",
src = "tsconfig.json",
deps = [
"//:tsconfig.base.json",
"//:tsconfig.bazel.json",
],
)
ts_project(
name = "tsc_types",
args = ['--pretty'],
srcs = SRCS,
deps = TYPES_DEPS,
declaration = True,
emit_declaration_only = True,
out_dir = "target_types",
root_dir = "src",
tsconfig = ":tsconfig",
)
js_library(
name = PKG_DIRNAME,
srcs = NPM_MODULE_EXTRA_FILES,
deps = RUNTIME_DEPS + [":target_node"],
package_name = PKG_REQUIRE_NAME,
visibility = ["//visibility:public"],
)
pkg_npm(
name = "npm_module",
deps = [":" + PKG_DIRNAME],
)
filegroup(
name = "build",
srcs = [":npm_module"],
visibility = ["//visibility:public"],
)
pkg_npm_types(
name = "npm_module_types",
srcs = SRCS,
deps = [":tsc_types"],
package_name = PKG_REQUIRE_NAME,
tsconfig = ":tsconfig",
visibility = ["//visibility:public"],
)
filegroup(
name = "build_types",
srcs = [":npm_module_types"],
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,25 @@
# @kbn/analytics-shippers-elastic-v3-server
Server-side implementation of the Elastic V3 shipper for the `@kbn/analytics-client`.
## How to use it
This module is intended to be used **on the server-side only**. It is specially designed to apply the necessary backpressure mechanisms to prevent the server from getting overloaded with too many events and identify if the server sits behind a firewall to discard any incoming events. Refer to `@kbn/analytics-shippers-elastic-v3-browser` for the browser-side implementation.
```typescript
import { ElasticV3ServerShipper } from "@kbn/analytics-shippers-elastic-v3-server";
analytics.registerShipper(ElasticV3ServerShipper, { channelName: 'myChannel', version: '1.0.0' });
```
## Configuration
| Name | Description |
|:-------------:|:-------------------------------------------------------------------------------------------|
| `channelName` | The name of the channel to send the events. |
| `version` | The version of the application generating the events. |
| `debug` | When `true`, it logs the responses from the remote Telemetry Service. Defaults to `false`. |
## Transmission protocol
This shipper sends the events to the Elastic Internal Telemetry Service. It holds up to 1000 events in a shared queue. Any additional incoming events once it's full will be dropped. It sends the events from the queue in batches of 10kB every 10 seconds. If not enough events are available in the queue for longer than 10 minutes, it will send any remaining events. When shutting down, it'll send all the remaining events in the queue.

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
preset: '@kbn/test/jest_node',
rootDir: '../../../../../',
roots: ['<rootDir>/packages/analytics/shippers/elastic_v3/server'],
};

View file

@ -0,0 +1,7 @@
{
"name": "@kbn/analytics-shippers-elastic-v3-server",
"private": true,
"version": "1.0.0",
"main": "./target_node/index.js",
"license": "SSPL-1.0 OR Elastic License 2.0"
}

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common';
export { ElasticV3ServerShipper } from './server_shipper';

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const fetchMock = jest.fn().mockResolvedValue({
status: 200,
ok: true,
text: () => Promise.resolve('{"status": "ok"}'),
});
jest.doMock('node-fetch', () => fetchMock);

View file

@ -0,0 +1,438 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { loggerMock } from '@kbn/logging-mocks';
import { firstValueFrom } from 'rxjs';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import type { AnalyticsClientInitContext, Event } from '@kbn/analytics-client';
import { fetchMock } from './server_shipper.test.mocks';
import { ElasticV3ServerShipper } from './server_shipper';
const SECONDS = 1000;
const MINUTES = 60 * SECONDS;
describe('ElasticV3ServerShipper', () => {
const events: Event[] = [
{
timestamp: '2020-01-01T00:00:00.000Z',
event_type: 'test-event-type',
context: {},
properties: {},
},
];
const nextTick = () => new Promise((resolve) => setImmediate(resolve));
const initContext: AnalyticsClientInitContext = {
sendTo: 'staging',
isDev: true,
logger: loggerMock.create(),
};
let shipper: ElasticV3ServerShipper;
// eslint-disable-next-line dot-notation
const setLastBatchSent = (ms: number) => (shipper['lastBatchSent'] = ms);
beforeEach(() => {
jest.useFakeTimers();
shipper = new ElasticV3ServerShipper(
{ version: '1.2.3', channelName: 'test-channel', debug: true },
initContext
);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = null;
});
afterEach(() => {
shipper.shutdown();
jest.clearAllMocks();
});
test('set optIn should update the isOptedIn$ observable', () => {
// eslint-disable-next-line dot-notation
const getInternalOptIn = () => shipper['isOptedIn'];
// Initially undefined
expect(getInternalOptIn()).toBeUndefined();
shipper.optIn(true);
expect(getInternalOptIn()).toBe(true);
shipper.optIn(false);
expect(getInternalOptIn()).toBe(false);
});
test('clears the queue after optIn: false', () => {
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1);
shipper.optIn(false);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
});
test('set extendContext should store local values: clusterUuid and licenseId', () => {
// eslint-disable-next-line dot-notation
const getInternalClusterUuid = () => shipper['clusterUuid'];
// eslint-disable-next-line dot-notation
const getInternalLicenseId = () => shipper['licenseId'];
// Initial values
expect(getInternalClusterUuid()).toBe('UNKNOWN');
expect(getInternalLicenseId()).toBeUndefined();
shipper.extendContext({ cluster_uuid: 'test-cluster-uuid' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid');
expect(getInternalLicenseId()).toBeUndefined();
shipper.extendContext({ license_id: 'test-license-id' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid');
expect(getInternalLicenseId()).toBe('test-license-id');
shipper.extendContext({ cluster_uuid: 'test-cluster-uuid-2', license_id: 'test-license-id-2' });
expect(getInternalClusterUuid()).toBe('test-cluster-uuid-2');
expect(getInternalLicenseId()).toBe('test-license-id-2');
});
test('calls to reportEvents do not call `fetch` straight away', () => {
shipper.reportEvents(events);
expect(fetchMock).not.toHaveBeenCalled();
});
test(
'calls to reportEvents do not call `fetch` after 10 minutes because no optIn value is set yet',
fakeSchedulers((advance) => {
shipper.reportEvents(events);
advance(10 * MINUTES);
expect(fetchMock).not.toHaveBeenCalled();
})
);
test(
'calls to reportEvents call `fetch` after 10 minutes when optIn value is set to true',
fakeSchedulers(async (advance) => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "succeeded",
}
`);
})
);
test(
'calls to reportEvents do not call `fetch` after 10 minutes when optIn value is set to false',
fakeSchedulers((advance) => {
shipper.reportEvents(events);
shipper.optIn(false);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).not.toHaveBeenCalled();
})
);
test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
shipper.shutdown();
await nextTick(); // We are handling the shutdown in a promise, so we need to wait for the next tick.
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "succeeded",
}
`);
});
test(
'does not add the query.debug: true property to the request if the shipper is not set with the debug flag',
fakeSchedulers((advance) => {
shipper = new ElasticV3ServerShipper(
{ version: '1.2.3', channelName: 'test-channel' },
initContext
);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = null;
shipper.reportEvents(events);
shipper.optIn(true);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
}
);
})
);
test(
'sends when the queue overflows the 10kB leaky bucket one batch every 10s',
fakeSchedulers(async (advance) => {
expect.assertions(2 * 9 + 2);
shipper.reportEvents(new Array(1000).fill(events[0]));
shipper.optIn(true);
// Due to the size of the test events, it matches 9 rounds.
for (let i = 0; i < 9; i++) {
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(10 * SECONDS);
expect(fetchMock).toHaveBeenNthCalledWith(
i + 1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: new Array(103)
.fill(
'{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n'
)
.join(''),
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 103,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "succeeded",
}
`);
await nextTick();
}
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1000 - 9 * 103); // 73
// If we call it again, it should not enqueue all the events (only the ones to fill the queue):
shipper.reportEvents(new Array(1000).fill(events[0]));
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1000);
})
);
test(
'handles when the fetch request fails',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "Failed to fetch",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "failed",
}
`);
})
);
test(
'handles when the fetch request fails (request completes but not OK response)',
fakeSchedulers(async (advance) => {
fetchMock.mockResolvedValueOnce({
ok: false,
status: 400,
text: () => Promise.resolve('{"status": "not ok"}'),
});
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "400",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "failed",
}
`);
})
);
test(
'connectivity check is run after report failure',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-njson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "Failed to fetch",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "failed",
}
`);
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
2,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
fetchMock.mockResolvedValueOnce({ ok: false });
advance(2 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
3,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// let's see the effect of after 24 hours:
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = 100;
fetchMock.mockResolvedValueOnce({ ok: false });
advance(4 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
4,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
// New events are not added to the queue because it's been offline for 24 hours.
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
// Regains connection
fetchMock.mockResolvedValueOnce({ ok: true });
advance(8 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
5,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// eslint-disable-next-line dot-notation
expect(shipper['firstTimeOffline']).toBe(null);
advance(16 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenNthCalledWith(
6,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})
);
});

View file

@ -0,0 +1,290 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import fetch from 'node-fetch';
import {
filter,
Subject,
interval,
concatMap,
merge,
from,
firstValueFrom,
timer,
retryWhen,
tap,
delayWhen,
takeWhile,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
Event,
EventContext,
IShipper,
TelemetryCounter,
} from '@kbn/analytics-client';
import { TelemetryCounterType } from '@kbn/analytics-client';
import type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common';
import {
buildHeaders,
buildUrl,
createTelemetryCounterHelper,
eventsToNDJSON,
ErrorWithCode,
} from '@kbn/analytics-shippers-elastic-v3-common';
const SECOND = 1000;
const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const KIB = 1024;
const MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE = 1000;
export class ElasticV3ServerShipper implements IShipper {
public static shipperName = 'elastic_v3_server';
public readonly telemetryCounter$ = new Subject<TelemetryCounter>();
private readonly reportTelemetryCounters = createTelemetryCounterHelper(
this.telemetryCounter$,
ElasticV3ServerShipper.shipperName
);
private readonly internalQueue: Event[] = [];
private readonly shutdown$ = new Subject<void>();
private readonly url: string;
private lastBatchSent = Date.now();
private clusterUuid: string = 'UNKNOWN';
private licenseId?: string;
private isOptedIn?: boolean;
/**
* Specifies when it went offline:
* - `undefined` means it doesn't know yet whether it is online or offline
* - `null` means it's online
* - `number` means it's offline since that time
* @private
*/
private firstTimeOffline?: number | null;
constructor(
private readonly options: ElasticV3ShipperOptions,
private readonly initContext: AnalyticsClientInitContext
) {
this.url = buildUrl({
sendTo: options.sendTo ?? initContext.sendTo,
channelName: options.channelName,
});
this.setInternalSubscriber();
this.checkConnectivity();
}
public extendContext(newContext: EventContext) {
if (newContext.cluster_uuid) {
this.clusterUuid = newContext.cluster_uuid;
}
if (newContext.license_id) {
this.licenseId = newContext.license_id;
}
}
public optIn(isOptedIn: boolean) {
this.isOptedIn = isOptedIn;
if (isOptedIn === false) {
this.internalQueue.length = 0;
}
}
public reportEvents(events: Event[]) {
if (
this.isOptedIn === false ||
(this.firstTimeOffline && Date.now() - this.firstTimeOffline > 24 * HOUR)
) {
return;
}
const freeSpace = MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE - this.internalQueue.length;
// As per design, we only want store up-to 1000 events at a time. Drop anything that goes beyond that limit
if (freeSpace < events.length) {
const toDrop = events.length - freeSpace;
const droppedEvents = events.splice(-toDrop, toDrop);
this.reportTelemetryCounters(droppedEvents, {
type: TelemetryCounterType.dropped,
code: 'queue_full',
});
}
this.internalQueue.push(...events);
}
public shutdown() {
this.shutdown$.complete();
}
/**
* Checks the server has connectivity to the remote endpoint.
* The frequency of the connectivity tests will back off, starting with 1 minute, and multiplying by 2
* until it reaches 1 hour. Then, itll keep the 1h frequency until it reaches 24h without connectivity.
* At that point, it clears the queue and stops accepting events in the queue.
* The connectivity checks will continue to happen every 1 hour just in case it regains it at any point.
* @private
*/
private checkConnectivity() {
let backoff = 1 * MINUTE;
timer(0, 1 * MINUTE)
.pipe(
takeWhile(() => this.shutdown$.isStopped === false),
filter(() => this.isOptedIn === true && this.firstTimeOffline !== null),
concatMap(async () => {
const { ok } = await fetch(this.url, {
method: 'OPTIONS',
});
if (!ok) {
throw new Error(`Failed to connect to ${this.url}`);
}
this.firstTimeOffline = null;
backoff = 1 * MINUTE;
}),
retryWhen((errors) =>
errors.pipe(
takeWhile(() => this.shutdown$.isStopped === false),
tap(() => {
if (!this.firstTimeOffline) {
this.firstTimeOffline = Date.now();
} else if (Date.now() - this.firstTimeOffline > 24 * HOUR) {
this.internalQueue.length = 0;
}
backoff = backoff * 2;
if (backoff > 1 * HOUR) {
backoff = 1 * HOUR;
}
}),
delayWhen(() => timer(backoff))
)
)
)
.subscribe();
}
private setInternalSubscriber() {
// Check the status of the queues every 1 second.
merge(
interval(1000),
// Using a promise because complete does not emit through the pipe.
from(firstValueFrom(this.shutdown$, { defaultValue: true }))
)
.pipe(
// Only move ahead if it's opted-in and online.
filter(() => this.isOptedIn === true && this.firstTimeOffline === null),
// Send the events now if (validations sorted from cheapest to most CPU expensive):
// - We are shutting down.
// - There are some events in the queue, and we didn't send anything in the last 10 minutes.
// - The last time we sent was more than 10 seconds ago and:
// - We reached the minimum batch size of 10kB per request in our leaky bucket.
// - The queue is full (meaning we'll never get to 10kB because the events are very small).
filter(
() =>
(this.internalQueue.length > 0 &&
(this.shutdown$.isStopped || Date.now() - this.lastBatchSent >= 10 * MINUTE)) ||
(Date.now() - this.lastBatchSent >= 10 * SECOND &&
(this.internalQueue.length === 1000 ||
this.getQueueByteSize(this.internalQueue) >= 10 * KIB))
),
// Send the events
concatMap(async () => {
this.lastBatchSent = Date.now();
const eventsToSend = this.getEventsToSend();
await this.sendEvents(eventsToSend);
}),
// Stop the subscriber if we are shutting down.
takeWhile(() => !this.shutdown$.isStopped)
)
.subscribe();
}
/**
* Calculates the size of the queue in bytes.
* @returns The number of bytes held in the queue.
* @private
*/
private getQueueByteSize(queue: Event[]) {
return queue.reduce((acc, event) => {
return acc + this.getEventSize(event);
}, 0);
}
/**
* Calculates the size of the event in bytes.
* @param event The event to calculate the size of.
* @returns The number of bytes held in the event.
* @private
*/
private getEventSize(event: Event) {
return Buffer.from(JSON.stringify(event)).length;
}
/**
* Returns a queue of events of up-to 10kB.
* @remarks It mutates the internal queue by removing from it the events returned by this method.
* @private
*/
private getEventsToSend(): Event[] {
// If the internal queue is already smaller than the minimum batch size, do a direct assignment.
if (this.getQueueByteSize(this.internalQueue) < 10 * KIB) {
return this.internalQueue.splice(0, this.internalQueue.length);
}
// Otherwise, we'll feed the events to the leaky bucket queue until we reach 10kB.
const queue: Event[] = [];
let queueByteSize = 0;
while (queueByteSize < 10 * KIB) {
const event = this.internalQueue.shift()!;
queueByteSize += this.getEventSize(event);
queue.push(event);
}
return queue;
}
private async sendEvents(events: Event[]) {
try {
const code = await this.makeRequest(events);
this.reportTelemetryCounters(events, { code });
} catch (error) {
this.reportTelemetryCounters(events, { code: error.code, error });
this.firstTimeOffline = undefined;
}
}
private async makeRequest(events: Event[]): Promise<string> {
const { status, text, ok } = await fetch(this.url, {
method: 'POST',
body: eventsToNDJSON(events),
headers: buildHeaders(this.clusterUuid, this.options.version, this.licenseId),
...(this.options.debug && { query: { debug: true } }),
});
if (this.options.debug) {
this.initContext.logger.debug(
`[${ElasticV3ServerShipper.shipperName}]: ${status} - ${await text()}`
);
}
if (!ok) {
throw new ErrorWithCode(`${status} - ${await text()}`, `${status}`);
}
return `${status}`;
}
}

View file

@ -0,0 +1,17 @@
{
"extends": "../../../../../tsconfig.bazel.json",
"compilerOptions": {
"declaration": true,
"emitDeclarationOnly": true,
"outDir": "target_types",
"rootDir": "src",
"stripInternal": false,
"types": [
"jest",
"node"
]
},
"include": [
"src/**/*"
]
}

View file

@ -4,7 +4,7 @@ FullStory implementation as a shipper for the `@kbn/analytics-client`.
## How to use it
This module is intended to be used on the UI only. It does not support server-side events.
This module is intended to be used **on the browser only**. It does not support server-side events.
```typescript
import { FullStoryShipper } from "@kbn/analytics-shippers-fullstory";
@ -25,3 +25,7 @@ analytics.registerShipper(FullStoryShipper, { fullStoryOrgId: '12345' })
## FullStory Custom Events Rate Limits
FullStory limits the number of custom events that can be sent per second ([docs](https://help.fullstory.com/hc/en-us/articles/360020623234#custom-property-rate-limiting)). In order to comply with that limit, this shipper will only emit the event types registered in the allow-list defined in the constant [CUSTOM_EVENT_TYPES_ALLOWLIST](./src/fullstory_shipper.ts). We may change this behaviour in the future to a remotely-controlled list of events or rely on the opt-in _cherry-pick_ config mechanism of the Analytics Client.
## Transmission protocol
This shipper relies on FullStory official snippet. The internals about how it transfers the data are not documented.

View file

@ -128,4 +128,8 @@ export class FullStoryShipper implements IShipper {
this.fullStoryApi.event(event.event_type, formatPayload(event.properties));
});
}
public shutdown() {
// No need to do anything here for now.
}
}

View file

@ -24,6 +24,7 @@ export const BAZEL_PACKAGE_DIRS = [
'packages/shared-ux/*',
'packages/analytics',
'packages/analytics/shippers',
'packages/analytics/shippers/elastic_v3',
];
/**

View file

@ -94,7 +94,7 @@ export function getWebpackConfig(bundle: Bundle, bundleRefs: BundleRefs, worker:
module: {
// no parse rules for a few known large packages which have no require() statements
// or which have require() statements that should be ignored because the file is
// already bundled with all its necessary depedencies
// already bundled with all its necessary dependencies
noParse: [
/[\/\\]node_modules[\/\\]lodash[\/\\]index\.js$/,
/[\/\\]node_modules[\/\\]vega[\/\\]build[\/\\]vega\.js$/,

View file

@ -40,6 +40,7 @@ const createAnalyticsServiceMock = (): jest.Mocked<AnalyticsServiceContract> =>
return {
setup: jest.fn().mockImplementation(createAnalyticsServiceSetup),
start: jest.fn().mockImplementation(createAnalyticsServiceStart),
stop: jest.fn(),
};
};

View file

@ -16,7 +16,7 @@ import { createLogger } from './logger';
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServiceSetup = AnalyticsClient;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the start phase
* {@link AnalyticsClient}
@ -58,4 +58,7 @@ export class AnalyticsService {
telemetryCounter$: this.analyticsClient.telemetryCounter$,
};
}
public stop() {
this.analyticsClient.shutdown();
}
}

View file

@ -300,6 +300,7 @@ export class CoreSystem {
this.application.stop();
this.deprecations.stop();
this.theme.stop();
this.analytics.stop();
this.rootDomElement.textContent = '';
}
}

View file

@ -69,7 +69,7 @@ export { AnalyticsClient }
// Warning: (ae-unresolved-link) The @link reference could not be resolved: This type of declaration is not supported yet by the resolver
//
// @public
export type AnalyticsServiceSetup = AnalyticsClient;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
// Warning: (ae-unresolved-link) The @link reference could not be resolved: This type of declaration is not supported yet by the resolver
//

View file

@ -54,6 +54,7 @@ const createAnalyticsServiceMock = (): jest.Mocked<AnalyticsServiceContract> =>
preboot: jest.fn().mockImplementation(createAnalyticsServicePreboot),
setup: jest.fn().mockImplementation(createAnalyticsServiceSetup),
start: jest.fn().mockImplementation(createAnalyticsServiceStart),
stop: jest.fn(),
};
};

View file

@ -15,13 +15,13 @@ import type { CoreContext } from '../core_context';
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServicePreboot = AnalyticsClient;
export type AnalyticsServicePreboot = Omit<AnalyticsClient, 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the setup phase.
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServiceSetup = AnalyticsClient;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the start phase
* {@link AnalyticsClient}
@ -74,4 +74,7 @@ export class AnalyticsService {
telemetryCounter$: this.analyticsClient.telemetryCounter$,
};
}
public stop() {
this.analyticsClient.shutdown();
}
}

View file

@ -77,12 +77,12 @@ export { AnalyticsClient }
// Warning: (ae-unresolved-link) The @link reference could not be resolved: This type of declaration is not supported yet by the resolver
//
// @public
export type AnalyticsServicePreboot = AnalyticsClient;
export type AnalyticsServicePreboot = Omit<AnalyticsClient, 'shutdown'>;
// Warning: (ae-unresolved-link) The @link reference could not be resolved: This type of declaration is not supported yet by the resolver
//
// @public
export type AnalyticsServiceSetup = AnalyticsClient;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
// Warning: (ae-unresolved-link) The @link reference could not be resolved: This type of declaration is not supported yet by the resolver
//

View file

@ -358,6 +358,7 @@ export class Server {
public async stop() {
this.log.debug('stopping server');
this.analytics.stop();
await this.http.stop(); // HTTP server has to stop before savedObjects and ES clients are closed to be able to gracefully attempt to resolve any pending requests
await this.plugins.stop();
await this.savedObjects.stop();

View file

@ -12,6 +12,12 @@
*/
export const REPORT_INTERVAL_MS = 86400000;
/**
* How often we poll for the opt-in status.
* Currently, 10 seconds.
*/
export const OPT_IN_POLL_INTERVAL_MS = 10000;
/**
* Key for the localStorage service
*/

View file

@ -6,17 +6,18 @@
* Side Public License, v 1.
*/
import { TelemetryPlugin } from './plugin';
import { ElasticV3BrowserShipper } from '@kbn/analytics-shippers-elastic-v3-browser';
import { coreMock } from '@kbn/core/public/mocks';
import { homePluginMock } from '@kbn/home-plugin/public/mocks';
import { screenshotModePluginMock } from '@kbn/screenshot-mode-plugin/public/mocks';
import { HomePublicPluginSetup } from '@kbn/home-plugin/public';
import { ScreenshotModePluginSetup } from '@kbn/screenshot-mode-plugin/public';
let screenshotMode: ScreenshotModePluginSetup;
let home: HomePublicPluginSetup;
import { TelemetryPlugin } from './plugin';
describe('TelemetryPlugin', () => {
let screenshotMode: ScreenshotModePluginSetup;
let home: HomePublicPluginSetup;
beforeEach(() => {
screenshotMode = screenshotModePluginMock.createSetupContract();
home = homePluginMock.createSetupContract();
@ -56,5 +57,18 @@ describe('TelemetryPlugin', () => {
});
});
});
describe('EBT shipper registration', () => {
it('registers the UI telemetry shipper', () => {
const initializerContext = coreMock.createPluginInitializerContext();
const coreSetupMock = coreMock.createSetup();
new TelemetryPlugin(initializerContext).setup(coreSetupMock, { screenshotMode, home });
expect(coreSetupMock.analytics.registerShipper).toHaveBeenCalledWith(
ElasticV3BrowserShipper,
{ channelName: 'kibana-browser', version: 'version' }
);
});
});
});
});

View file

@ -6,6 +6,8 @@
* Side Public License, v 1.
*/
import { ElasticV3BrowserShipper } from '@kbn/analytics-shippers-elastic-v3-browser';
import type {
Plugin,
CoreStart,
@ -20,7 +22,7 @@ import type {
import type { ScreenshotModePluginSetup } from '@kbn/screenshot-mode-plugin/public';
import { HomePublicPluginSetup } from '@kbn/home-plugin/public';
import type { HomePublicPluginSetup } from '@kbn/home-plugin/public';
import { TelemetrySender, TelemetryService, TelemetryNotifications } from './services';
import type {
TelemetrySavedObjectAttributes,
@ -153,6 +155,11 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
telemetryConstants = getTelemetryConstants(docLinks);
});
analytics.registerShipper(ElasticV3BrowserShipper, {
channelName: 'kibana-browser',
version: currentKibanaVersion,
});
this.telemetrySender = new TelemetrySender(this.telemetryService, async () => {
await this.refreshConfig();
analytics.optIn({ global: { enabled: this.telemetryService!.isOptedIn } });

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ElasticV3ServerShipper } from '@kbn/analytics-shippers-elastic-v3-server';
import { coreMock } from '@kbn/core/server/mocks';
import { usageCollectionPluginMock } from '@kbn/usage-collection-plugin/server/mocks';
import { telemetryCollectionManagerPluginMock } from '@kbn/telemetry-collection-manager-plugin/server/mocks';
import { TelemetryPlugin } from './plugin';
describe('TelemetryPlugin', () => {
describe('setup', () => {
describe('EBT shipper registration', () => {
it('registers the Server telemetry shipper', () => {
const initializerContext = coreMock.createPluginInitializerContext();
const coreSetupMock = coreMock.createSetup();
new TelemetryPlugin(initializerContext).setup(coreSetupMock, {
usageCollection: usageCollectionPluginMock.createSetupContract(),
telemetryCollectionManager: telemetryCollectionManagerPluginMock.createSetupContract(),
});
expect(coreSetupMock.analytics.registerShipper).toHaveBeenCalledWith(
ElasticV3ServerShipper,
{ channelName: 'kibana-server', version: 'version' }
);
});
});
});
});

View file

@ -8,7 +8,18 @@
import { URL } from 'url';
import type { Observable } from 'rxjs';
import { firstValueFrom, ReplaySubject } from 'rxjs';
import {
BehaviorSubject,
firstValueFrom,
ReplaySubject,
exhaustMap,
timer,
distinctUntilChanged,
filter,
} from 'rxjs';
import { ElasticV3ServerShipper } from '@kbn/analytics-shippers-elastic-v3-server';
import type { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
import type {
TelemetryCollectionManagerPluginSetup,
@ -33,6 +44,7 @@ import {
import type { TelemetryConfigType } from './config';
import { FetcherTask } from './fetcher';
import { getTelemetrySavedObject, TelemetrySavedObject } from './telemetry_repository';
import { OPT_IN_POLL_INTERVAL_MS } from '../common/constants';
import { getTelemetryOptIn, getTelemetryChannelEndpoint } from '../common/telemetry_config';
interface TelemetryPluginsDepsSetup {
@ -74,6 +86,7 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
private readonly logger: Logger;
private readonly currentKibanaVersion: string;
private readonly config$: Observable<TelemetryConfigType>;
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private readonly isDev: boolean;
private readonly fetcherTask: FetcherTask;
/**
@ -91,6 +104,17 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
*/
private savedObjectsInternalClient$ = new ReplaySubject<SavedObjectsClient>(1);
/**
* Poll for the opt-in status and update the `isOptedIn$` subject.
* @private
*/
private readonly optInPollerSubscription = timer(0, OPT_IN_POLL_INTERVAL_MS)
.pipe(
exhaustMap(() => this.getOptInStatus()),
distinctUntilChanged()
)
.subscribe((isOptedIn) => this.isOptedIn$.next(isOptedIn));
private security?: SecurityPluginStart;
constructor(initializerContext: PluginInitializerContext<TelemetryConfigType>) {
@ -102,13 +126,29 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
...initializerContext,
logger: this.logger,
});
// If the opt-in selection cannot be changed, set it as early as possible.
const { optIn, allowChangingOptInStatus } = initializerContext.config.get();
if (allowChangingOptInStatus === false) {
this.isOptedIn$.next(optIn);
}
}
public setup(
{ http, savedObjects }: CoreSetup,
{ analytics, http, savedObjects }: CoreSetup,
{ usageCollection, telemetryCollectionManager }: TelemetryPluginsDepsSetup
): TelemetryPluginSetup {
if (this.isOptedIn$.value !== undefined) {
analytics.optIn({ global: { enabled: this.isOptedIn$.value } });
}
const currentKibanaVersion = this.currentKibanaVersion;
analytics.registerShipper(ElasticV3ServerShipper, {
channelName: 'kibana-server',
version: currentKibanaVersion,
});
const config$ = this.config$;
const isDev = this.isDev;
registerCollection(telemetryCollectionManager);
@ -145,7 +185,12 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
core: CoreStart,
{ telemetryCollectionManager, security }: TelemetryPluginsDepsStart
) {
const { savedObjects } = core;
const { analytics, savedObjects } = core;
this.isOptedIn$
.pipe(filter((isOptedIn): isOptedIn is boolean => typeof isOptedIn === 'boolean'))
.subscribe((isOptedIn) => analytics.optIn({ global: { enabled: isOptedIn } }));
const savedObjectsInternalRepository = savedObjects.createInternalRepository();
this.savedObjectsInternalRepository = savedObjectsInternalRepository;
this.savedObjectsInternalClient$.next(new SavedObjectsClient(savedObjectsInternalRepository));
@ -155,31 +200,46 @@ export class TelemetryPlugin implements Plugin<TelemetryPluginSetup, TelemetryPl
this.startFetcher(core, telemetryCollectionManager);
return {
getIsOptedIn: async () => {
const internalRepositoryClient = await firstValueFrom(this.savedObjectsInternalClient$);
let telemetrySavedObject: TelemetrySavedObject = false; // if an error occurs while fetching opt-in status, a `false` result indicates that Kibana cannot opt-in
try {
telemetrySavedObject = await getTelemetrySavedObject(internalRepositoryClient);
} catch (err) {
this.logger.debug('Failed to check telemetry opt-in status: ' + err.message);
}
const config = await firstValueFrom(this.config$);
const allowChangingOptInStatus = config.allowChangingOptInStatus;
const configTelemetryOptIn = typeof config.optIn === 'undefined' ? null : config.optIn;
const currentKibanaVersion = this.currentKibanaVersion;
const isOptedIn = getTelemetryOptIn({
currentKibanaVersion,
telemetrySavedObject,
allowChangingOptInStatus,
configTelemetryOptIn,
});
return isOptedIn === true;
},
getIsOptedIn: async () => this.isOptedIn$.value === true,
};
}
public stop() {
this.optInPollerSubscription.unsubscribe();
this.isOptedIn$.complete();
}
private async getOptInStatus(): Promise<boolean | undefined> {
const internalRepositoryClient = await firstValueFrom(this.savedObjectsInternalClient$);
let telemetrySavedObject: TelemetrySavedObject | undefined;
try {
telemetrySavedObject = await getTelemetrySavedObject(internalRepositoryClient);
} catch (err) {
this.logger.debug('Failed to check telemetry opt-in status: ' + err.message);
}
// If we can't get the saved object due to permissions or other error other than 404, skip this round.
if (typeof telemetrySavedObject === 'undefined' || telemetrySavedObject === false) {
return;
}
const config = await firstValueFrom(this.config$);
const allowChangingOptInStatus = config.allowChangingOptInStatus;
const configTelemetryOptIn = typeof config.optIn === 'undefined' ? null : config.optIn;
const currentKibanaVersion = this.currentKibanaVersion;
const isOptedIn = getTelemetryOptIn({
currentKibanaVersion,
telemetrySavedObject,
allowChangingOptInStatus,
configTelemetryOptIn,
});
if (typeof isOptedIn === 'boolean') {
return isOptedIn;
}
}
private startFetcher(
core: CoreStart,
telemetryCollectionManager: TelemetryCollectionManagerPluginStart

View file

@ -20,4 +20,5 @@ export class CustomShipper implements IShipper {
});
}
optIn(isOptedIn: boolean) {}
shutdown() {}
}

View file

@ -20,4 +20,5 @@ export class CustomShipper implements IShipper {
});
}
optIn(isOptedIn: boolean) {}
shutdown() {}
}

View file

@ -40,4 +40,5 @@ export class CustomShipper implements IShipper {
extendContext(newContext: EventContext) {
this.actions$.next({ action: 'extendContext', meta: newContext });
}
shutdown() {}
}

View file

@ -40,4 +40,5 @@ export class CustomShipper implements IShipper {
extendContext(newContext: EventContext) {
this.actions$.next({ action: 'extendContext', meta: newContext });
}
shutdown() {}
}

View file

@ -2932,6 +2932,18 @@
version "0.0.0"
uid ""
"@kbn/analytics-shippers-elastic-v3-common@link:bazel-bin/packages/analytics/shippers/elastic_v3/common":
version "0.0.0"
uid ""
"@kbn/analytics-shippers-elastic-v3-server@link:bazel-bin/packages/analytics/shippers/elastic_v3/server":
version "0.0.0"
uid ""
"@kbn/analytics-shippers-elastic-v3-browser@link:bazel-bin/packages/analytics/shippers/elastic_v3/browser":
version "0.0.0"
uid ""
"@kbn/analytics-shippers-fullstory@link:bazel-bin/packages/analytics/shippers/fullstory":
version "0.0.0"
uid ""
@ -6032,6 +6044,18 @@
version "0.0.0"
uid ""
"@types/kbn__analytics-shippers-elastic-v3-common@link:bazel-bin/packages/analytics/shippers/elastic_v3/common/npm_module_types":
version "0.0.0"
uid ""
"@types/kbn__analytics-shippers-elastic-v3-server@link:bazel-bin/packages/analytics/shippers/elastic_v3/server/npm_module_types":
version "0.0.0"
uid ""
"@types/kbn__analytics-shippers-elastic-v3-browser@link:bazel-bin/packages/analytics/shippers/elastic_v3/browser/npm_module_types":
version "0.0.0"
uid ""
"@types/kbn__analytics-shippers-fullstory@link:bazel-bin/packages/analytics/shippers/fullstory/npm_module_types":
version "0.0.0"
uid ""