Stop using aborted event for KibanaRequest.events.aborted$ (#126184) (#126248)

* Stop using `aborted` event for `KibanaRequest.events.aborted$`

* add another test, just in case

* use a single `fromEvent`

* add replay effect to aborted$

* improve impl

* remove useless bottom-stream replay

* yup, that's simpler

(cherry picked from commit d053a7f026)

Co-authored-by: Pierre Gayvallet <pierre.gayvallet@elastic.co>
This commit is contained in:
Kibana Machine 2022-02-23 12:40:42 -05:00 committed by GitHub
parent 41d2ba1bd7
commit 467de6fafe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 7 deletions

View file

@ -198,6 +198,42 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
});
it('emits once and completes when request aborted after the payload has been consumed', async () => {
expect.assertions(1);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
const nextSpy = jest.fn();
const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
request.events.aborted$.subscribe({
next: nextSpy,
complete: resolve,
});
// prevents the server to respond
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});
await server.start();
const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ hello: 'dolly' })
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});
it('completes & does not emit when request handled', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
@ -336,6 +372,41 @@ describe('KibanaRequest', () => {
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});
it('emits once and completes when response is aborted after the payload has been consumed', async () => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
const nextSpy = jest.fn();
const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: resolve,
});
expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});
await server.start();
const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ foo: 'bar' })
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -9,8 +9,8 @@
import { URL } from 'url';
import uuid from 'uuid';
import { Request, RouteOptionsApp, RequestApplicationState, RouteOptions } from '@hapi/hapi';
import { Observable, fromEvent, merge } from 'rxjs';
import { shareReplay, first, takeUntil } from 'rxjs/operators';
import { Observable, fromEvent } from 'rxjs';
import { shareReplay, first, filter } from 'rxjs/operators';
import { RecursiveReadonly } from '@kbn/utility-types';
import { deepFreeze } from '@kbn/std';
@ -127,6 +127,7 @@ export class KibanaRequest<
const body = routeValidator.getBody(req.payload, 'request body');
return { query, params, body };
}
/**
* A identifier to identify this request.
*
@ -215,11 +216,9 @@ export class KibanaRequest<
}
private getEvents(request: Request): KibanaRequestEvents {
// the response is completed, or its underlying connection was terminated prematurely
const finish$ = fromEvent(request.raw.res, 'close').pipe(shareReplay(1), first());
const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());
const completed$ = fromEvent<void>(request.raw.res, 'close').pipe(shareReplay(1), first());
// the response's underlying connection was terminated prematurely
const aborted$ = completed$.pipe(filter(() => !isCompleted(request)));
return {
aborted$,
@ -331,3 +330,7 @@ function isRequest(request: any): request is Request {
export function isRealRequest(request: unknown): request is KibanaRequest | Request {
return isKibanaRequest(request) || isRequest(request);
}
function isCompleted(request: Request) {
return request.raw.res.writableFinished;
}