[EBT/ElasticV3Server] Simplify leaky bucket (#143323)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2022-10-17 18:17:57 +02:00 committed by GitHub
parent 2d48eef8aa
commit 6e5f13740c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 47 deletions

View file

@ -22,4 +22,4 @@ analytics.registerShipper(ElasticV3ServerShipper, { channelName: 'myChannel', ve
## Transmission protocol
This shipper sends the events to the Elastic Internal Telemetry Service. It holds up to 1000 events in a shared queue. Any additional incoming events once it's full will be dropped. It sends the events from the queue in batches of 10kB every 10 seconds. If not enough events are available in the queue for longer than 10 minutes, it will send any remaining events. When shutting down, it'll send all the remaining events in the queue.
This shipper sends the events to the Elastic Internal Telemetry Service. It holds up to 1000 events in a shared queue; once the queue is full, any additional incoming events are dropped. It sends the events from the queue in batches of up to 10kB every 10 seconds. When shutting down, it sends all the remaining events in the queue.

View file

@ -117,13 +117,13 @@ describe('ElasticV3ServerShipper', () => {
);
test(
'calls to reportEvents call `fetch` after 10 minutes when optIn value is set to true',
'calls to reportEvents call `fetch` after 10 seconds when optIn value is set to true',
fakeSchedulers(async (advance) => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
@ -150,12 +150,12 @@ describe('ElasticV3ServerShipper', () => {
);
test(
'calls to reportEvents do not call `fetch` after 10 minutes when optIn value is set to false',
'calls to reportEvents do not call `fetch` after 10 seconds when optIn value is set to false',
fakeSchedulers((advance) => {
shipper.reportEvents(events);
shipper.optIn(false);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).not.toHaveBeenCalled();
})
);
@ -201,8 +201,8 @@ describe('ElasticV3ServerShipper', () => {
shipper['firstTimeOffline'] = null;
shipper.reportEvents(events);
shipper.optIn(true);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
@ -226,11 +226,11 @@ describe('ElasticV3ServerShipper', () => {
shipper.reportEvents(new Array(1000).fill(events[0]));
shipper.optIn(true);
// Due to the size of the test events, it matches 9 rounds.
// Due to the size of the test events, it matches 8 rounds.
for (let i = 0; i < 9; i++) {
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenNthCalledWith(
i + 1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
@ -277,8 +277,8 @@ describe('ElasticV3ServerShipper', () => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
@ -315,8 +315,8 @@ describe('ElasticV3ServerShipper', () => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
@ -458,8 +458,8 @@ describe('ElasticV3ServerShipper', () => {
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
setLastBatchSent(Date.now() - 10 * SECONDS);
advance(1 * SECONDS); // Moving 1 second should be enough to trigger the logic
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',

View file

@ -12,10 +12,7 @@ import {
Subject,
ReplaySubject,
interval,
concatMap,
merge,
from,
firstValueFrom,
timer,
retryWhen,
tap,
@ -24,6 +21,7 @@ import {
map,
BehaviorSubject,
exhaustMap,
mergeMap,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
@ -46,6 +44,7 @@ const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const KIB = 1024;
const MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE = 1000;
const MIN_TIME_SINCE_LAST_SEND = 10 * SECOND;
/**
* Elastic V3 shipper to use on the server side.
@ -130,6 +129,7 @@ export class ElasticV3ServerShipper implements IShipper {
* @param events batched events {@link Event}
*/
public reportEvents(events: Event[]) {
// If opted out OR offline for longer than 24 hours, skip processing any events.
if (
this.isOptedIn$.value === false ||
(this.firstTimeOffline && Date.now() - this.firstTimeOffline > 24 * HOUR)
@ -216,42 +216,40 @@ export class ElasticV3ServerShipper implements IShipper {
}
private setInternalSubscriber() {
// Check the status of the queues every 1 second.
// Create an emitter that emits once MIN_TIME_SINCE_LAST_SEND has passed since the last time we sent the data
const minimumTimeSinceLastSent$ = interval(SECOND).pipe(
filter(() => Date.now() - this.lastBatchSent >= MIN_TIME_SINCE_LAST_SEND)
);
merge(
interval(1000).pipe(takeUntil(this.shutdown$)),
// Using a promise because complete does not emit through the pipe.
from(firstValueFrom(this.shutdown$, { defaultValue: true }))
minimumTimeSinceLastSent$.pipe(
takeUntil(this.shutdown$),
map(() => ({ shouldFlush: false }))
),
// Attempt to send one last time on shutdown, flushing the queue
this.shutdown$.pipe(map(() => ({ shouldFlush: true })))
)
.pipe(
// Only move ahead if it's opted-in and online.
filter(() => this.isOptedIn$.value === true && this.firstTimeOffline === null),
// Send the events now if (validations sorted from cheapest to most CPU expensive):
// - We are shutting down.
// - There are some events in the queue, and we didn't send anything in the last 10 minutes.
// - The last time we sent was more than 10 seconds ago and:
// - We reached the minimum batch size of 10kB per request in our leaky bucket.
// - The queue is full (meaning we'll never get to 10kB because the events are very small).
// Only move ahead if it's opted-in and online, and there are some events in the queue
filter(
() =>
(this.internalQueue.length > 0 &&
(this.shutdown$.isStopped || Date.now() - this.lastBatchSent >= 10 * MINUTE)) ||
(Date.now() - this.lastBatchSent >= 10 * SECOND &&
(this.internalQueue.length === MAX_NUMBER_OF_EVENTS_IN_INTERNAL_QUEUE ||
this.getQueueByteSize(this.internalQueue) >= 10 * KIB))
this.isOptedIn$.value === true &&
this.firstTimeOffline === null &&
this.internalQueue.length > 0
),
// Send the events:
// 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions.
map(() => {
map(({ shouldFlush }) => {
this.lastBatchSent = Date.now();
return this.getEventsToSend();
return this.getEventsToSend(shouldFlush);
}),
// 2. Skip empty buffers
// 2. Skip empty buffers (just to be sure)
filter((events) => events.length > 0),
// 3. Actually send the events
// Using `concatMap` here because we want to send events whenever the emitter says so. Otherwise, it'd skip sending some events.
concatMap(async (eventsToSend) => await this.sendEvents(eventsToSend))
// Using `mergeMap` here because we want to send events whenever the emitter says so:
// We don't want to skip emissions (exhaustMap) or enqueue them (concatMap).
mergeMap((eventsToSend) => this.sendEvents(eventsToSend))
)
.subscribe();
}
@ -278,13 +276,13 @@ export class ElasticV3ServerShipper implements IShipper {
}
/**
* Returns a queue of events of up-to 10kB.
* Returns a queue of events of up to 10kB, or all the events in the queue if it's a FLUSH action.
* @remarks It mutates the internal queue by removing from it the events returned by this method.
* @private
*/
private getEventsToSend(): Event[] {
// If the internal queue is already smaller than the minimum batch size, do a direct assignment.
if (this.getQueueByteSize(this.internalQueue) < 10 * KIB) {
private getEventsToSend(shouldFlush: boolean): Event[] {
// If it's a flush action, or the internal queue is already smaller than the 10kB batch limit, return all the queued events at once.
if (shouldFlush || this.getQueueByteSize(this.internalQueue) < 10 * KIB) {
return this.internalQueue.splice(0, this.internalQueue.length);
}
// Otherwise, we'll feed the events to the leaky bucket queue until we reach 10kB.