[bfetch] compress stream chunks (#97994)

* Move inspector adapter integration into search source

* docs and ts

* Move other bucket to search source

* test ts + delete unused tabify function

* hierarchical param in aggconfig.
ts improvements
more inspector tests

* fix jest

* separate inspect
more tests

* jest

* inspector

* Error handling and more tests

* put the fun in functional tests

* delete client side legacy msearch code

* ts

* override to sync search in search source

* delete more legacy code

* ts

* delete moarrrr

* deflate bfetch chunks

* update tests
use only zlib

* ts

* extract getInflatedResponse

* tests

* Use fflate in attempt to reduce package size

* use node streams, fflate and hex encoding.

* DISABLE_SEARCH_COMPRESSION UI Settings
Use base64 and async compression

* i18n

* Code review
Use custom header for compression
Promisify once

* use custom headers

* Update jest

* fix tests

* code review, baby!

* integration

* tests

* limit

* limit

* limit

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Liza Katz 2021-06-01 09:57:46 +03:00 committed by GitHub
parent a622cd5450
commit 842bb69aea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 505 additions and 53 deletions

View file

@ -3,6 +3,7 @@
"console": "src/plugins/console",
"core": "src/core",
"discover": "src/plugins/discover",
"bfetch": "src/plugins/bfetch",
"dashboard": "src/plugins/dashboard",
"data": "src/plugins/data",
"embeddableApi": "src/plugins/embeddable",

View file

@ -229,6 +229,7 @@
"expiry-js": "0.1.7",
"extract-zip": "^2.0.1",
"fast-deep-equal": "^3.1.1",
"fflate": "^0.6.9",
"file-saver": "^1.3.8",
"file-type": "^10.9.0",
"focus-trap-react": "^3.1.1",

View file

@ -3,7 +3,7 @@ pageLoadAssetSize:
alerting: 106936
apm: 64385
apmOss: 18996
bfetch: 41874
bfetch: 51874
canvas: 1066647
charts: 195358
cloud: 21076

View file

@ -44,6 +44,8 @@ export const Theme = require('./theme.ts');
export const Lodash = require('lodash');
export const LodashFp = require('lodash/fp');
export const Fflate = require('fflate/esm/browser');
// runtime deps which don't need to be copied across all bundles
export const TsLib = require('tslib');
export const KbnAnalytics = require('@kbn/analytics');

View file

@ -52,6 +52,7 @@ exports.externals = {
'@elastic/eui/dist/eui_theme_dark.json': '__kbnSharedDeps__.Theme.euiDarkVars',
lodash: '__kbnSharedDeps__.Lodash',
'lodash/fp': '__kbnSharedDeps__.LodashFp',
fflate: '__kbnSharedDeps__.Fflate',
/**
* runtime deps which don't need to be copied across all bundles

View file

@ -19,3 +19,8 @@ export interface BatchResponseItem<Result extends object, Error extends ErrorLik
result?: Result;
error?: Error;
}
export interface BatchItemWrapper {
compressed: boolean;
payload: string;
}

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// Advanced-setting key: when true, bfetch response-chunk compression is disabled.
export const DISABLE_BFETCH_COMPRESSION = 'bfetch:disableCompression';

View file

@ -10,3 +10,4 @@ export * from './util';
export * from './streaming';
export * from './buffer';
export * from './batch';
export * from './constants';

View file

@ -9,7 +9,9 @@
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';
import { Subject, of as rxof } from 'rxjs';
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Promise.race<'resolved' | 'rejected' | 'pending'>([
@ -52,6 +54,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
expect(typeof fn).toBe('function');
});
@ -61,6 +64,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
const res = fn({});
expect(typeof res.then).toBe('function');
@ -74,6 +78,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
@ -93,6 +98,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
@ -107,6 +113,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
fn({ foo: 'bar' });
@ -125,6 +132,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
fn({ foo: 'bar' });
@ -146,14 +154,18 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ full: 'yep' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
@ -164,6 +176,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const abortController = new AbortController();
@ -186,11 +199,13 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();
expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
@ -209,13 +224,16 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
fn({ d: '4' });
await flushPromises();
await new Promise((r) => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
@ -229,6 +247,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = fn({ a: '1' });
@ -246,8 +265,11 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
await flushPromises();
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
@ -287,6 +309,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = fn({ a: '1' });
@ -314,6 +337,20 @@ describe('createStreamingBatchedFunction()', () => {
expect(await promise3).toEqual({ foo: 'bar 2' });
});
test('compression is false by default', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
flushOnMaxItems: 1,
fetchStreaming,
});
fn({ a: '1' });
const dontCompress = await fetchStreaming.mock.calls[0][0].compressionDisabled$.toPromise();
expect(dontCompress).toBe(false);
});
test('resolves falsy results', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
@ -321,6 +358,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = fn({ a: '1' });
@ -362,6 +400,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise = fn({ a: '1' });
@ -390,6 +429,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = of(fn({ a: '1' }));
@ -442,6 +482,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const abortController = new AbortController();
@ -471,6 +512,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const abortController = new AbortController();
@ -509,6 +551,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = of(fn({ a: '1' }));
@ -539,6 +582,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = of(fn({ a: '1' }));
@ -576,6 +620,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = of(fn({ a: '1' }));
@ -608,6 +653,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
const promise1 = of(fn({ a: '1' }));
@ -644,7 +690,9 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
await flushPromises();
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));

View file

@ -6,16 +6,16 @@
* Side Public License, v 1.
*/
import { Observable, of } from 'rxjs';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
createBatchedFunction,
BatchResponseItem,
ErrorLike,
normalizeError,
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
import { fetchStreaming } from '../streaming';
import { BatchedFunc, BatchItem } from './types';
export interface BatchedFunctionProtocolError extends ErrorLike {
@ -47,6 +47,11 @@ export interface StreamingBatchedFunctionParams<Payload, Result> {
* before sending the batch request.
*/
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];
/**
* Disables zlib compression of response chunks.
*/
compressionDisabled$?: Observable<boolean>;
}
/**
@ -64,6 +69,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
compressionDisabled$ = of(false),
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
@ -119,6 +125,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
compressionDisabled$,
});
const handleStreamError = (error: any) => {
@ -127,10 +134,10 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
};
stream.pipe(split('\n')).subscribe({
stream.subscribe({
next: (json: string) => {
try {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
const response = JSON.parse(json);
if (response.error) {
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export {
createStreamingBatchedFunction,
StreamingBatchedFunctionParams,
} from './create_streaming_batched_function';

View file

@ -7,12 +7,11 @@
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public';
import { from, Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { DISABLE_BFETCH_COMPRESSION, removeLeadingSlash } from '../common';
import { createStreamingBatchedFunction, StreamingBatchedFunctionParams } from './batching';
import { BatchedFunc } from './batching/types';
// eslint-disable-next-line
@ -43,12 +42,23 @@ export class BfetchPublicPlugin
constructor(private readonly initializerContext: PluginInitializerContext) {}
public setup(core: CoreSetup, plugins: BfetchPublicSetupDependencies): BfetchPublicSetup {
public setup(
core: CoreSetup<any, any>,
plugins: BfetchPublicSetupDependencies
): BfetchPublicSetup {
const { version } = this.initializerContext.env.packageInfo;
const basePath = core.http.basePath.get();
const fetchStreaming = this.fetchStreaming(version, basePath);
const batchedFunction = this.batchedFunction(fetchStreaming);
const compressionDisabled$ = from(core.getStartServices()).pipe(
switchMap((deps) => {
return of(deps[0]);
}),
switchMap((coreStart) => {
return coreStart.uiSettings.get$<boolean>(DISABLE_BFETCH_COMPRESSION);
})
);
const fetchStreaming = this.fetchStreaming(version, basePath, compressionDisabled$);
const batchedFunction = this.batchedFunction(fetchStreaming, compressionDisabled$);
this.contract = {
fetchStreaming,
@ -66,7 +76,8 @@ export class BfetchPublicPlugin
private fetchStreaming = (
version: string,
basePath: string
basePath: string,
compressionDisabled$: Observable<boolean>
): BfetchPublicSetup['fetchStreaming'] => (params) =>
fetchStreamingStatic({
...params,
@ -76,13 +87,16 @@ export class BfetchPublicPlugin
'kbn-version': version,
...(params.headers || {}),
},
compressionDisabled$,
});
private batchedFunction = (
fetchStreaming: BfetchPublicContract['fetchStreaming']
fetchStreaming: BfetchPublicContract['fetchStreaming'],
compressionDisabled$: Observable<boolean>
): BfetchPublicContract['batchedFunction'] => (params) =>
createStreamingBatchedFunction({
...params,
compressionDisabled$,
fetchStreaming: params.fetchStreaming || fetchStreaming,
});
}

View file

@ -8,6 +8,15 @@
import { fetchStreaming } from './fetch_streaming';
import { mockXMLHttpRequest } from '../test_helpers/xhr';
import { of } from 'rxjs';
import { promisify } from 'util';
import { deflate } from 'zlib';
const pDeflate = promisify(deflate);
const compressResponse = async (resp: any) => {
const gzipped = await pDeflate(JSON.stringify(resp));
return gzipped.toString('base64');
};
const tick = () => new Promise((resolve) => setTimeout(resolve, 1));
@ -21,6 +30,7 @@ test('returns XHR request', () => {
setup();
const { xhr } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
expect(typeof xhr.readyState).toBe('number');
});
@ -29,6 +39,7 @@ test('returns stream', () => {
setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
expect(typeof stream.subscribe).toBe('function');
});
@ -37,6 +48,7 @@ test('promise resolves when request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
let resolved = false;
@ -65,10 +77,90 @@ test('promise resolves when request completes', async () => {
expect(resolved).toBe(true);
});
test('streams incoming text as it comes through', async () => {
test('promise resolves when compressed request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(false),
});
let resolved = false;
let result;
stream.toPromise().then((r) => {
resolved = true;
result = r;
});
await tick();
expect(resolved).toBe(false);
const msg = { foo: 'bar' };
// Whole message in a response
(env.xhr as any).responseText = `${await compressResponse(msg)}\n`;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
expect(result).toStrictEqual(JSON.stringify(msg));
});
test('promise resolves when compressed chunked request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(false),
});
let resolved = false;
let result;
stream.toPromise().then((r) => {
resolved = true;
result = r;
});
await tick();
expect(resolved).toBe(false);
const msg = { veg: 'tomato' };
const msgToCut = await compressResponse(msg);
const part1 = msgToCut.substr(0, 3);
// Message and a half in a response
(env.xhr as any).responseText = part1;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
// Half a message in a response
(env.xhr as any).responseText = `${msgToCut}\n`;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
expect(result).toStrictEqual(JSON.stringify(msg));
});
test('streams incoming text as it comes through, according to separators', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
const spy = jest.fn();
@ -81,15 +173,21 @@ test('streams incoming text as it comes through', async () => {
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('foo');
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo\nbar';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('foo');
(env.xhr as any).responseText = 'foo\nbar\n';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledWith('\nbar');
expect(spy).toHaveBeenCalledWith('bar');
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
@ -103,6 +201,7 @@ test('completes stream observable when request finishes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
const spy = jest.fn();
@ -127,6 +226,7 @@ test('completes stream observable when aborted', async () => {
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
compressionDisabled$: of(true),
});
const spy = jest.fn();
@ -152,6 +252,7 @@ test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
const spy = jest.fn();
@ -178,6 +279,7 @@ test('stream observable errors when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
const spy = jest.fn();
@ -210,6 +312,7 @@ test('sets custom headers', async () => {
'Content-Type': 'text/plain',
Authorization: 'Bearer 123',
},
compressionDisabled$: of(true),
});
expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain');
@ -223,6 +326,7 @@ test('uses credentials', async () => {
fetchStreaming({
url: 'http://example.com',
compressionDisabled$: of(true),
});
expect(env.xhr.withCredentials).toBe(true);
@ -238,6 +342,7 @@ test('opens XHR request and sends specified body', async () => {
url: 'http://elastic.co',
method: 'GET',
body: 'foobar',
compressionDisabled$: of(true),
});
expect(env.xhr.open).toHaveBeenCalledTimes(1);
@ -250,6 +355,7 @@ test('uses POST request method by default', async () => {
const env = setup();
fetchStreaming({
url: 'http://elastic.co',
compressionDisabled$: of(true),
});
expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co');
});

View file

@ -6,7 +6,11 @@
* Side Public License, v 1.
*/
import { Observable, of } from 'rxjs';
import { map, share, switchMap } from 'rxjs/operators';
import { inflateResponse } from '.';
import { fromStreamingXhr } from './from_streaming_xhr';
import { split } from './split';
export interface FetchStreamingParams {
url: string;
@ -14,6 +18,7 @@ export interface FetchStreamingParams {
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
compressionDisabled$?: Observable<boolean>;
}
/**
@ -26,23 +31,49 @@ export function fetchStreaming({
method = 'POST',
body = '',
signal,
compressionDisabled$ = of(false),
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
const msgStream = compressionDisabled$.pipe(
switchMap((compressionDisabled) => {
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
if (!compressionDisabled) {
headers['X-Chunk-Encoding'] = 'deflate';
}
const stream = fromStreamingXhr(xhr, signal);
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
// Send the payload to the server
xhr.send(body);
const stream = fromStreamingXhr(xhr, signal);
// Send the payload to the server
xhr.send(body);
// Return a stream of chunked decompressed messages
return stream.pipe(
split('\n'),
map((msg) => {
return compressionDisabled ? msg : inflateResponse(msg);
})
);
}),
share()
);
// start execution
const msgStreamSub = msgStream.subscribe({
error: (e) => {},
complete: () => {
msgStreamSub.unsubscribe();
},
});
return {
xhr,
stream,
stream: msgStream,
};
}

View file

@ -9,3 +9,4 @@
export * from './split';
export * from './from_streaming_xhr';
export * from './fetch_streaming';
export { inflateResponse } from './inflate_response';

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { unzlibSync, strFromU8 } from 'fflate';
/**
 * Decodes a single base64-encoded, zlib-deflated response chunk back into
 * the original string.
 *
 * NOTE(review): the `Result` type parameter is unused — the function always
 * returns a string (callers appear to `JSON.parse` the result themselves).
 * It is kept so explicit instantiations like `inflateResponse<any>(…)` keep
 * compiling; confirm before removing.
 *
 * @param response A base64-encoded, deflate-compressed chunk.
 * @returns The decompressed UTF-8 string.
 */
export function inflateResponse<Result extends object>(response: string) {
  // base64 -> raw compressed bytes
  const buff = Buffer.from(response, 'base64');
  // raw zlib inflate (fflate), then bytes -> string
  const unzip = unzlibSync(buff);
  return strFromU8(unzip);
}

View file

@ -16,6 +16,7 @@ import type {
RouteMethod,
RequestHandler,
RequestHandlerContext,
StartServicesAccessor,
} from 'src/core/server';
import { schema } from '@kbn/config-schema';
import { Subject } from 'rxjs';
@ -28,7 +29,8 @@ import {
normalizeError,
} from '../common';
import { StreamingRequestHandler } from './types';
import { createNDJSONStream } from './streaming';
import { createStream } from './streaming';
import { getUiSettings } from './ui_settings';
// eslint-disable-next-line
export interface BfetchServerSetupDependencies {}
@ -112,9 +114,19 @@ export class BfetchServerPlugin
public setup(core: CoreSetup, plugins: BfetchServerSetupDependencies): BfetchServerSetup {
const logger = this.initializerContext.logger.get();
const router = core.http.createRouter();
const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger });
core.uiSettings.register(getUiSettings());
const addStreamingResponseRoute = this.addStreamingResponseRoute({
getStartServices: core.getStartServices,
router,
logger,
});
const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
const createStreamingRequestHandler = this.createStreamingRequestHandler({ logger });
const createStreamingRequestHandler = this.createStreamingRequestHandler({
getStartServices: core.getStartServices,
logger,
});
return {
addBatchProcessingRoute,
@ -129,10 +141,16 @@ export class BfetchServerPlugin
public stop() {}
private getCompressionDisabled(request: KibanaRequest) {
return request.headers['x-chunk-encoding'] !== 'deflate';
}
private addStreamingResponseRoute = ({
getStartServices,
router,
logger,
}: {
getStartServices: StartServicesAccessor;
router: ReturnType<CoreSetup['http']['createRouter']>;
logger: Logger;
}): BfetchServerSetup['addStreamingResponseRoute'] => (path, handler) => {
@ -146,9 +164,10 @@ export class BfetchServerPlugin
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const compressionDisabled = this.getCompressionDisabled(request);
return response.ok({
headers: streamingHeaders,
body: createNDJSONStream(handlerInstance.getResponseStream(data), logger),
body: createStream(handlerInstance.getResponseStream(data), logger, compressionDisabled),
});
}
);
@ -156,17 +175,20 @@ export class BfetchServerPlugin
private createStreamingRequestHandler = ({
logger,
getStartServices,
}: {
logger: Logger;
getStartServices: StartServicesAccessor;
}): BfetchServerSetup['createStreamingRequestHandler'] => (streamHandler) => async (
context,
request,
response
) => {
const response$ = await streamHandler(context, request);
const compressionDisabled = this.getCompressionDisabled(request);
return response.ok({
headers: streamingHeaders,
body: createNDJSONStream(response$, logger),
body: createStream(response$, logger, compressionDisabled),
});
};

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { promisify } from 'util';
import { Observable } from 'rxjs';
import { catchError, concatMap, finalize } from 'rxjs/operators';
import { Logger } from 'src/core/server';
import { Stream, PassThrough } from 'stream';
import { constants, deflate } from 'zlib';
// Messages on the wire are newline-delimited.
const delimiter = '\n';

// Promisified zlib.deflate for use with async/await.
const pDeflate = promisify(deflate);

/**
 * Deflates `message`, base64-encodes the compressed bytes, and writes the
 * result to `output` followed by the newline delimiter.
 *
 * Note: the original wrapped this body in `new Promise(async (resolve,
 * reject) => …)` — the async-executor antipattern. An `async` function
 * already returns a promise that resolves on return and rejects on throw,
 * so the wrapper added nothing but a rejection-handling hazard.
 *
 * @param output Stream the encoded chunk is written to.
 * @param message Serialized (JSON) message to compress.
 */
async function zipMessageToStream(output: PassThrough, message: string) {
  const gzipped = await pDeflate(message, {
    // Z_SYNC_FLUSH so each chunk is independently decompressible.
    flush: constants.Z_SYNC_FLUSH,
  });
  output.write(gzipped.toString('base64'));
  output.write(delimiter);
}
/**
 * Subscribes to `results` and returns a Node stream that emits each message
 * as a newline-delimited, deflate-compressed, base64-encoded chunk.
 *
 * Fix: the original captured the subscription (`const sub = … .subscribe()`)
 * and called `sub.unsubscribe()` inside `finalize`. If the source observable
 * completes synchronously during `.subscribe()`, `finalize` runs before the
 * `const` binding is initialized, throwing a TDZ ReferenceError. The call was
 * also redundant — `finalize` only fires when the subscription is already
 * terminating (complete, error, or unsubscribe) — so it is simply dropped.
 *
 * @param results Observable of response messages to serialize and compress.
 * @param logger Used to report serialization/compression failures.
 * @returns A readable stream of compressed chunks; ends when `results` does.
 */
export const createCompressedStream = <Response>(
  results: Observable<Response>,
  logger: Logger
): Stream => {
  const output = new PassThrough();

  results
    .pipe(
      // concatMap preserves message order while compressing asynchronously.
      concatMap((message: Response) => {
        const strMessage = JSON.stringify(message);
        return zipMessageToStream(output, strMessage);
      }),
      catchError((e) => {
        logger.error('Could not serialize or stream a message.');
        logger.error(e);
        throw e;
      }),
      // End the output stream on complete, error, or unsubscribe.
      finalize(() => {
        output.end();
      })
    )
    .subscribe();

  return output;
};

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Logger } from 'kibana/server';
import { Stream } from 'stream';
import { Observable } from 'rxjs';
import { createCompressedStream } from './create_compressed_stream';
import { createNDJSONStream } from './create_ndjson_stream';
/**
 * Chooses the response-stream encoding for a bfetch route.
 *
 * When compression is disabled the messages are streamed as plain NDJSON;
 * otherwise each message is deflate-compressed and base64-encoded.
 * (NOTE(review): the `Payload` type parameter is unused but kept so any
 * explicit instantiations keep compiling.)
 *
 * @param response$ Observable of response messages to stream.
 * @param logger Passed through to the underlying stream factory.
 * @param compressionDisabled Selects NDJSON when true, compressed otherwise.
 * @returns The Node stream to use as the HTTP response body.
 */
export function createStream<Payload, Response>(
  response$: Observable<Response>,
  logger: Logger,
  compressionDisabled: boolean
): Stream {
  if (compressionDisabled) {
    return createNDJSONStream(response$, logger);
  }
  return createCompressedStream(response$, logger);
}

View file

@ -7,3 +7,5 @@
*/
export * from './create_ndjson_stream';
export * from './create_compressed_stream';
export * from './create_stream';

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { i18n } from '@kbn/i18n';
import { UiSettingsParams } from 'src/core/server';
import { schema } from '@kbn/config-schema';
import { DISABLE_BFETCH_COMPRESSION } from '../common';
/**
 * UI settings registered by the bfetch plugin.
 *
 * Exposes a single advanced setting letting users disable response-chunk
 * compression (useful for inspecting individual responses while debugging,
 * at the cost of larger payloads).
 */
export function getUiSettings(): Record<string, UiSettingsParams<unknown>> {
  return {
    [DISABLE_BFETCH_COMPRESSION]: {
      name: i18n.translate('bfetch.disableBfetchCompression', {
        defaultMessage: 'Disable Batch Compression',
      }),
      // Compression is enabled by default.
      value: false,
      description: i18n.translate('bfetch.disableBfetchCompressionDesc', {
        defaultMessage:
          'Disable batch compression. This allows you to debug individual requests, but increases response size.',
      }),
      schema: schema.boolean(),
      // Not assigned to a specific Advanced Settings category.
      category: [],
    },
  };
}

View file

@ -388,6 +388,10 @@ export const stackManagementSchema: MakeSchemaFrom<UsageStats> = {
type: 'long',
_meta: { description: 'Non-default value of setting.' },
},
'bfetch:disableCompression': {
type: 'boolean',
_meta: { description: 'Non-default value of setting.' },
},
'visualization:visualize:legacyChartsLibrary': {
type: 'boolean',
_meta: { description: 'Non-default value of setting.' },

View file

@ -22,6 +22,7 @@ export interface UsageStats {
/**
* non-sensitive settings
*/
'bfetch:disableCompression': boolean;
'autocomplete:useTimeRange': boolean;
'search:timeout': number;
'visualization:visualize:legacyChartsLibrary': boolean;

View file

@ -8217,6 +8217,12 @@
"description": "Non-default value of setting."
}
},
"bfetch:disableCompression": {
"type": "boolean",
"_meta": {
"description": "Non-default value of setting."
}
},
"visualization:visualize:legacyChartsLibrary": {
"type": "boolean",
"_meta": {

View file

@ -8,15 +8,18 @@
import expect from '@kbn/expect';
import request from 'superagent';
import { inflateResponse } from '../../../../src/plugins/bfetch/public/streaming';
import { FtrProviderContext } from '../../ftr_provider_context';
import { painlessErrReq } from './painless_err_req';
import { verifyErrorResponse } from './verify_error';
function parseBfetchResponse(resp: request.Response): Array<Record<string, any>> {
function parseBfetchResponse(resp: request.Response, compressed: boolean = false) {
return resp.text
.trim()
.split('\n')
.map((item) => JSON.parse(item));
.map((item) => {
return JSON.parse(compressed ? inflateResponse<any>(item) : item);
});
}
export default function ({ getService }: FtrProviderContext) {
@ -26,29 +29,69 @@ export default function ({ getService }: FtrProviderContext) {
describe('bsearch', () => {
describe('post', () => {
it('should return 200 a single response', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': '' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'es',
},
},
},
],
});
],
});
const jsonBody = JSON.parse(resp.text);
const jsonBody = parseBfetchResponse(resp);
expect(resp.status).to.be(200);
expect(jsonBody.id).to.be(0);
expect(jsonBody.result).to.have.property('isPartial');
expect(jsonBody.result).to.have.property('isRunning');
expect(jsonBody.result).to.have.property('rawResponse');
expect(jsonBody[0].id).to.be(0);
expect(jsonBody[0].result.isPartial).to.be(false);
expect(jsonBody[0].result.isRunning).to.be(false);
expect(jsonBody[0].result).to.have.property('rawResponse');
});
it('should return 200 a single response from compressed', async () => {
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': 'deflate' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'es',
},
},
],
});
const jsonBody = parseBfetchResponse(resp, true);
expect(resp.status).to.be(200);
expect(jsonBody[0].id).to.be(0);
expect(jsonBody[0].result.isPartial).to.be(false);
expect(jsonBody[0].result.isRunning).to.be(false);
expect(jsonBody[0].result).to.have.property('rawResponse');
});
it('should return a batch of successful responses', async () => {
@ -57,6 +100,7 @@ export default function ({ getService }: FtrProviderContext) {
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
@ -68,6 +112,7 @@ export default function ({ getService }: FtrProviderContext) {
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
@ -95,6 +140,7 @@ export default function ({ getService }: FtrProviderContext) {
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
@ -121,6 +167,7 @@ export default function ({ getService }: FtrProviderContext) {
batch: [
{
request: {
index: '.kibana',
indexType: 'baad',
params: {
body: {

View file

@ -13483,6 +13483,11 @@ fetch-mock@^7.3.9:
path-to-regexp "^2.2.1"
whatwg-url "^6.5.0"
fflate@^0.6.9:
version "0.6.9"
resolved "https://registry.yarnpkg.com/fflate/-/fflate-0.6.9.tgz#fb369b30792a03ff7274e174f3b36e51292d3f99"
integrity sha512-hmAdxNHub7fw36hX7BHiuAO0uekp6ufY2sjxBXWxIf0sw5p7tnS9GVrdM4D12SDYQUHVpiC50fPBYPTjOzRU2Q==
figgy-pudding@^3.5.1:
version "3.5.1"
resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.1.tgz#862470112901c727a0e495a80744bd5baa1d6790"