[EBT] ElasticV3-Server - Check connectivity as soon as optIn(true) is called (#136936)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2022-07-25 19:45:41 +02:00 committed by GitHub
parent 691e21ec2d
commit 0824234fe1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 248 additions and 93 deletions

View file

@ -47,7 +47,7 @@ describe('ElasticV3ServerShipper', () => {
initContext
);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = null;
shipper['firstTimeOffline'] = null; // The tests think connectivity is OK initially for easier testing.
});
afterEach(() => {
@ -57,7 +57,7 @@ describe('ElasticV3ServerShipper', () => {
test('set optIn should update the isOptedIn$ observable', () => {
// eslint-disable-next-line dot-notation
const getInternalOptIn = () => shipper['isOptedIn'];
const getInternalOptIn = () => shipper['isOptedIn$'].value;
// Initially undefined
expect(getInternalOptIn()).toBeUndefined();
@ -342,97 +342,242 @@ describe('ElasticV3ServerShipper', () => {
})
);
test(
'connectivity check is run after report failure',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
describe('Connectivity Checks', () => {
describe('connectivity check when connectivity is confirmed (firstTimeOffline === null)', () => {
test.each([undefined, false, true])('does not run for opt-in %p', (optInValue) =>
fakeSchedulers(async (advance) => {
if (optInValue !== undefined) {
shipper.optIn(optInValue);
}
// From the start, it doesn't check connectivity because already confirmed
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// Wait a long time (1 minute should be enough, but for the sake of tests...)
advance(10 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})()
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "Failed to fetch",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "failed",
}
`);
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
2,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
fetchMock.mockResolvedValueOnce({ ok: false });
advance(2 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
3,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
});
describe('connectivity check with initial unknown state of the connectivity', () => {
beforeEach(() => {
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = undefined; // Initial unknown state of the connectivity
});
test.each([undefined, false])('does not run for opt-in %p', (optInValue) =>
fakeSchedulers(async (advance) => {
if (optInValue !== undefined) {
shipper.optIn(optInValue);
}
// From the start, it doesn't check connectivity because opt-in is not true
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// Wait a long time (1 minute should be enough, but for the sake of tests...)
advance(10 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})()
);
// let's see the effect of after 24 hours:
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = 100;
test('runs as soon as opt-in is set to true', () => {
shipper.optIn(true);
fetchMock.mockResolvedValueOnce({ ok: false });
advance(4 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
4,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
// The connectivity check runs immediately because opt-in has just been set to true
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
});
});
describe('connectivity check with the connectivity confirmed to be faulty', () => {
beforeEach(() => {
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = 100; // Failed at some point
});
test.each([undefined, false])('does not run for opt-in %p', (optInValue) =>
fakeSchedulers(async (advance) => {
if (optInValue !== undefined) {
shipper.optIn(optInValue);
}
// From the start, it doesn't check connectivity because opt-in is not true
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// Wait a long time (1 minute should be enough, but for the sake of tests...)
advance(10 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})()
);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
// New events are not added to the queue because it's been offline for 24 hours.
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
test('runs as soon as opt-in is set to true', () => {
shipper.optIn(true);
// Regains connection
fetchMock.mockResolvedValueOnce({ ok: true });
advance(8 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
5,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
// The connectivity check runs immediately because opt-in has just been set to true
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
});
});
describe('after report failure', () => {
// generate the report failure for each test
beforeEach(
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
shipper.reportEvents(events);
shipper.optIn(true);
const counter = firstValueFrom(shipper.telemetryCounter$);
setLastBatchSent(Date.now() - 10 * MINUTES);
advance(10 * MINUTES);
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "Failed to fetch",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_server",
"type": "failed",
}
`);
})
);
// eslint-disable-next-line dot-notation
expect(shipper['firstTimeOffline']).toBe(null);
advance(16 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenNthCalledWith(
6,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
test(
'connectivity check runs periodically',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
2,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
fetchMock.mockResolvedValueOnce({ ok: false });
advance(2 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
3,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})
);
})
);
});
describe('after being offline for longer than 24h', () => {
beforeEach(() => {
shipper.optIn(true);
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(1);
// eslint-disable-next-line dot-notation
shipper['firstTimeOffline'] = 100;
});
test(
'the following connectivity check clears the queue',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
})
);
test(
'new events are not added to the queue',
fakeSchedulers(async (advance) => {
fetchMock.mockRejectedValueOnce(new Error('Failed to fetch'));
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
shipper.reportEvents(events);
// eslint-disable-next-line dot-notation
expect(shipper['internalQueue'].length).toBe(0);
})
);
test(
'regains the connection',
fakeSchedulers(async (advance) => {
fetchMock.mockResolvedValueOnce({ ok: true });
advance(1 * MINUTES);
await nextTick();
expect(fetchMock).toHaveBeenNthCalledWith(
1,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
// eslint-disable-next-line dot-notation
expect(shipper['firstTimeOffline']).toBe(null);
advance(10 * MINUTES);
await nextTick();
expect(fetchMock).not.toHaveBeenNthCalledWith(
2,
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{ method: 'OPTIONS' }
);
})
);
});
});
});

View file

@ -22,6 +22,8 @@ import {
delayWhen,
takeUntil,
map,
BehaviorSubject,
exhaustMap,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
@ -30,8 +32,8 @@ import type {
IShipper,
TelemetryCounter,
} from '@kbn/analytics-client';
import type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common';
import {
type ElasticV3ShipperOptions,
buildHeaders,
buildUrl,
createTelemetryCounterHelper,
@ -62,6 +64,7 @@ export class ElasticV3ServerShipper implements IShipper {
private readonly internalQueue: Event[] = [];
private readonly shutdown$ = new ReplaySubject<void>(1);
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private readonly url: string;
@ -69,7 +72,6 @@ export class ElasticV3ServerShipper implements IShipper {
private clusterUuid: string = 'UNKNOWN';
private licenseId?: string;
private isOptedIn?: boolean;
/**
* Specifies when it went offline:
@ -116,7 +118,7 @@ export class ElasticV3ServerShipper implements IShipper {
* @param isOptedIn `true` to resume sending events. `false` to stop.
*/
public optIn(isOptedIn: boolean) {
this.isOptedIn = isOptedIn;
this.isOptedIn$.next(isOptedIn);
if (isOptedIn === false) {
this.internalQueue.length = 0;
@ -129,7 +131,7 @@ export class ElasticV3ServerShipper implements IShipper {
*/
public reportEvents(events: Event[]) {
if (
this.isOptedIn === false ||
this.isOptedIn$.value === false ||
(this.firstTimeOffline && Date.now() - this.firstTimeOffline > 24 * HOUR)
) {
return;
@ -157,6 +159,7 @@ export class ElasticV3ServerShipper implements IShipper {
public shutdown() {
this.shutdown$.next();
this.shutdown$.complete();
this.isOptedIn$.complete();
}
/**
@ -169,11 +172,17 @@ export class ElasticV3ServerShipper implements IShipper {
*/
private checkConnectivity() {
let backoff = 1 * MINUTE;
timer(0, 1 * MINUTE)
merge(
timer(0, 1 * MINUTE),
// Also react to opt-in changes to avoid being stalled for 1 minute for the first connectivity check.
// More details in: https://github.com/elastic/kibana/issues/135647
this.isOptedIn$
)
.pipe(
takeUntil(this.shutdown$),
filter(() => this.isOptedIn === true && this.firstTimeOffline !== null),
concatMap(async () => {
filter(() => this.isOptedIn$.value === true && this.firstTimeOffline !== null),
// Using exhaustMap here because one request at a time is enough to check the connectivity.
exhaustMap(async () => {
const { ok } = await fetch(this.url, {
method: 'OPTIONS',
});
@ -215,7 +224,7 @@ export class ElasticV3ServerShipper implements IShipper {
)
.pipe(
// Only move ahead if it's opted-in and online.
filter(() => this.isOptedIn === true && this.firstTimeOffline === null),
filter(() => this.isOptedIn$.value === true && this.firstTimeOffline === null),
// Send the events now if (validations sorted from cheapest to most CPU expensive):
// - We are shutting down.
@ -241,6 +250,7 @@ export class ElasticV3ServerShipper implements IShipper {
// 2. Skip empty buffers
filter((events) => events.length > 0),
// 3. Actually send the events
// Using `concatMap` here because we want to send events whenever the emitter says so. Otherwise, it'd skip sending some events.
concatMap(async (eventsToSend) => await this.sendEvents(eventsToSend))
)
.subscribe();