[bfetch] Fix memory leak (#113756)

This commit is contained in:
Anton Dosov 2021-10-11 13:49:54 +02:00 committed by GitHub
parent ce7b1ea653
commit 427c7ef8ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 94 deletions

View file

@ -9,7 +9,7 @@
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject, of as rxof } from 'rxjs';
import { Subject } from 'rxjs';
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
@ -61,7 +61,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
expect(typeof fn).toBe('function');
});
@ -71,7 +71,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const res = fn({});
expect(typeof res.then).toBe('function');
@ -85,7 +85,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
@ -105,7 +105,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
@ -120,7 +120,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
fn({ foo: 'bar' });
@ -139,7 +139,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
fn({ foo: 'bar' });
@ -161,7 +161,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
@ -180,7 +180,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
@ -203,7 +203,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
fn({ a: '1' });
@ -227,7 +227,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
fn({ a: '1' });
@ -248,7 +248,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
@ -266,7 +266,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
await flushPromises();
@ -310,7 +310,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
@ -348,7 +348,7 @@ describe('createStreamingBatchedFunction()', () => {
fn({ a: '1' });
const dontCompress = await fetchStreaming.mock.calls[0][0].compressionDisabled$.toPromise();
const dontCompress = await fetchStreaming.mock.calls[0][0].getIsCompressionDisabled();
expect(dontCompress).toBe(false);
});
@ -359,7 +359,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
@ -401,7 +401,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise = fn({ a: '1' });
@ -430,7 +430,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
@ -483,7 +483,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
@ -514,7 +514,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
@ -554,7 +554,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
@ -585,7 +585,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
@ -623,7 +623,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
@ -656,7 +656,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
@ -693,7 +693,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
getIsCompressionDisabled: () => true,
});
await flushPromises();

View file

@ -6,7 +6,6 @@
* Side Public License, v 1.
*/
import { Observable, of } from 'rxjs';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
@ -51,7 +50,7 @@ export interface StreamingBatchedFunctionParams<Payload, Result> {
/**
* Disabled zlib compression of response chunks.
*/
compressionDisabled$?: Observable<boolean>;
getIsCompressionDisabled?: () => boolean;
}
/**
@ -69,7 +68,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
compressionDisabled$ = of(false),
getIsCompressionDisabled = () => false,
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
@ -125,7 +124,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
compressionDisabled$,
getIsCompressionDisabled,
});
const handleStreamError = (error: any) => {

View file

@ -6,13 +6,12 @@
* Side Public License, v 1.
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public';
import { from, Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core/public';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { DISABLE_BFETCH_COMPRESSION, removeLeadingSlash } from '../common';
import { createStreamingBatchedFunction, StreamingBatchedFunctionParams } from './batching';
import { BatchedFunc } from './batching/types';
import { createStartServicesGetter } from '../../kibana_utils/public';
// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
@ -50,16 +49,12 @@ export class BfetchPublicPlugin
const { version } = this.initializerContext.env.packageInfo;
const basePath = core.http.basePath.get();
const compressionDisabled$ = from(core.getStartServices()).pipe(
switchMap((deps) => {
return of(deps[0]);
}),
switchMap((coreStart) => {
return coreStart.uiSettings.get$<boolean>(DISABLE_BFETCH_COMPRESSION);
})
);
const fetchStreaming = this.fetchStreaming(version, basePath, compressionDisabled$);
const batchedFunction = this.batchedFunction(fetchStreaming, compressionDisabled$);
const startServices = createStartServicesGetter(core.getStartServices);
const getIsCompressionDisabled = () =>
startServices().core.uiSettings.get<boolean>(DISABLE_BFETCH_COMPRESSION);
const fetchStreaming = this.fetchStreaming(version, basePath, getIsCompressionDisabled);
const batchedFunction = this.batchedFunction(fetchStreaming, getIsCompressionDisabled);
this.contract = {
fetchStreaming,
@ -79,7 +74,7 @@ export class BfetchPublicPlugin
(
version: string,
basePath: string,
compressionDisabled$: Observable<boolean>
getIsCompressionDisabled: () => boolean
): BfetchPublicSetup['fetchStreaming'] =>
(params) =>
fetchStreamingStatic({
@ -90,18 +85,18 @@ export class BfetchPublicPlugin
'kbn-version': version,
...(params.headers || {}),
},
compressionDisabled$,
getIsCompressionDisabled,
});
private batchedFunction =
(
fetchStreaming: BfetchPublicContract['fetchStreaming'],
compressionDisabled$: Observable<boolean>
getIsCompressionDisabled: () => boolean
): BfetchPublicContract['batchedFunction'] =>
(params) =>
createStreamingBatchedFunction({
...params,
compressionDisabled$,
getIsCompressionDisabled,
fetchStreaming: params.fetchStreaming || fetchStreaming,
});
}

View file

@ -8,7 +8,6 @@
import { fetchStreaming } from './fetch_streaming';
import { mockXMLHttpRequest } from '../test_helpers/xhr';
import { of } from 'rxjs';
import { promisify } from 'util';
import { deflate } from 'zlib';
const pDeflate = promisify(deflate);
@ -30,7 +29,7 @@ test('returns XHR request', () => {
setup();
const { xhr } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(typeof xhr.readyState).toBe('number');
});
@ -39,7 +38,7 @@ test('returns stream', () => {
setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(typeof stream.subscribe).toBe('function');
});
@ -48,7 +47,7 @@ test('promise resolves when request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
let resolved = false;
@ -81,7 +80,7 @@ test('promise resolves when compressed request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(false),
getIsCompressionDisabled: () => false,
});
let resolved = false;
@ -116,7 +115,7 @@ test('promise resolves when compressed chunked request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(false),
getIsCompressionDisabled: () => false,
});
let resolved = false;
@ -160,7 +159,7 @@ test('streams incoming text as it comes through, according to separators', async
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
@ -201,7 +200,7 @@ test('completes stream observable when request finishes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
@ -226,7 +225,7 @@ test('completes stream observable when aborted', async () => {
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
@ -252,7 +251,7 @@ test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
@ -279,7 +278,7 @@ test('stream observable errors when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
@ -312,7 +311,7 @@ test('sets custom headers', async () => {
'Content-Type': 'text/plain',
Authorization: 'Bearer 123',
},
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain');
@ -326,7 +325,7 @@ test('uses credentials', async () => {
fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(env.xhr.withCredentials).toBe(true);
@ -342,7 +341,7 @@ test('opens XHR request and sends specified body', async () => {
url: 'http://elastic.co',
method: 'GET',
body: 'foobar',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(env.xhr.open).toHaveBeenCalledTimes(1);
@ -355,7 +354,7 @@ test('uses POST request method by default', async () => {
const env = setup();
fetchStreaming({
url: 'http://elastic.co',
compressionDisabled$: of(true),
getIsCompressionDisabled: () => true,
});
expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co');
});

View file

@ -6,8 +6,7 @@
* Side Public License, v 1.
*/
import { Observable, of } from 'rxjs';
import { map, share, switchMap } from 'rxjs/operators';
import { map, share } from 'rxjs/operators';
import { inflateResponse } from '.';
import { fromStreamingXhr } from './from_streaming_xhr';
import { split } from './split';
@ -18,7 +17,7 @@ export interface FetchStreamingParams {
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
compressionDisabled$?: Observable<boolean>;
getIsCompressionDisabled?: () => boolean;
}
/**
@ -31,49 +30,39 @@ export function fetchStreaming({
method = 'POST',
body = '',
signal,
compressionDisabled$ = of(false),
getIsCompressionDisabled = () => false,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
const msgStream = compressionDisabled$.pipe(
switchMap((compressionDisabled) => {
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
if (!compressionDisabled) {
headers['X-Chunk-Encoding'] = 'deflate';
}
const isCompressionDisabled = getIsCompressionDisabled();
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
if (!isCompressionDisabled) {
headers['X-Chunk-Encoding'] = 'deflate';
}
const stream = fromStreamingXhr(xhr, signal);
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
// Send the payload to the server
xhr.send(body);
const stream = fromStreamingXhr(xhr, signal);
// Return a stream of chunked decompressed messages
return stream.pipe(
split('\n'),
map((msg) => {
return compressionDisabled ? msg : inflateResponse(msg);
})
);
// Send the payload to the server
xhr.send(body);
// Return a stream of chunked decompressed messages
const stream$ = stream.pipe(
split('\n'),
map((msg) => {
return isCompressionDisabled ? msg : inflateResponse(msg);
}),
share()
);
// start execution
const msgStreamSub = msgStream.subscribe({
error: (e) => {},
complete: () => {
msgStreamSub.unsubscribe();
},
});
return {
xhr,
stream: msgStream,
stream: stream$,
};
}