[EBT] Add flush method and call it during stop (#144925)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Resolves https://github.com/elastic/kibana/issues/140521
This commit is contained in:
Alejandro Fernández Haro 2022-11-16 14:48:11 +01:00 committed by GitHub
parent f90072df0c
commit a5f5d8682e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 278 additions and 29 deletions

View file

@ -133,6 +133,15 @@ analytics.optIn({
})
```
### Explicit flush of the events
If, at any given point (usually testing or during shutdowns) we need to make sure that all the pending events
in the queue are sent. The `flush` API returns a promise that will resolve as soon as all events in the queue are sent.
```typescript
await analytics.flush()
```
### Shipping events
In order to report the event to an analytics tool, we need to register the shippers our application wants to use. To register a shipper use the API `registerShipper`:

View file

@ -30,8 +30,8 @@ describe('AnalyticsClient', () => {
});
});
afterEach(() => {
analyticsClient.shutdown();
afterEach(async () => {
await analyticsClient.shutdown();
jest.useRealTimers();
});
@ -381,7 +381,7 @@ describe('AnalyticsClient', () => {
test(
'Handles errors in the shipper',
fakeSchedulers((advance) => {
fakeSchedulers(async (advance) => {
const optInMock = jest.fn().mockImplementation(() => {
throw new Error('Something went terribly wrong');
});
@ -404,7 +404,7 @@ describe('AnalyticsClient', () => {
`Shipper "${MockedShipper.shipperName}" failed to extend the context`,
expect.any(Error)
);
expect(() => analyticsClient.shutdown()).not.toThrow();
await expect(analyticsClient.shutdown()).resolves.toBeUndefined();
expect(shutdownMock).toHaveBeenCalled();
})
);

View file

@ -238,7 +238,20 @@ export class AnalyticsClient implements IAnalyticsClient {
this.shipperRegistered$.next();
};
public shutdown = () => {
public flush = async () => {
await Promise.all(
[...this.shippersRegistry.allShippers.entries()].map(async ([shipperName, shipper]) => {
try {
await shipper.flush();
} catch (err) {
this.initContext.logger.warn(`Failed to flush shipper "${shipperName}"`, err);
}
})
);
};
public shutdown = async () => {
await this.flush();
this.shippersRegistry.allShippers.forEach((shipper, shipperName) => {
try {
shipper.shutdown();

View file

@ -18,6 +18,7 @@ function createMockedAnalyticsClient(): jest.Mocked<IAnalyticsClient> {
removeContextProvider: jest.fn(),
registerShipper: jest.fn(),
telemetryCounter$: new Subject(),
flush: jest.fn(),
shutdown: jest.fn(),
};
}

View file

@ -216,7 +216,11 @@ export interface IAnalyticsClient {
*/
readonly telemetryCounter$: Observable<TelemetryCounter>;
/**
* Stops the client.
* Forces all shippers to send all their enqueued events and fulfills the returned promise.
*/
shutdown: () => void;
flush: () => Promise<void>;
/**
* Stops the client. Flushing any pending events in the process.
*/
shutdown: () => Promise<void>;
}

View file

@ -23,6 +23,7 @@ class MockedShipper implements IShipper {
public reportEvents = jest.fn();
public extendContext = jest.fn();
public telemetryCounter$ = new Subject<TelemetryCounter>();
public flush = jest.fn();
public shutdown = jest.fn();
}

View file

@ -32,6 +32,10 @@ export interface IShipper {
* Observable to emit the stats of the processed events.
*/
telemetryCounter$?: Observable<TelemetryCounter>;
/**
* Sends all the enqueued events and fulfills the returned promise.
*/
flush: () => Promise<void>;
/**
* Shutdown the shipper.
*/

View file

@ -161,6 +161,58 @@ describe('ElasticV3BrowserShipper', () => {
})
);
test(
'calls to flush forces the client to send all the pending events',
fakeSchedulers(async (advance) => {
shipper.optIn(true);
shipper.reportEvents(events);
const counter = firstValueFrom(shipper.telemetryCounter$);
const promise = shipper.flush();
advance(0); // bufferWhen requires some sort of fake scheduling to advance (but we are not advancing 1s)
await promise;
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "succeeded",
}
`);
})
);
test('calls to flush resolve immediately if there is nothing to send', async () => {
shipper.optIn(true);
await shipper.flush();
expect(fetchMock).toHaveBeenCalledTimes(0);
});
test('calling flush multiple times does not keep hanging', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
await expect(shipper.flush()).resolves.toBe(undefined);
await Promise.all([shipper.flush(), shipper.flush()]);
});
test('calling flush after shutdown does not keep hanging', async () => {
shipper.shutdown();
await expect(shipper.flush()).resolves.toBe(undefined);
});
test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => {
shipper.reportEvents(events);
shipper.optIn(true);

View file

@ -6,7 +6,17 @@
* Side Public License, v 1.
*/
import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs';
import {
BehaviorSubject,
interval,
Subject,
bufferWhen,
concatMap,
skipWhile,
firstValueFrom,
map,
merge,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
Event,
@ -39,6 +49,8 @@ export class ElasticV3BrowserShipper implements IShipper {
private readonly url: string;
private readonly internalQueue$ = new Subject<Event>();
private readonly flush$ = new Subject<void>();
private readonly queueFlushed$ = new Subject<void>();
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private clusterUuid: string = 'UNKNOWN';
@ -92,25 +104,48 @@ export class ElasticV3BrowserShipper implements IShipper {
});
}
/**
* Triggers a flush of the internal queue to attempt to send any events held in the queue
* and resolves the returned promise once the queue is emptied.
*/
public async flush() {
if (this.flush$.isStopped) {
// If called after shutdown, return straight away
return;
}
const promise = firstValueFrom(this.queueFlushed$);
this.flush$.next();
await promise;
}
/**
* Shuts down the shipper.
* Triggers a flush of the internal queue to attempt to send any events held in the queue.
*/
public shutdown() {
this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events.
this.flush$.complete();
}
private setUpInternalQueueSubscriber() {
this.internalQueue$
.pipe(
// Buffer events for 1 second or until we have an optIn value
bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))),
// Discard any events if we are not opted in
skipWhile(() => this.isOptedIn$.value === false),
// Skip empty buffers
filter((events) => events.length > 0),
// Send events
concatMap(async (events) => this.sendEvents(events))
bufferWhen(() =>
merge(
this.flush$,
interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))
)
),
// Send events (one batch at a time)
concatMap(async (events) => {
// Only send if opted-in and there's anything to send
if (this.isOptedIn$.value === true && events.length > 0) {
await this.sendEvents(events);
}
}),
map(() => this.queueFlushed$.next())
)
.subscribe();
}

View file

@ -580,4 +580,45 @@ describe('ElasticV3ServerShipper', () => {
);
});
});
describe('flush method', () => {
test('resolves straight away if it should not send anything', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
});
test('resolves when all the ongoing requests are complete', async () => {
shipper.optIn(true);
shipper.reportEvents(events);
expect(fetchMock).toHaveBeenCalledTimes(0);
fetchMock.mockImplementation(async () => {
// eslint-disable-next-line dot-notation
expect(shipper['inFlightRequests$'].value).toBe(1);
});
await expect(shipper.flush()).resolves.toBe(undefined);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
});
test('calling flush multiple times does not keep hanging', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
await expect(shipper.flush()).resolves.toBe(undefined);
await Promise.all([shipper.flush(), shipper.flush()]);
});
test('calling flush after shutdown does not keep hanging', async () => {
shipper.shutdown();
await expect(shipper.flush()).resolves.toBe(undefined);
});
});
});

View file

@ -22,6 +22,8 @@ import {
BehaviorSubject,
exhaustMap,
mergeMap,
skip,
firstValueFrom,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
@ -63,6 +65,8 @@ export class ElasticV3ServerShipper implements IShipper {
private readonly internalQueue: Event[] = [];
private readonly shutdown$ = new ReplaySubject<void>(1);
private readonly flush$ = new Subject<void>();
private readonly inFlightRequests$ = new BehaviorSubject<number>(0);
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private readonly url: string;
@ -152,12 +156,33 @@ export class ElasticV3ServerShipper implements IShipper {
this.internalQueue.push(...events);
}
/**
* Triggers a flush of the internal queue to attempt to send any events held in the queue
* and resolves the returned promise once the queue is emptied.
*/
public async flush() {
if (this.flush$.isStopped) {
// If called after shutdown, return straight away
return;
}
const promise = firstValueFrom(
this.inFlightRequests$.pipe(
skip(1), // Skipping the first value because BehaviourSubjects always emit the current value on subscribe.
filter((count) => count === 0) // Wait until all the inflight requests are completed.
)
);
this.flush$.next();
await promise;
}
/**
* Shuts down the shipper.
* Triggers a flush of the internal queue to attempt to send any events held in the queue.
*/
public shutdown() {
this.shutdown$.next();
this.flush$.complete();
this.shutdown$.complete();
this.isOptedIn$.complete();
}
@ -226,17 +251,26 @@ export class ElasticV3ServerShipper implements IShipper {
takeUntil(this.shutdown$),
map(() => ({ shouldFlush: false }))
),
// Whenever a `flush` request comes in
this.flush$.pipe(map(() => ({ shouldFlush: true }))),
// Attempt to send one last time on shutdown, flushing the queue
this.shutdown$.pipe(map(() => ({ shouldFlush: true })))
)
.pipe(
// Only move ahead if it's opted-in and online, and there are some events in the queue
filter(
() =>
filter(() => {
const shouldSendAnything =
this.isOptedIn$.value === true &&
this.firstTimeOffline === null &&
this.internalQueue.length > 0
),
this.internalQueue.length > 0;
// If it should not send anything, re-emit the inflight request observable just in case it's already 0
if (!shouldSendAnything) {
this.inFlightRequests$.next(this.inFlightRequests$.value);
}
return shouldSendAnything;
}),
// Send the events:
// 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions.
@ -298,6 +332,7 @@ export class ElasticV3ServerShipper implements IShipper {
private async sendEvents(events: Event[]) {
this.initContext.logger.debug(`Reporting ${events.length} events...`);
this.inFlightRequests$.next(this.inFlightRequests$.value + 1);
try {
const code = await this.makeRequest(events);
this.reportTelemetryCounters(events, { code });
@ -308,6 +343,7 @@ export class ElasticV3ServerShipper implements IShipper {
this.reportTelemetryCounters(events, { code: error.code, error });
this.firstTimeOffline = undefined;
}
this.inFlightRequests$.next(Math.max(0, this.inFlightRequests$.value - 1));
}
private async makeRequest(events: Event[]): Promise<string> {

View file

@ -135,6 +135,12 @@ export class FullStoryShipper implements IShipper {
});
}
/**
* Flushes all internal queues of the shipper.
* It doesn't really do anything inside because this shipper doesn't hold any internal queues.
*/
public async flush() {}
/**
* Shuts down the shipper.
* It doesn't really do anything inside because this shipper doesn't hold any internal queues.

View file

@ -93,6 +93,12 @@ export class GainsightShipper implements IShipper {
});
}
/**
* Flushes all internal queues of the shipper.
* It doesn't really do anything inside because this shipper doesn't hold any internal queues.
*/
public async flush() {}
/**
* Shuts down the shipper.
* It doesn't really do anything inside because this shipper doesn't hold any internal queues.

View file

@ -17,6 +17,7 @@ export const analyticsClientMock: jest.Mocked<AnalyticsClient> = {
removeContextProvider: jest.fn(),
registerShipper: jest.fn(),
telemetryCounter$: new Subject(),
flush: jest.fn(),
shutdown: jest.fn(),
};

View file

@ -45,6 +45,9 @@ export class AnalyticsService {
this.registerBrowserInfoAnalyticsContext();
this.subscriptionsHandler.add(trackClicks(this.analyticsClient, core.env.mode.dev));
this.subscriptionsHandler.add(trackViewportSize(this.analyticsClient));
// Register a flush method in the browser so CI can explicitly call it before closing the browser.
window.__kbnAnalytics = { flush: () => this.analyticsClient.flush() };
}
public setup({ injectedMetadata }: AnalyticsServiceSetupDeps): AnalyticsServiceSetup {
@ -69,9 +72,9 @@ export class AnalyticsService {
};
}
public stop() {
public async stop() {
this.subscriptionsHandler.unsubscribe();
this.analyticsClient.shutdown();
await this.analyticsClient.shutdown();
}
/**

View file

@ -6,4 +6,8 @@
* Side Public License, v 1.
*/
export type { AnalyticsServiceSetup, AnalyticsServiceStart } from './src/types';
export type {
AnalyticsServiceSetup,
AnalyticsServiceStart,
KbnAnalyticsWindowApi,
} from './src/types';

View file

@ -13,7 +13,7 @@ import type { AnalyticsClient } from '@kbn/analytics-client';
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'flush' | 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the start phase
@ -24,3 +24,19 @@ export type AnalyticsServiceStart = Pick<
AnalyticsClient,
'optIn' | 'reportEvent' | 'telemetryCounter$'
>;
/**
* API exposed through `window.__kbnAnalytics`
*/
export interface KbnAnalyticsWindowApi {
/**
* Returns a promise that resolves when all the events in the queue have been sent.
*/
flush: AnalyticsClient['flush'];
}
declare global {
interface Window {
__kbnAnalytics: KbnAnalyticsWindowApi;
}
}

View file

@ -65,8 +65,8 @@ export class AnalyticsService {
};
}
public stop() {
this.analyticsClient.shutdown();
public async stop() {
await this.analyticsClient.shutdown();
}
/**

View file

@ -13,14 +13,14 @@ import type { AnalyticsClient } from '@kbn/analytics-client';
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServicePreboot = Omit<AnalyticsClient, 'shutdown'>;
export type AnalyticsServicePreboot = Omit<AnalyticsClient, 'flush' | 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the setup phase.
* {@link AnalyticsClient}
* @public
*/
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'shutdown'>;
export type AnalyticsServiceSetup = Omit<AnalyticsClient, 'flush' | 'shutdown'>;
/**
* Exposes the public APIs of the AnalyticsClient during the start phase

View file

@ -430,7 +430,7 @@ export class Server {
public async stop() {
this.log.debug('stopping server');
this.analytics.stop();
await this.analytics.stop();
await this.http.stop(); // HTTP server has to stop before savedObjects and ES clients are closed to be able to gracefully attempt to resolve any pending requests
await this.plugins.stop();
await this.savedObjects.stop();

View file

@ -27,5 +27,6 @@ export class CustomShipper implements IShipper {
});
}
optIn(isOptedIn: boolean) {}
async flush() {}
shutdown() {}
}

View file

@ -27,5 +27,6 @@ export class CustomShipper implements IShipper {
});
}
optIn(isOptedIn: boolean) {}
async flush() {}
shutdown() {}
}

View file

@ -39,5 +39,8 @@ export class CustomShipper implements IShipper {
extendContext(newContext: EventContext) {
this.actions$.next({ action: 'extendContext', meta: newContext });
}
async flush() {
this.actions$.next({ action: 'flush', meta: {} });
}
shutdown() {}
}

View file

@ -74,6 +74,8 @@ export class AnalyticsPluginA implements Plugin {
setOptIn(optIn: boolean) {
analytics.optIn({ global: { enabled: optIn } });
},
getFlushAction: async () =>
firstValueFrom(this.actions$.pipe(filter(({ action }) => action === 'flush'))),
};
registerContextProvider({

View file

@ -13,6 +13,7 @@ declare global {
interface Window {
__analyticsPluginA__: {
getActionsUntilReportTestPluginLifecycleEvent: () => Promise<Action[]>;
getFlushAction: () => Promise<Action>;
stats: TelemetryCounter[];
setOptIn: (optIn: boolean) => void;
};

View file

@ -39,5 +39,8 @@ export class CustomShipper implements IShipper {
extendContext(newContext: EventContext) {
this.actions$.next({ action: 'extendContext', meta: newContext });
}
async flush() {
this.actions$.next({ action: 'flush', meta: {} });
}
shutdown() {}
}

View file

@ -9,8 +9,8 @@
import expect from '@kbn/expect';
import type { Event, TelemetryCounter } from '@kbn/core/server';
import type { Action } from '@kbn/analytics-plugin-a-plugin/public/custom_shipper';
import type { FtrProviderContext } from '../services';
import '@kbn/analytics-plugin-a-plugin/public/types';
import type { FtrProviderContext } from '../services';
export default function ({ getService, getPageObjects }: FtrProviderContext) {
const { common } = getPageObjects(['common']);
@ -170,6 +170,12 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
expect(event.context.user_agent).to.be.a('string');
});
it('should call flush when using the window-exposed flush method', async () => {
await browser.execute(() => window.__kbnAnalytics.flush());
const action = await browser.execute(() => window.__analyticsPluginA__.getFlushAction());
expect(action).to.eql({ action: 'flush', meta: {} });
});
describe('Test helpers capabilities', () => {
it('should return the count of the events', async () => {
const eventCount = await ebtUIHelper.getEventCount({