Remove bfetch plugin (#204285)

## Summary

Part of https://github.com/elastic/kibana/issues/186139.

Relies on https://github.com/elastic/kibana/pull/204284.

Second step of breaking up https://github.com/elastic/kibana/pull/199066
into smaller pieces.

Removes the bfetch and bfetch-error plugins.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Lukas Olson 2024-12-19 14:51:58 -07:00 committed by GitHub
parent d7280a1380
commit 9ad31d0863
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
79 changed files with 90 additions and 3463 deletions

4
.github/CODEOWNERS vendored
View file

@ -289,7 +289,6 @@ packages/kbn-babel-preset @elastic/kibana-operations
packages/kbn-babel-register @elastic/kibana-operations
packages/kbn-babel-transform @elastic/kibana-operations
packages/kbn-bazel-runner @elastic/kibana-operations
packages/kbn-bfetch-error @elastic/appex-sharedux
packages/kbn-calculate-auto @elastic/obs-ux-management-team
packages/kbn-calculate-width-from-char-count @elastic/kibana-visualizations
packages/kbn-capture-oas-snapshot-cli @elastic/kibana-core
@ -619,7 +618,6 @@ src/platform/plugins/shared/esql @elastic/kibana-esql
src/platform/plugins/shared/esql_datagrid @elastic/kibana-esql
src/platform/plugins/shared/management @elastic/kibana-management
src/plugins/advanced_settings @elastic/appex-sharedux @elastic/kibana-management
src/plugins/bfetch @elastic/appex-sharedux
src/plugins/chart_expressions/common @elastic/kibana-visualizations
src/plugins/chart_expressions/expression_gauge @elastic/kibana-visualizations
src/plugins/chart_expressions/expression_heatmap @elastic/kibana-visualizations
@ -2911,7 +2909,6 @@ src/platform/packages/shared/kbn-analytics @elastic/kibana-core
src/platform/packages/shared/kbn-apm-data-view @elastic/obs-ux-infra_services-team
src/platform/packages/shared/kbn-apm-utils @elastic/obs-ux-infra_services-team
src/platform/packages/shared/kbn-avc-banner @elastic/security-defend-workflows
src/platform/packages/shared/kbn-bfetch-error @elastic/appex-sharedux
src/platform/packages/shared/kbn-calculate-width-from-char-count @elastic/kibana-visualizations
src/platform/packages/shared/kbn-cases-components @elastic/response-ops
src/platform/packages/shared/kbn-cbor @elastic/kibana-operations
@ -3091,7 +3088,6 @@ src/platform/plugins/private/vis_types/vega @elastic/kibana-visualizations
src/platform/plugins/private/vis_types/vislib @elastic/kibana-visualizations
src/platform/plugins/private/vis_types/xy @elastic/kibana-visualizations
src/platform/plugins/shared/ai_assistant_management/selection @elastic/obs-ai-assistant
src/platform/plugins/shared/bfetch @elastic/appex-sharedux
src/platform/plugins/shared/chart_expressions/expression_gauge @elastic/kibana-visualizations
src/platform/plugins/shared/chart_expressions/expression_heatmap @elastic/kibana-visualizations
src/platform/plugins/shared/chart_expressions/expression_legacy_metric @elastic/kibana-visualizations

View file

@ -8,8 +8,6 @@
"apmOss": "src/plugins/apm_oss",
"autocomplete": "x-pack/solutions/security/packages/kbn-securitysolution-autocomplete/src",
"avcBanner": "src/platform/packages/shared/kbn-avc-banner/src",
"bfetch": "src/plugins/bfetch",
"bfetchError": "packages/kbn-bfetch-error",
"cases": ["packages/kbn-cases-components"],
"cellActions": "src/platform/packages/shared/kbn-cell-actions",
"charts": "src/plugins/charts",

View file

@ -32,10 +32,6 @@ as uiSettings within the code.
|The aiAssistantManagementSelection plugin manages the Ai Assistant management section.
|{kib-repo}blob/{branch}/src/plugins/bfetch/README.md[bfetch]
|bfetch allows batching HTTP requests and streaming responses back.
|{kib-repo}blob/{branch}/src/plugins/charts/README.md[charts]
|The Charts plugin is a way to create easier integration of shared colors, themes, types and other utilities across all Kibana charts and visualizations.

View file

@ -196,8 +196,6 @@
"@kbn/audit-log-plugin": "link:x-pack/test/security_api_integration/plugins/audit_log",
"@kbn/avc-banner": "link:src/platform/packages/shared/kbn-avc-banner",
"@kbn/banners-plugin": "link:x-pack/plugins/banners",
"@kbn/bfetch-error": "link:packages/kbn-bfetch-error",
"@kbn/bfetch-plugin": "link:src/plugins/bfetch",
"@kbn/calculate-auto": "link:packages/kbn-calculate-auto",
"@kbn/calculate-width-from-char-count": "link:packages/kbn-calculate-width-from-char-count",
"@kbn/canvas-plugin": "link:x-pack/plugins/canvas",

View file

@ -1,35 +0,0 @@
load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
# All TypeScript sources in the package, excluding tests, mocks, stories,
# fixtures, and other files that should not ship with the library.
SRCS = glob(
[
"**/*.ts",
"**/*.tsx",
],
exclude = [
"**/test_helpers.ts",
"**/*.config.js",
"**/*.mock.*",
"**/*.test.*",
"**/*.stories.*",
"**/__snapshots__/**",
"**/integration_tests/**",
"**/mocks/**",
"**/scripts/**",
"**/storybook/**",
"**/test_fixtures/**",
"**/test_helpers/**",
],
)
# Runtime dependencies the bundler must provide alongside this package.
BUNDLER_DEPS = [
"//packages/kbn-i18n",
"@npm//tslib",
]
# Exposes the sources as the @kbn/bfetch-error JS library, visible repo-wide.
js_library(
name = "kbn-bfetch-error",
package_name = "@kbn/bfetch-error",
srcs = ["package.json"] + SRCS,
deps = BUNDLER_DEPS,
visibility = ["//visibility:public"],
)

View file

@ -1,3 +0,0 @@
# @kbn/bfetch-error
Package isolating bfetch error logic.

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export { BfetchRequestError } from './src/bfetch_error';

View file

@ -1,14 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
// Jest configuration for the @kbn/bfetch-error package.
module.exports = {
preset: '@kbn/test',
// Relative path from this package (packages/kbn-bfetch-error) to the repo root.
rootDir: '../..',
roots: ['<rootDir>/packages/kbn-bfetch-error'],
};

View file

@ -1,9 +0,0 @@
{
"type": "shared-common",
"id": "@kbn/bfetch-error",
"owner": [
"@elastic/appex-sharedux"
],
"group": "platform",
"visibility": "shared"
}

View file

@ -1,6 +0,0 @@
{
"name": "@kbn/bfetch-error",
"private": true,
"version": "1.0.0",
"license": "Elastic License 2.0 OR AGPL-3.0-only OR SSPL-1.0"
}

View file

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { i18n } from '@kbn/i18n';
/**
* Error thrown when xhr request fails
* @public
*/
/**
 * Error thrown when an XHR request fails.
 * @public
 */
export class BfetchRequestError extends Error {
  /** XHR error code of the failed request (0 when the network was unreachable). */
  code: number;

  /**
   * constructor
   * @param code - Xhr error code
   */
  constructor(code: number) {
    let message: string;
    if (code === 0) {
      message = i18n.translate('bfetchError.networkError', {
        defaultMessage: 'Check your network connection and try again.',
      });
    } else {
      message = i18n.translate('bfetchError.networkErrorWithStatus', {
        defaultMessage: 'Check your network connection and try again. Code {code}',
        values: { code },
      });
    }
    super(message);
    this.name = 'BfetchRequestError';
    this.code = code;
  }
}

View file

@ -1,21 +0,0 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
"types": [
"jest",
"node",
"react"
]
},
"include": [
"**/*.ts",
"**/*.tsx",
],
"exclude": [
"target/**/*"
],
"kbn_references": [
"@kbn/i18n",
]
}

View file

@ -7,7 +7,6 @@ pageLoadAssetSize:
apm: 64385
assetInventory: 18478
banners: 17946
bfetch: 22837
canvas: 29355
cases: 180037
charts: 55000

View file

@ -22,7 +22,6 @@ SRCS = glob(
)
BUNDLER_DEPS = [
"//packages/kbn-bfetch-error",
"//packages/kbn-i18n",
"@npm//@elastic/elasticsearch",
"@npm//@elastic/eui",

View file

@ -9,7 +9,6 @@
import { i18n } from '@kbn/i18n';
import { ReactNode } from 'react';
import { BfetchRequestError } from '@kbn/bfetch-error';
import { EsError } from './es_error';
export function renderSearchError(
@ -25,7 +24,7 @@ export function renderSearchError(
};
}
if (error.constructor.name === 'HttpFetchError' || error instanceof BfetchRequestError) {
if (error.constructor.name === 'HttpFetchError') {
const defaultMsg = i18n.translate('searchErrors.errors.fetchError', {
defaultMessage: 'Check your network connection and try again.',
});

View file

@ -20,7 +20,6 @@
"@kbn/core",
"@kbn/kibana-utils-plugin",
"@kbn/data-views-plugin",
"@kbn/bfetch-error",
"@kbn/search-types",
]
}

View file

@ -1,56 +0,0 @@
# `bfetch` plugin
`bfetch` allows batching HTTP requests and streaming responses back.
# Example
We will create a batch processing endpoint that receives a number then doubles it
and streams it back. We will also consider the number to be time in milliseconds
and before streaming the number back the server will wait for the specified number of
milliseconds.
To do that, first create server-side batch processing route using [`addBatchProcessingRoute`](./docs/server/reference.md#addBatchProcessingRoute).
```ts
plugins.bfetch.addBatchProcessingRoute<{ num: number }, { num: number }>(
'/my-plugin/double',
() => ({
onBatchItem: async ({ num }) => {
// Validate inputs.
if (num < 0) throw new Error('Invalid number');
// Wait number of specified milliseconds.
await new Promise(r => setTimeout(r, num));
// Double the number and send it back.
return { num: 2 * num };
},
})
);
```
Now on client-side create `double` function using [`batchedFunction`](./docs/browser/reference.md#batchedFunction).
The newly created `double` function can be called many times and it
will package individual calls into batches and send them to the server.
```ts
const double = plugins.bfetch.batchedFunction<{ num: number }, { num: number }>({
url: '/my-plugin/double',
});
```
Note: the created `double` must accept a single object argument (`{ num: number }` in this case)
and it will return a promise that resolves into an object, too (also `{ num: number }` in this case).
Use the `double` function.
```ts
double({ num: 1 }).then(console.log, console.error); // { num: 2 }
double({ num: 2 }).then(console.log, console.error); // { num: 4 }
double({ num: 3 }).then(console.log, console.error); // { num: 6 }
```
## Reference
- [Browser](./docs/browser/reference.md)
- [Server](./docs/server/reference.md)

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
/**
 * Minimal shape of an error carried in a batch response item.
 */
export interface ErrorLike {
// Human-readable error description.
message: string;
}
/**
 * Request body of a batch call: the individual items to process.
 */
export interface BatchRequestData<Item> {
batch: Item[];
}
/**
 * One streamed response entry. `id` correlates it with its request
 * (presumably the item's index within the batch — confirm against the route
 * implementation). Exactly one of `result`/`error` is expected to be set.
 */
export interface BatchResponseItem<Result extends object, Error extends ErrorLike = ErrorLike> {
id: number;
result?: Result;
error?: Error;
}
/**
 * Serialized batch item payload plus a flag indicating whether the
 * payload string is compressed.
 */
export interface BatchItemWrapper {
compressed: boolean;
payload: string;
}

View file

@ -1,65 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { createBatchedFunction } from './create_batched_function';
// Tests for createBatchedFunction: onCall fires once per invocation and
// returns [result, batchEntry]; onBatch fires with the buffered entries when
// either flushOnMaxItems is reached or maxItemAge elapses.
describe('createBatchedFunction', () => {
test('calls onCall every time fn is called, calls onBatch once flushOnMaxItems reached', async () => {
const onBatch = jest.fn();
// Each call contributes batch entry `2` (second tuple element).
const onCall = jest.fn(() => [1, 2] as any);
const [fn] = createBatchedFunction({
onBatch,
onCall,
flushOnMaxItems: 2,
maxItemAge: 10,
});
expect(onCall).toHaveBeenCalledTimes(0);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(123);
expect(onCall).toHaveBeenCalledTimes(1);
expect(onCall).toHaveBeenCalledWith(123);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(456);
expect(onCall).toHaveBeenCalledTimes(2);
expect(onCall).toHaveBeenCalledWith(456);
// Second call fills the buffer (flushOnMaxItems: 2) and flushes both entries.
expect(onBatch).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledWith([2, 2]);
});
test('calls onBatch once timeout is reached', async () => {
const onBatch = jest.fn();
// Each call contributes batch entry `3` (second tuple element).
const onCall = jest.fn(() => [4, 3] as any);
const [fn] = createBatchedFunction({
onBatch,
onCall,
flushOnMaxItems: 2,
maxItemAge: 10,
});
expect(onCall).toHaveBeenCalledTimes(0);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(123);
expect(onCall).toHaveBeenCalledTimes(1);
expect(onCall).toHaveBeenCalledWith(123);
expect(onBatch).toHaveBeenCalledTimes(0);
// Wait past maxItemAge (10 ms) so the single buffered entry flushes.
await new Promise((r) => setTimeout(r, 15));
expect(onCall).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledWith([3]);
});
});

View file

@ -1,39 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { ItemBufferParams, TimedItemBufferParams } from '@kbn/item-buffer';
import { TimedItemBuffer } from '@kbn/item-buffer';
type Fn = (...args: any) => any;

/**
 * Configuration for `createBatchedFunction`.
 */
export interface BatchedFunctionParams<Func extends Fn, BatchEntry> {
  onCall: (...args: Parameters<Func>) => [ReturnType<Func>, BatchEntry];
  onBatch: (items: BatchEntry[]) => void;
  flushOnMaxItems?: ItemBufferParams<any>['flushOnMaxItems'];
  maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];
}

/**
 * Wraps `onCall` in a function whose batch entries are buffered and flushed
 * to `onBatch` — either when the buffer holds `flushOnMaxItems` entries
 * (default 25) or when the oldest entry is `maxItemAge` ms old (default 10).
 * Returns the wrapped function together with the underlying buffer.
 */
export const createBatchedFunction = <Func extends Fn, BatchEntry>(
  params: BatchedFunctionParams<Func, BatchEntry>
): [Func, TimedItemBuffer<BatchEntry>] => {
  const onCall = params.onCall;
  const buffer = new TimedItemBuffer<BatchEntry>({
    onFlush: params.onBatch,
    maxItemAge: params.maxItemAge ?? 10,
    flushOnMaxItems: params.flushOnMaxItems ?? 25,
  });

  // Every invocation produces [immediate result, entry to buffer].
  const batched = function (...args: Parameters<Func>) {
    const [returnValue, entry] = onCall(...args);
    buffer.write(entry);
    return returnValue;
  } as Func;

  return [batched, buffer];
};

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './create_batched_function';

View file

@ -1,12 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
// Settings key used to disable bfetch response compression
// (presumably an advanced/uiSetting — confirm against usage).
export const DISABLE_BFETCH_COMPRESSION = 'bfetch:disableCompression';
// Settings key used to disable bfetch batching entirely
// (presumably an advanced/uiSetting — confirm against usage).
export const DISABLE_BFETCH = 'bfetch:disable';
// Latest version identifier of the versioned bfetch HTTP routes.
export const BFETCH_ROUTE_VERSION_LATEST = '1';

View file

@ -1,18 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export { normalizeError, removeLeadingSlash, appendQueryParam } from './util';
export type { StreamingResponseHandler } from './streaming';
export { type BatchedFunctionParams, createBatchedFunction } from './buffer';
export type { ErrorLike, BatchRequestData, BatchResponseItem, BatchItemWrapper } from './batch';
export {
DISABLE_BFETCH_COMPRESSION,
DISABLE_BFETCH,
BFETCH_ROUTE_VERSION_LATEST,
} from './constants';

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './types';

View file

@ -1,14 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Observable } from 'rxjs';
/**
 * Handles a streaming request: given a request payload, returns an
 * observable that emits response messages until it completes.
 */
export interface StreamingResponseHandler<Payload, Response> {
getResponseStream(payload: Payload): Observable<Response>;
}

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './streaming/types';

View file

@ -1,12 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './normalize_error';
export * from './remove_leading_slash';
export * from './query_params';

View file

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { BfetchRequestError } from '@kbn/bfetch-error';
import { ErrorLike } from '../batch';
/**
 * Normalizes an arbitrary thrown value into an `ErrorLike` object with a
 * `message`, so it can be serialized into a batch response. A
 * `BfetchRequestError` is passed through unchanged so its extra fields
 * (e.g. `code`) survive.
 */
export const normalizeError = <E extends ErrorLike = ErrorLike>(err: any): E => {
  // Nullish/falsy input — nothing useful to report.
  if (!err) {
    return { message: 'Unknown error.' } as E;
  }

  // Must be checked before the generic Error branch: we want to keep the
  // whole instance, not just its message.
  if (err instanceof BfetchRequestError) {
    // ignoring so we can return the error as is
    // @ts-expect-error
    return err;
  }

  if (err instanceof Error) {
    return { message: err.message } as E;
  }

  if (typeof err === 'object') {
    return { ...err, message: err.message || 'Unknown error.' } as E;
  }

  // Primitive (string/number/etc.) — stringify it.
  return { message: String(err) } as E;
};

View file

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
/**
 * Appends a `key=value` pair to a URL's query string, choosing `?` or `&`
 * depending on whether the URL already carries query parameters.
 * Note: key/value are concatenated as-is, without URL-encoding.
 */
export const appendQueryParam = (url: string, key: string, value: string): string => {
  const joiner = url.indexOf('?') === -1 ? '?' : '&';
  return url + joiner + key + '=' + value;
};

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
/**
 * Strips a single leading `/` from `text`, if present; otherwise returns
 * `text` unchanged. Uses `slice` instead of the deprecated Annex-B
 * `String.prototype.substr`.
 */
export const removeLeadingSlash = (text: string) => (text.startsWith('/') ? text.slice(1) : text);

View file

@ -1,44 +0,0 @@
# `bfetch` browser reference
- [`batchedFunction`](#batchedFunction)
- [`fetchStreaming`](#fetchStreaming)
## `batchedFunction`
Creates a function that will buffer its calls (until a timeout &mdash; 10 ms by default &mdash; or capacity &mdash; 25 calls by default &mdash; is reached)
and send all calls in one batch to the specified endpoint. The endpoint is expected
to stream results back in ND-JSON format using `Transfer-Encoding: chunked`, which is
implemented by `addBatchProcessingRoute` server-side method of `bfetch` plugin.
The created function is expected to be called with a single object argument and will
return a promise that will resolve to an object.
```ts
const fn = bfetch.batchedFunction({ url: '/my-plugin/something' });
const result = await fn({ foo: 'bar' });
```
Options:
- `url` &mdash; URL endpoint that will receive a batch of requests. This endpoint is expected
to receive batch as a serialized JSON array. It should stream responses back
in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming.
- `fetchStreaming` &mdash; The instance of `fetchStreaming` function that will perform ND-JSON handling.
There should be a version of this function available in setup contract of `bfetch` plugin.
- `flushOnMaxItems` &mdash; The maximum size of function call buffer before sending the batch request.
- `maxItemAge` &mdash; The maximum timeout in milliseconds of the oldest item in the batch
before sending the batch request.
## `fetchStreaming`
Executes an HTTP request and expects that server streams back results using
HTTP/1 `Transfer-Encoding: chunked`.
```ts
const { stream } = bfetch.fetchStreaming({ url: 'http://elastic.co' });
stream.subscribe(value => {});
```

View file

@ -1,54 +0,0 @@
# `bfetch` server reference
- [`addBatchProcessingRoute`](#addBatchProcessingRoute)
- [`addStreamingResponseRoute`](#addStreamingResponseRoute)
## `addBatchProcessingRoute`
Sets up a server endpoint that expects to work with [`batchedFunction`](../browser/reference.md#batchedFunction).
The endpoint receives a batch of requests, processes each request and streams results
back immediately as they become available. You only need to implement the
processing of each request (`onBatchItem` function), everything else is handled.
`onBatchItem` function is called for each individual request in the batch.
`onBatchItem` function receives a single object argument which is the payload
of one request; and it must return a promise that resolves to an object, too.
`onBatchItem` function is allowed to throw, in that case the error will be forwarded
to the browser only to the individual request, the rest of the batch will still continue
executing.
```ts
plugins.bfetch.addBatchProcessingRoute<object, object>(
'/my-plugin/double',
request => ({
onBatchItem: async (payload) => {
// ...
return {};
},
})
);
```
`request` is the `KibanaRequest` object. `addBatchProcessingRoute` together with `batchedFunction`
ensure that errors are handled and that all items in the batch get executed.
## `addStreamingResponseRoute`
`addStreamingResponseRoute` is a lower-level interface that receives a `payload`
message and returns an observable whose results are streamed back as ND-JSON messages
until the observable completes. `addStreamingResponseRoute` does not know about the
type of the messages, it does not handle errors, and it does not have a concept of
batch size&mdash;observable can stream any number of messages until it completes.
```ts
plugins.bfetch.addStreamingResponseRoute('/my-plugin/foo', request => ({
getResponseStream: (payload) => {
const subject = new Subject();
setTimeout(() => { subject.next('123'); }, 100);
setTimeout(() => { subject.complete(); }, 200);
return subject;
},
}));
```

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
// Jest configuration for the bfetch plugin, including coverage collection.
module.exports = {
preset: '@kbn/test',
// Relative path from src/plugins/bfetch to the repo root.
rootDir: '../../..',
roots: ['<rootDir>/src/plugins/bfetch'],
coverageDirectory: '<rootDir>/target/kibana-coverage/jest/src/plugins/bfetch',
coverageReporters: ['text', 'html'],
collectCoverageFrom: ['<rootDir>/src/plugins/bfetch/{common,public,server}/**/*.{ts,tsx}'],
};

View file

@ -1,18 +0,0 @@
{
"type": "plugin",
"id": "@kbn/bfetch-plugin",
"owner": [
"@elastic/appex-sharedux"
],
"group": "platform",
"visibility": "shared",
"description": "Considering using bfetch capabilities when fetching large amounts of data. This services supports batching HTTP requests and streaming responses back.",
"plugin": {
"id": "bfetch",
"browser": true,
"server": true,
"requiredBundles": [
"kibanaUtils"
]
}
}

View file

@ -1,756 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '@kbn/kibana-utils-plugin/public';
import { Subject } from 'rxjs';
// Resolves after queued immediates have run; uses the real timers module so
// it still works while jest fake timers are installed.
const flushPromises = () =>
new Promise((resolve) => jest.requireActual('timers').setImmediate(resolve));
// Reports whether `promise` has already settled by racing it against an
// immediately-resolved 'pending' marker.
const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Promise.race<'resolved' | 'rejected' | 'pending'>([
new Promise<any>((resolve) =>
promise.then(
() => resolve('resolved'),
() => resolve('rejected')
)
),
new Promise<'pending'>((resolve) => resolve('pending')).then(() => 'pending'),
]);
// True while `promise` has not yet resolved or rejected.
const isPending = (promise: Promise<unknown>): Promise<boolean> =>
getPromiseState(promise).then((state) => state === 'pending');
// Builds a stubbed fetchStreaming plus the handles (xhr, deferred promise,
// response stream) the tests use to drive and observe a fake streaming request.
const setup = () => {
const xhr = {} as unknown as XMLHttpRequest;
const { promise, resolve, reject } = defer<void>();
const stream = new Subject<any>();
const fetchStreaming = jest.fn(() => ({
xhr,
promise,
stream,
})) as unknown as jest.SpyInstance & typeof fetchStreamingReal;
return {
fetchStreaming,
xhr,
promise,
resolve,
reject,
stream,
};
};
describe('createStreamingBatchedFunction()', () => {
beforeAll(() => {
jest.useFakeTimers({ legacyFakeTimers: true });
});
afterAll(() => {
jest.useRealTimers();
});
test('returns a function', () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
getIsCompressionDisabled: () => true,
});
expect(typeof fn).toBe('function');
});
test('returned function is async', () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
getIsCompressionDisabled: () => true,
});
const res = fn({});
expect(typeof res.then).toBe('function');
});
describe('when timeout is reached', () => {
test('dispatches batch', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(6);
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
test('does nothing is buffer is empty', async () => {
const { fetchStreaming } = setup();
createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(6);
expect(fetchStreaming).toHaveBeenCalledTimes(0);
});
test('sends POST request to correct endpoint', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
fn({ foo: 'bar' });
jest.advanceTimersByTime(6);
expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
method: 'POST',
});
});
test('collects calls into an array batch ordered by in same order as calls', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
fn({ foo: 'bar' });
fn({ baz: 'quix' });
jest.advanceTimersByTime(6);
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ foo: 'bar' }, { baz: 'quix' }],
});
});
});
describe('when buffer becomes full', () => {
test('dispatches batch request', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
// No request goes out until the buffer reaches flushOnMaxItems (3).
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ full: 'yep' });
// Third call fills the buffer and flushes immediately, without waiting for the timer.
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
abortController.abort();
// The already-aborted item must be dropped from the batch; of() swallows its rejection.
of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });
jest.advanceTimersByTime(6);
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});
test("doesn't send batch request if all items have been aborted", async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
abortController.abort();
expect.assertions(3);
const req1 = fn({ foo: 'bar' }, abortController.signal).catch((e) =>
expect(e).toBeInstanceOf(AbortError)
);
const req2 = fn({ baz: 'quix' }, abortController.signal).catch((e) =>
expect(e).toBeInstanceOf(AbortError)
);
jest.advanceTimersByTime(6);
// Every buffered item was aborted, so no HTTP request is issued at all.
expect(fetchStreaming).not.toBeCalled();
await Promise.all([req1, req2]);
});
test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
method: 'POST',
});
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ a: '1' }, { b: '2' }, { c: '3' }],
});
});
test('dispatches batch on full buffer and also on timeout', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
// First three items flush on buffer size; the fourth flushes on the timer.
expect(fetchStreaming).toHaveBeenCalledTimes(1);
fn({ d: '4' });
jest.advanceTimersByTime(6);
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
});
describe('when receiving results', () => {
test('does not resolve call promises until request finishes', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
jest.advanceTimersByTime(6);
// The batch request has been sent, but no responses were streamed back yet.
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(true);
});
test('resolves only promise of result that was streamed back', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
await flushPromises();
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
jest.advanceTimersByTime(6);
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(true);
expect(await isPending(promise3)).toBe(true);
// `id` is the item's index within the dispatched batch; responses may
// arrive out of order and must resolve only the matching promise.
stream.next(
JSON.stringify({
id: 1,
result: { foo: 'bar' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(true);
stream.next(
JSON.stringify({
id: 0,
result: { foo: 'bar 2' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(false);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(true);
});
test('resolves each promise with correct data', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
jest.advanceTimersByTime(6);
stream.next(
JSON.stringify({
id: 1,
result: { foo: 'bar' },
}) + '\n'
);
stream.next(
JSON.stringify({
id: 2,
result: { foo: 'bar 2' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(false);
expect(await promise2).toEqual({ foo: 'bar' });
expect(await promise3).toEqual({ foo: 'bar 2' });
});
test('compression is false by default', async () => {
const { fetchStreaming } = setup();
// No getIsCompressionDisabled passed: the factory default must report `false`.
const fn = createStreamingBatchedFunction({
url: '/test',
flushOnMaxItems: 1,
fetchStreaming,
});
fn({ a: '1' });
const dontCompress = await fetchStreaming.mock.calls[0][0].getIsCompressionDisabled();
expect(dontCompress).toBe(false);
});
test('resolves falsy results', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
jest.advanceTimersByTime(6);
// Falsy results (false, 0, '') must still resolve — only `undefined` is skipped.
stream.next(
JSON.stringify({
id: 0,
result: false,
}) + '\n'
);
stream.next(
JSON.stringify({
id: 1,
result: 0,
}) + '\n'
);
stream.next(
JSON.stringify({
id: 2,
result: '',
}) + '\n'
);
expect(await isPending(promise1)).toBe(false);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(false);
expect(await promise1).toEqual(false);
expect(await promise2).toEqual(0);
expect(await promise3).toEqual('');
});
test('rejects promise on error response', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise = fn({ a: '1' });
jest.advanceTimersByTime(6);
expect(await isPending(promise)).toBe(true);
// A chunk carrying `error` instead of `result` rejects the matching promise.
stream.next(
JSON.stringify({
id: 0,
error: { message: 'oops' },
}) + '\n'
);
expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toEqual({
message: 'oops',
});
});
test('resolves successful requests even after rejected ones', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
const promise3 = of(fn({ a: '3' }));
jest.advanceTimersByTime(6);
// Responses arrive in reverse order, with a rejection in the middle.
stream.next(
JSON.stringify({
id: 2,
result: { b: '3' },
}) + '\n'
);
jest.advanceTimersByTime(1);
stream.next(
JSON.stringify({
id: 1,
error: { b: '2' },
}) + '\n'
);
jest.advanceTimersByTime(1);
stream.next(
JSON.stringify({
id: 0,
result: { b: '1' },
}) + '\n'
);
jest.advanceTimersByTime(1);
const [result1] = await promise1;
const [, error2] = await promise2;
const [result3] = await promise3;
expect(result1).toEqual({ b: '1' });
expect(error2).toEqual({ b: '2' });
expect(result3).toEqual({ b: '3' });
});
describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
jest.advanceTimersByTime(6);
expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);
abortController.abort();
jest.advanceTimersByTime(6);
await flushPromises();
expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
// Once every item in the batch is aborted, the underlying request is too.
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});
test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
jest.advanceTimersByTime(6);
expect(await isPending(promise)).toBe(true);
abortController.abort();
jest.advanceTimersByTime(6);
await flushPromises();
expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);
// The non-aborted item still receives its result from the stream.
stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);
jest.advanceTimersByTime(1);
const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});
describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
jest.advanceTimersByTime(6);
// Completing the stream before any results arrive simulates a dropped connection.
stream.complete();
jest.advanceTimersByTime(1);
const [, error1] = await promise1;
const [, error2] = await promise2;
expect(error1).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
expect(error2).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
});
test('rejects with CONNECTION error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
jest.advanceTimersByTime(6);
stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);
stream.complete();
jest.advanceTimersByTime(1);
const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
describe('when stream errors', () => {
test('rejects pending promises with STREAM error code', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
jest.advanceTimersByTime(6);
stream.error({
message: 'something went wrong',
});
jest.advanceTimersByTime(1);
const [, error1] = await promise1;
const [, error2] = await promise2;
expect(error1).toMatchObject({
message: 'something went wrong',
code: 'STREAM',
});
expect(error2).toMatchObject({
message: 'something went wrong',
code: 'STREAM',
});
});
test('rejects with STREAM error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
jest.advanceTimersByTime(6);
stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);
stream.error('oops');
jest.advanceTimersByTime(1);
const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: 'oops',
code: 'STREAM',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
test('rejects with STREAM error on JSON parse error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
getIsCompressionDisabled: () => true,
});
await flushPromises();
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
jest.advanceTimersByTime(6);
stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);
// Malformed ND-JSON chunk: already-settled promises keep their results,
// the rest are rejected with the parser's message under code STREAM.
stream.next('Not a JSON\n');
jest.advanceTimersByTime(1);
const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: `Unexpected token 'N', "Not a JSON\n" is not valid JSON`,
code: 'STREAM',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
});

View file

@ -1,170 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { AbortError, abortSignalToPromise, defer } from '@kbn/kibana-utils-plugin/public';
import type { ItemBufferParams, TimedItemBufferParams } from '@kbn/item-buffer';
import { createBatchedFunction, ErrorLike, normalizeError } from '../../common';
import { fetchStreaming } from '../streaming';
import { BatchedFunc, BatchItem } from './types';
/**
 * Protocol-level error surfaced by the batched function; `code` is
 * 'STREAM' or 'CONNECTION' (see `createStreamingBatchedFunction`).
 */
export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
}
export interface StreamingBatchedFunctionParams<Payload, Result> {
/**
 * URL endpoint that will receive a batch of requests. This endpoint is expected
 * to receive batch as a serialized JSON array. It should stream responses back
 * in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming.
 */
url: string;
/**
 * The instance of `fetchStreaming` function that will perform ND-JSON handling.
 * There should be a version of this function available in setup contract of `bfetch`
 * plugin.
 */
fetchStreaming?: typeof fetchStreaming;
/**
 * The maximum size of function call buffer before sending the batch request.
 */
flushOnMaxItems?: ItemBufferParams<any>['flushOnMaxItems'];
/**
 * The maximum timeout in milliseconds of the oldest item in the batch
 * before sending the batch request.
 */
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];
/**
 * Disables zlib compression of response chunks.
 */
getIsCompressionDisabled?: () => boolean;
}
/**
 * Returns a function that does not execute immediately but buffers the call internally until
 * `params.flushOnMaxItems` is reached or after `params.maxItemAge` timeout in milliseconds is reached. Once
 * one of those thresholds is reached all buffered calls are sent in one batch to the
 * server using `params.fetchStreaming` in a POST request. Responses are streamed back
 * and each batch item is resolved once corresponding response is received.
 */
export const createStreamingBatchedFunction = <Payload, Result extends object>(
params: StreamingBatchedFunctionParams<Payload, Result>
): BatchedFunc<Payload, Result> => {
const {
url,
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
getIsCompressionDisabled = () => false,
} = params;
const [fn] = createBatchedFunction({
// Each call becomes a deferred future: the promise is handed to the caller
// while the entry waits in the buffer for the next flush.
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});
if (items.length === 0) {
return; // all items have been aborted before a request has been sent
}
// One promise per item that settles when the item resolves, rejects, or
// its abort signal fires; used below to abort the shared HTTP request.
const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};
const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});
// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);
const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
getIsCompressionDisabled,
});
// Any stream-level failure rejects every still-pending item with code 'STREAM'.
const handleStreamError = (error: any) => {
const normalizedError = normalizeError<BatchedFunctionProtocolError>(error);
normalizedError.code = 'STREAM';
for (const { future } of items) future.reject(normalizedError);
};
stream.subscribe({
// Each ND-JSON chunk carries the index (`id`) of the batch item it answers.
next: (json: string) => {
try {
const response = JSON.parse(json);
if (response.error) {
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
items[response.id].future.resolve(response.result);
}
} catch (e) {
handleStreamError(e);
}
},
error: handleStreamError,
complete: () => {
// Stream ended before every item settled: the server hung up early.
if (!isBatchDone) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
};
for (const { future } of items) future.reject(error);
}
},
});
await stream.toPromise();
} catch (error) {
for (const item of items) item.future.reject(error);
}
},
flushOnMaxItems,
maxItemAge,
});
return fn;
};

View file

@ -1,11 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export type { StreamingBatchedFunctionParams } from './create_streaming_batched_function';
export { createStreamingBatchedFunction } from './create_streaming_batched_function';

View file

@ -1,21 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Defer } from '@kbn/kibana-utils-plugin/public';
/** A single buffered call: its payload plus the deferred future the caller awaits. */
export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
// Optional per-item abort signal supplied by the caller.
signal?: AbortSignal;
}
/** Signature of the function returned by `createStreamingBatchedFunction`. */
export type BatchedFunc<Payload, Result> = (
payload: Payload,
signal?: AbortSignal
) => Promise<Result>;

View file

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { PluginInitializerContext } from '@kbn/core/public';
import { BfetchPublicPlugin } from './plugin';
export type { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';
export type { BatchedFunc } from './batching/types';
export { DISABLE_BFETCH } from '../common/constants';
/** Plugin initializer invoked by the Kibana platform. */
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}

View file

@ -1,55 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { coreMock } from '@kbn/core/public/mocks';
import { BfetchPublicSetup, BfetchPublicStart } from '.';
import { plugin as pluginInitializer } from '.';
// Jest-mocked flavors of the bfetch public setup/start contracts used by consumers' tests.
export type Setup = jest.Mocked<BfetchPublicSetup>;
export type Start = jest.Mocked<BfetchPublicStart>;
// Builds a fresh setup-contract mock; every method is a bare jest mock
// that individual tests configure as needed.
const createSetupContract = (): Setup => {
  return {
    fetchStreaming: jest.fn(),
    batchedFunction: jest.fn(),
  };
};
// Builds a fresh start-contract mock; the start contract mirrors setup,
// so it is the same two jest-mocked methods.
const createStartContract = (): Start => {
  return {
    fetchStreaming: jest.fn(),
    batchedFunction: jest.fn(),
  };
};
// Instantiates the real plugin against core mocks. `setup` runs immediately;
// `start` is deferred behind `doStart` so tests control the lifecycle order.
const createPlugin = async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext();
const coreSetup = coreMock.createSetup();
const coreStart = coreMock.createStart();
const plugin = pluginInitializer(pluginInitializerContext);
const setup = await plugin.setup(coreSetup, {});
return {
pluginInitializerContext,
coreSetup,
coreStart,
plugin,
setup,
doStart: async () => await plugin.start(coreStart, {}),
};
};
/** Test-double factory bundle for the bfetch public plugin. */
export const bfetchPluginMock = {
createSetupContract,
createStartContract,
createPlugin,
};

View file

@ -1,116 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '@kbn/core/public';
import { createStartServicesGetter } from '@kbn/kibana-utils-plugin/public';
import {
ELASTIC_HTTP_VERSION_HEADER,
X_ELASTIC_INTERNAL_ORIGIN_REQUEST,
} from '@kbn/core-http-common';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { DISABLE_BFETCH_COMPRESSION, removeLeadingSlash } from '../common';
import { createStreamingBatchedFunction, StreamingBatchedFunctionParams } from './batching';
import { BatchedFunc } from './batching/types';
import { BFETCH_ROUTE_VERSION_LATEST } from '../common/constants';
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface BfetchPublicSetupDependencies {}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface BfetchPublicStartDependencies {}
/** Contract exposed by both the setup and start lifecycles of the plugin. */
export interface BfetchPublicContract {
fetchStreaming: (params: FetchStreamingParams) => ReturnType<typeof fetchStreamingStatic>;
batchedFunction: <Payload, Result extends object>(
params: StreamingBatchedFunctionParams<Payload, Result>
) => BatchedFunc<Payload, Result>;
}
export type BfetchPublicSetup = BfetchPublicContract;
export type BfetchPublicStart = BfetchPublicContract;
/**
 * Browser-side bfetch plugin: exposes `fetchStreaming` (ND-JSON streaming
 * over XHR) and `batchedFunction` (request batching built on top of it).
 * The contract is built once in `setup` and returned unchanged from `start`.
 */
export class BfetchPublicPlugin
implements
Plugin<
BfetchPublicSetup,
BfetchPublicStart,
BfetchPublicSetupDependencies,
BfetchPublicStartDependencies
>
{
private contract!: BfetchPublicContract;
constructor(private readonly initializerContext: PluginInitializerContext) {}
public setup(
core: CoreSetup<any, any>,
plugins: BfetchPublicSetupDependencies
): BfetchPublicSetup {
const { version: kibanaVersion } = this.initializerContext.env.packageInfo;
const basePath = core.http.basePath.get();
// uiSettings is a start-service, but the compression toggle is read per
// request — hence the lazy start-services getter.
const startServices = createStartServicesGetter(core.getStartServices);
const getIsCompressionDisabled = () =>
startServices().core.uiSettings.get<boolean>(DISABLE_BFETCH_COMPRESSION);
const fetchStreaming = this.fetchStreaming(
BFETCH_ROUTE_VERSION_LATEST,
kibanaVersion,
basePath,
getIsCompressionDisabled
);
const batchedFunction = this.batchedFunction(fetchStreaming, getIsCompressionDisabled);
this.contract = {
fetchStreaming,
batchedFunction,
};
return this.contract;
}
public start(core: CoreStart, plugins: BfetchPublicStartDependencies): BfetchPublicStart {
return this.contract;
}
public stop() {}
// Wraps the static fetchStreaming with the base path, version header, and
// internal-origin header required by Kibana's HTTP layer.
private fetchStreaming =
(
version: string,
kibanaVersion: string,
basePath: string,
getIsCompressionDisabled: () => boolean
): BfetchPublicSetup['fetchStreaming'] =>
(params) =>
fetchStreamingStatic({
...params,
url: `${basePath}/${removeLeadingSlash(params.url)}`,
headers: {
'Content-Type': 'application/json',
'kbn-version': kibanaVersion,
[X_ELASTIC_INTERNAL_ORIGIN_REQUEST]: 'Kibana',
[ELASTIC_HTTP_VERSION_HEADER]: version,
...(params.headers || {}),
},
getIsCompressionDisabled,
});
// Binds the plugin's fetchStreaming and compression setting into the batched
// function factory; callers may still inject their own fetchStreaming.
private batchedFunction =
(
fetchStreaming: BfetchPublicContract['fetchStreaming'],
getIsCompressionDisabled: () => boolean
): BfetchPublicContract['batchedFunction'] =>
(params) =>
createStreamingBatchedFunction({
...params,
getIsCompressionDisabled,
fetchStreaming: params.fetchStreaming || fetchStreaming,
});
}

View file

@ -1,362 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { fetchStreaming } from './fetch_streaming';
import { mockXMLHttpRequest } from '../test_helpers/xhr';
import { promisify } from 'util';
import { deflate } from 'zlib';
const pDeflate = promisify(deflate);

// Serializes `resp` to JSON, deflate-compresses it, and base64-encodes the
// bytes — the same wire format the server uses when compression is enabled.
const compressResponse = async (resp: any) => {
  const compressed = await pDeflate(JSON.stringify(resp));
  return compressed.toString('base64');
};
// Yields to the macrotask queue for ~1ms so pending XHR callbacks can run.
const tick = async () => {
  await new Promise((resolve) => setTimeout(resolve, 1));
};
// Installs a mocked XMLHttpRequest constructor on `window` and returns the
// xhr instance the code under test will drive; default status is 200.
const setup = () => {
const { xhr, XMLHttpRequest } = mockXMLHttpRequest();
window.XMLHttpRequest = XMLHttpRequest;
(xhr as any).status = 200;
return { xhr };
};
test('returns XHR request', () => {
setup();
const { xhr } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => true,
});
expect(typeof xhr.readyState).toBe('number');
});
test('returns stream', () => {
setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => true,
});
expect(typeof stream.subscribe).toBe('function');
});
test('promise resolves when request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => true,
});
let resolved = false;
stream.toPromise().then(() => (resolved = true));
await tick();
expect(resolved).toBe(false);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).responseText = 'foo\nbar';
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
});
test('promise resolves when compressed request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => false,
});
let resolved = false;
let result;
stream.toPromise().then((r) => {
resolved = true;
result = r;
});
await tick();
expect(resolved).toBe(false);
const msg = { foo: 'bar' };
// Whole message in a response
(env.xhr as any).responseText = `${await compressResponse(msg)}\n`;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
expect(result).toStrictEqual(JSON.stringify(msg));
});
test('promise resolves when compressed chunked request completes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => false,
});
let resolved = false;
let result;
stream.toPromise().then((r) => {
resolved = true;
result = r;
});
await tick();
expect(resolved).toBe(false);
const msg = { veg: 'tomato' };
const msgToCut = await compressResponse(msg);
const part1 = msgToCut.substr(0, 3);
// Message and a half in a response
(env.xhr as any).responseText = part1;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
// Half a message in a response
(env.xhr as any).responseText = `${msgToCut}\n`;
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
expect(result).toStrictEqual(JSON.stringify(msg));
});
test('streams incoming text as it comes through, according to separators', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
stream.subscribe(spy);
await tick();
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo\nbar';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('foo');
(env.xhr as any).responseText = 'foo\nbar\n';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledWith('bar');
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(2);
});
test('completes stream observable when request finishes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
getIsCompressionDisabled: () => true,
});
const spy = jest.fn();
stream.subscribe({
complete: spy,
});
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
expect(spy).toHaveBeenCalledTimes(1);
});
// NOTE(review): `setup()` and `tick()` are defined earlier in this file
// (outside this excerpt); setup() installs a mocked XMLHttpRequest as
// `env.xhr`, and tick() flushes pending promise callbacks.

// Aborting mid-flight must complete the stream exactly once; a later
// terminal readyState change must not notify completion a second time.
test('completes stream observable when aborted', async () => {
  const env = setup();
  const abort = new AbortController();
  const { stream } = fetchStreaming({
    url: 'http://example.com',
    signal: abort.signal,
    getIsCompressionDisabled: () => true,
  });
  const spy = jest.fn();
  stream.subscribe({
    complete: spy,
  });
  expect(spy).toHaveBeenCalledTimes(0);
  (env.xhr as any).responseText = 'foo';
  env.xhr.onprogress!({} as any);
  abort.abort();
  // Simulate the request finishing after the abort; the stream is already
  // complete, so this must be a no-op.
  (env.xhr as any).readyState = 4;
  (env.xhr as any).status = 200;
  env.xhr.onreadystatechange!({} as any);
  expect(spy).toHaveBeenCalledTimes(1);
});
// A terminal HTTP 400 must reject the promise form of the stream with an
// Error carrying the status code in its message.
test('promise throws when request errors', async () => {
  const env = setup();
  const { stream } = fetchStreaming({
    url: 'http://example.com',
    getIsCompressionDisabled: () => true,
  });
  const spy = jest.fn();
  stream.toPromise().catch(spy);
  await tick();
  expect(spy).toHaveBeenCalledTimes(0);
  (env.xhr as any).responseText = 'foo';
  env.xhr.onprogress!({} as any);
  (env.xhr as any).readyState = 4;
  (env.xhr as any).status = 400;
  env.xhr.onreadystatechange!({} as any);
  await tick();
  expect(spy).toHaveBeenCalledTimes(1);
  expect(spy.mock.calls[0][0]).toBeInstanceOf(Error);
  expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot(
    `"Check your network connection and try again. Code 400"`
  );
});
// The same failure surfaced through the observable's error channel.
test('stream observable errors when request errors', async () => {
  const env = setup();
  const { stream } = fetchStreaming({
    url: 'http://example.com',
    getIsCompressionDisabled: () => true,
  });
  const spy = jest.fn();
  stream.subscribe({
    error: spy,
  });
  await tick();
  expect(spy).toHaveBeenCalledTimes(0);
  (env.xhr as any).responseText = 'foo';
  env.xhr.onprogress!({} as any);
  (env.xhr as any).readyState = 4;
  (env.xhr as any).status = 400;
  env.xhr.onreadystatechange!({} as any);
  await tick();
  expect(spy).toHaveBeenCalledTimes(1);
  expect(spy.mock.calls[0][0]).toBeInstanceOf(Error);
  expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot(
    `"Check your network connection and try again. Code 400"`
  );
});
// Caller-supplied headers are forwarded verbatim to the XHR.
test('sets custom headers', async () => {
  const env = setup();
  fetchStreaming({
    url: 'http://example.com',
    headers: {
      'Content-Type': 'text/plain',
      Authorization: 'Bearer 123',
    },
    getIsCompressionDisabled: () => true,
  });
  expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain');
  expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Authorization', 'Bearer 123');
});
// withCredentials is always enabled by fetchStreaming.
test('uses credentials', async () => {
  const env = setup();
  expect(env.xhr.withCredentials).toBe(false);
  fetchStreaming({
    url: 'http://example.com',
    getIsCompressionDisabled: () => true,
  });
  expect(env.xhr.withCredentials).toBe(true);
});
// The request is opened with the given method/URL and the body sent once.
test('opens XHR request and sends specified body', async () => {
  const env = setup();
  expect(env.xhr.open).toHaveBeenCalledTimes(0);
  expect(env.xhr.send).toHaveBeenCalledTimes(0);
  fetchStreaming({
    url: 'http://elastic.co',
    method: 'GET',
    body: 'foobar',
    getIsCompressionDisabled: () => true,
  });
  expect(env.xhr.open).toHaveBeenCalledTimes(1);
  expect(env.xhr.send).toHaveBeenCalledTimes(1);
  expect(env.xhr.open).toHaveBeenCalledWith('GET', 'http://elastic.co');
  expect(env.xhr.send).toHaveBeenCalledWith('foobar');
});
test('uses POST request method by default', async () => {
  const env = setup();
  fetchStreaming({
    url: 'http://elastic.co',
    getIsCompressionDisabled: () => true,
  });
  expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co');
});

View file

@ -1,68 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { map, share } from 'rxjs';
import { inflateResponse } from '.';
import { fromStreamingXhr } from './from_streaming_xhr';
import { split } from './split';
import { appendQueryParam } from '../../common';
/** Options accepted by {@link fetchStreaming}. */
export interface FetchStreamingParams {
  url: string;
  // Extra request headers, forwarded verbatim via setRequestHeader().
  headers?: Record<string, string>;
  // Defaults to 'POST'.
  method?: 'GET' | 'POST';
  body?: string;
  // Aborting this signal cancels the in-flight XHR and completes the stream.
  signal?: AbortSignal;
  // When it returns true, the `compress` query param is not appended and the
  // response chunks are not run through inflateResponse().
  getIsCompressionDisabled?: () => boolean;
}
/**
 * Issues an XMLHttpRequest and exposes the HTTP/1 response body as a stream
 * of text messages. Returns both the raw XHR (so callers can inspect or
 * abort it) and an observable of newline-delimited, optionally inflated
 * messages.
 */
export function fetchStreaming({
  url,
  headers = {},
  method = 'POST',
  body = '',
  signal,
  getIsCompressionDisabled = () => false,
}: FetchStreamingParams) {
  const xhr = new window.XMLHttpRequest();
  const compressionDisabled = getIsCompressionDisabled();

  // Ask the server for a compressed response unless compression is disabled.
  const requestUrl = compressionDisabled ? url : appendQueryParam(url, 'compress', 'true');

  // Begin the request.
  xhr.open(method, requestUrl);
  xhr.withCredentials = true;
  for (const [name, value] of Object.entries(headers)) {
    xhr.setRequestHeader(name, value);
  }

  // Wire the observable up before sending so no chunk can be missed.
  const rawStream = fromStreamingXhr(xhr, signal);
  xhr.send(body);

  // Split the text stream on newlines and inflate each message when the
  // server compressed the response.
  const stream = rawStream.pipe(
    split('\n'),
    map((message) => (compressionDisabled ? message : inflateResponse(message))),
    share()
  );

  return {
    xhr,
    stream,
  };
}

View file

@ -1,271 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { fromStreamingXhr } from './from_streaming_xhr';
// Builds a minimal XHR stand-in exposing only the members fromStreamingXhr
// touches; the double cast is needed because the real interface is much
// larger than this stub.
const createXhr = (): XMLHttpRequest => {
  const fake = {
    abort: () => {},
    onprogress: () => {},
    onreadystatechange: () => {},
    readyState: 0,
    responseText: '',
    status: 200,
  };
  return fake as unknown as XMLHttpRequest;
};
// NOTE(review): these tests drive the mocked XHR (createXhr above) by hand,
// assigning responseText/readyState/status and invoking the handlers.
test('returns observable', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  expect(typeof observable.subscribe).toBe('function');
});
test('emits an event to observable', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const spy = jest.fn();
  observable.subscribe(spy);
  expect(spy).toHaveBeenCalledTimes(0);
  (xhr as any).responseText = 'foo';
  xhr.onprogress!({} as any);
  expect(spy).toHaveBeenCalledTimes(1);
  expect(spy).toHaveBeenCalledWith('foo');
});
// responseText is cumulative; the observable must emit only the delta
// appended since the previous progress event.
test('streams multiple events to observable', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const spy = jest.fn();
  observable.subscribe(spy);
  expect(spy).toHaveBeenCalledTimes(0);
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '12';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '123';
  xhr.onprogress!({} as any);
  expect(spy).toHaveBeenCalledTimes(3);
  expect(spy.mock.calls[0][0]).toBe('1');
  expect(spy.mock.calls[1][0]).toBe('2');
  expect(spy.mock.calls[2][0]).toBe('3');
});
// readyState 4 with a 2xx status completes the stream.
test('completes observable when request reaches end state', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const next = jest.fn();
  const complete = jest.fn();
  observable.subscribe({
    next,
    complete,
  });
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '2';
  xhr.onprogress!({} as any);
  expect(complete).toHaveBeenCalledTimes(0);
  (xhr as any).readyState = 4;
  (xhr as any).status = 200;
  xhr.onreadystatechange!({} as any);
  expect(complete).toHaveBeenCalledTimes(1);
});
// An abort before the request finishes completes the stream once, and any
// later readyState transitions are ignored.
test('completes observable when aborted', () => {
  const xhr = createXhr();
  const abortController = new AbortController();
  const observable = fromStreamingXhr(xhr, abortController.signal);
  const next = jest.fn();
  const complete = jest.fn();
  observable.subscribe({
    next,
    complete,
  });
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '2';
  xhr.onprogress!({} as any);
  expect(complete).toHaveBeenCalledTimes(0);
  (xhr as any).readyState = 2;
  abortController.abort();
  expect(complete).toHaveBeenCalledTimes(1);
  // Shouldn't trigger additional events
  (xhr as any).readyState = 4;
  (xhr as any).status = 200;
  xhr.onreadystatechange!({} as any);
  expect(complete).toHaveBeenCalledTimes(1);
});
// A terminal 400 errors the stream instead of completing it.
test('errors observable if request returns with error', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const next = jest.fn();
  const complete = jest.fn();
  const error = jest.fn();
  observable.subscribe({
    next,
    complete,
    error,
  });
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '2';
  xhr.onprogress!({} as any);
  expect(complete).toHaveBeenCalledTimes(0);
  (xhr as any).readyState = 4;
  (xhr as any).status = 400;
  xhr.onreadystatechange!({} as any);
  expect(complete).toHaveBeenCalledTimes(0);
  expect(error).toHaveBeenCalledTimes(1);
  expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
  expect(error.mock.calls[0][0].message).toMatchInlineSnapshot(
    `"Check your network connection and try again. Code 400"`
  );
});
// Once the status is an error, progress text must not be emitted at all.
test('does not emit when gets error response', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const next = jest.fn();
  const complete = jest.fn();
  const error = jest.fn();
  observable.subscribe({
    next,
    complete,
    error,
  });
  (xhr as any).responseText = 'error';
  (xhr as any).status = 400;
  xhr.onprogress!({} as any);
  expect(next).toHaveBeenCalledTimes(0);
  (xhr as any).readyState = 4;
  xhr.onreadystatechange!({} as any);
  expect(next).toHaveBeenCalledTimes(0);
  expect(error).toHaveBeenCalledTimes(1);
  expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
  expect(error.mock.calls[0][0].message).toMatchInlineSnapshot(
    `"Check your network connection and try again. Code 400"`
  );
});
// Repeated progress callbacks with unchanged responseText are de-duplicated.
test('when .onprogress called multiple times with same text, does not create new observable events', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const spy = jest.fn();
  observable.subscribe(spy);
  expect(spy).toHaveBeenCalledTimes(0);
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '1';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '12';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '12';
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '123';
  xhr.onprogress!({} as any);
  expect(spy).toHaveBeenCalledTimes(3);
  expect(spy.mock.calls[0][0]).toBe('1');
  expect(spy.mock.calls[1][0]).toBe('2');
  expect(spy.mock.calls[2][0]).toBe('3');
});
// onreadystatechange alone also flushes new text (older browsers never fire
// onprogress).
test('generates new observable events on .onreadystatechange', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const spy = jest.fn();
  observable.subscribe(spy);
  expect(spy).toHaveBeenCalledTimes(0);
  (xhr as any).responseText = '{"foo":"bar"}';
  xhr.onreadystatechange!({} as any);
  (xhr as any).responseText = '{"foo":"bar"}\n';
  xhr.onreadystatechange!({} as any);
  (xhr as any).responseText = '{"foo":"bar"}\n123';
  xhr.onreadystatechange!({} as any);
  expect(spy).toHaveBeenCalledTimes(3);
  expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}');
  expect(spy.mock.calls[1][0]).toBe('\n');
  expect(spy.mock.calls[2][0]).toBe('123');
});
// Interleaving the two callbacks in any order must not duplicate or drop
// any chunk.
test('.onreadystatechange and .onprogress can be called in any order', () => {
  const xhr = createXhr();
  const observable = fromStreamingXhr(xhr);
  const spy = jest.fn();
  observable.subscribe(spy);
  expect(spy).toHaveBeenCalledTimes(0);
  (xhr as any).responseText = '{"foo":"bar"}';
  xhr.onreadystatechange!({} as any);
  xhr.onprogress!({} as any);
  (xhr as any).responseText = '{"foo":"bar"}\n';
  xhr.onprogress!({} as any);
  xhr.onreadystatechange!({} as any);
  (xhr as any).responseText = '{"foo":"bar"}\n123';
  xhr.onreadystatechange!({} as any);
  xhr.onprogress!({} as any);
  xhr.onreadystatechange!({} as any);
  xhr.onprogress!({} as any);
  expect(spy).toHaveBeenCalledTimes(3);
  expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}');
  expect(spy.mock.calls[1][0]).toBe('\n');
  expect(spy.mock.calls[2][0]).toBe('123');
});

View file

@ -1,74 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Observable, Subject } from 'rxjs';
import { BfetchRequestError } from '@kbn/bfetch-error';
/**
 * Wraps a streaming XMLHttpRequest in an Observable<string> that emits one
 * value per newly received chunk of `responseText`, completes on normal end
 * or abort, and errors with a BfetchRequestError on failure statuses.
 */
export const fromStreamingXhr = (
  xhr: Pick<
    XMLHttpRequest,
    'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' | 'abort'
  >,
  signal?: AbortSignal
): Observable<string> => {
  const subject = new Subject<string>();
  // How much of the cumulative responseText has already been emitted.
  let emittedUpTo = 0;
  let wasAborted = false;

  // 0 indicates a network failure; 400+ are treated as server errors.
  const hasErrorStatus = () => xhr.status === 0 || xhr.status >= 400;

  // Emit only the portion of responseText that arrived since the last call;
  // safe to invoke repeatedly for the same state.
  const flushNewText = () => {
    if (wasAborted) return;
    if (hasErrorStatus()) return;
    const text = xhr.responseText;
    if (text.length <= emittedUpTo) return;
    subject.next(text.slice(emittedUpTo));
    emittedUpTo = text.length;
  };

  xhr.onprogress = flushNewText;

  const handleAbort = () => {
    // Only abort requests that have not already finished (readyState 4).
    if (xhr.readyState !== 4) {
      wasAborted = true;
      xhr.abort();
      subject.complete();
      if (signal) signal.removeEventListener('abort', handleAbort);
    }
  };
  if (signal) signal.addEventListener('abort', handleAbort);

  xhr.onreadystatechange = () => {
    if (wasAborted) return;
    // Older browsers don't support onprogress, so flush here as well.
    flushNewText();
    // readyState 4 means the request is done.
    if (xhr.readyState === 4) {
      if (signal) signal.removeEventListener('abort', handleAbort);
      if (hasErrorStatus()) {
        subject.error(new BfetchRequestError(xhr.status));
      } else {
        subject.complete();
      }
    }
  };

  return subject;
};

View file

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
// Public surface of the client-side streaming utilities.
export * from './split';
export * from './from_streaming_xhr';
export * from './fetch_streaming';
export { inflateResponse } from './inflate_response';

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { unzlibSync, strFromU8 } from 'fflate';
import { toByteArray } from 'base64-js';
// Decodes a base64-encoded, zlib-deflated message back into a string.
// NOTE(review): the `Result` type parameter is unused in the body —
// presumably kept for call-site compatibility; confirm before removing.
export function inflateResponse<Result extends object>(response: string) {
  const compressed = toByteArray(response);
  const inflated = unzlibSync(compressed);
  return strFromU8(inflated);
}

View file

@ -1,61 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { split } from './split';
import { Subject } from 'rxjs';
// split('.') should break a stream of characters into dot-separated tokens.
test('splits a single IP address', () => {
  const ip = '127.0.0.1';
  const list: string[] = [];
  const subject = new Subject<string>();
  const splitted = split('.')(subject);
  splitted.subscribe((value) => list.push(value));
  subject.next(ip);
  subject.complete();
  expect(list).toEqual(['127', '0', '0', '1']);
});
// Edge-case corpus: empty segments, leading/trailing delimiters, and no
// delimiter at all. Empty segments are expected to be dropped.
const streams = [
  'adsf.asdf.asdf',
  'single.dot',
  'empty..split',
  'trailingdot.',
  '.leadingdot',
  '.',
  '....',
  'no_delimiter',
  '1.2.3.4.5',
  '1.2.3.4.5.',
  '.1.2.3.4.5.',
  '.1.2.3.4.5',
];
// Each stream is fed in randomly sized chunks 100 times to verify that the
// operator's partial-message buffering is independent of chunk boundaries.
for (const stream of streams) {
  test(`splits stream by delimiter correctly "${stream}"`, () => {
    const correctResult = stream.split('.').filter(Boolean);
    for (let j = 0; j < 100; j++) {
      const list: string[] = [];
      const subject = new Subject<string>();
      const splitted = split('.')(subject);
      splitted.subscribe((value) => list.push(value));
      let i = 0;
      while (i < stream.length) {
        const len = Math.round(Math.random() * 10);
        const chunk = stream.substr(i, len);
        subject.next(chunk);
        i += len;
      }
      subject.complete();
      expect(list).toEqual(correctResult);
    }
  });
}

View file

@ -1,49 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs';
/**
 * Receives observable that emits strings, and returns a new observable
 * that also returns strings separated by delimiter.
 *
 * Input stream:
 *
 *     asdf.f -> df..aaa. -> dfsdf
 *
 * Output stream, assuming "." is used as delimiter:
 *
 *     asdf -> fdf -> aaa -> dfsdf
 *
 * Empty strings are filtered out of the output.
 */
export const split =
  (delimiter: string = '\n') =>
  (in$: Observable<string>): Observable<string> => {
    const out$ = new Subject<string>();
    // Carry-over of the (possibly partial) trailing message between chunks.
    let startingText = '';
    // Fixed here: the positional subscribe(next, error, complete) overload is
    // deprecated in RxJS 7 and removed in RxJS 8 — use an observer object.
    in$.subscribe({
      next: (chunk) => {
        const messages = (startingText + chunk).split(delimiter);
        // We don't want to send the last message here, since it may or
        // may not be a partial message.
        messages.slice(0, -1).forEach((message) => out$.next(message));
        startingText = messages.length ? messages[messages.length - 1] : '';
      },
      error: (err) => out$.error(err),
      complete: () => {
        // Flush whatever is buffered; an empty tail is filtered out below.
        out$.next(startingText);
        out$.complete();
      },
    });
    return out$.pipe(filter<string>(Boolean));
  };

View file

@ -1,69 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
/* eslint-disable max-classes-per-file */
// Builds a fully jest-mocked XMLHttpRequest plus a constructor class that
// always returns that same instance, so tests can install the constructor
// as window.XMLHttpRequest and then drive/inspect `xhr` directly.
export const mockXMLHttpRequest = (): {
  xhr: XMLHttpRequest;
  XMLHttpRequest: typeof window.XMLHttpRequest;
} => {
  class MockXMLHttpRequest implements XMLHttpRequest {
    // The readyState constants are stubbed as 0 rather than their real
    // values; tests assign numeric readyState values directly instead.
    // @ts-expect-error upgrade typescript v5.1.6
    DONE = 0;
    // @ts-expect-error upgrade typescript v5.1.6
    HEADERS_RECEIVED = 0;
    // @ts-expect-error upgrade typescript v5.1.6
    LOADING = 0;
    // @ts-expect-error upgrade typescript v5.1.6
    OPENED = 0;
    // @ts-expect-error upgrade typescript v5.1.6
    UNSENT = 0;
    abort = jest.fn();
    addEventListener = jest.fn();
    dispatchEvent = jest.fn();
    getAllResponseHeaders = jest.fn();
    getResponseHeader = jest.fn();
    onabort = jest.fn();
    onerror = jest.fn();
    onload = jest.fn();
    onloadend = jest.fn();
    onloadstart = jest.fn();
    onprogress = jest.fn();
    onreadystatechange = jest.fn();
    ontimeout = jest.fn();
    open = jest.fn();
    overrideMimeType = jest.fn();
    readyState = 0;
    removeEventListener = jest.fn();
    response = null;
    responseText = '';
    responseType = null as any;
    responseURL = '';
    responseXML = null;
    send = jest.fn();
    setRequestHeader = jest.fn();
    status = 0;
    statusText = '';
    timeout = 0;
    upload = null as any;
    withCredentials = false;
  }
  const xhr = new MockXMLHttpRequest();
  return {
    // @ts-expect-error upgrade typescript v5.1.6
    xhr,
    // Constructor that ignores `new` semantics and hands back the shared
    // instance above.
    XMLHttpRequest: class {
      constructor() {
        return xhr;
      }
    } as any,
  };
};

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { PluginInitializerContext } from '@kbn/core/server';
export type { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin';
// Lazily loads the plugin class so this entry point stays lightweight until
// the plugin is actually instantiated.
export async function plugin(initializerContext: PluginInitializerContext) {
  const serverPluginModule = await import('./plugin');
  return new serverPluginModule.BfetchServerPlugin(initializerContext);
}

View file

@ -1,52 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { coreMock } from '@kbn/core/server/mocks';
import { BfetchServerSetup, BfetchServerStart } from '.';
import { plugin as pluginInitializer } from '.';
// Jest-mocked versions of the bfetch server-plugin contracts, for use by
// consuming plugins' tests.
export type Setup = jest.Mocked<BfetchServerSetup>;
export type Start = jest.Mocked<BfetchServerStart>;
const createSetupContract = (): Setup => {
  const setupContract: Setup = {
    addBatchProcessingRoute: jest.fn(),
    addStreamingResponseRoute: jest.fn(),
  };
  return setupContract;
};
// The start contract is currently empty.
const createStartContract = (): Start => {
  const startContract: Start = {};
  return startContract;
};
// Instantiates the real plugin against core mocks and runs setup();
// doStart() is deferred so tests control when start() runs.
const createPlugin = async () => {
  const pluginInitializerContext = coreMock.createPluginInitializerContext();
  const coreSetup = coreMock.createSetup();
  const coreStart = coreMock.createStart();
  const plugin = await pluginInitializer(pluginInitializerContext);
  const setup = await plugin.setup(coreSetup, {});
  return {
    pluginInitializerContext,
    coreSetup,
    coreStart,
    plugin,
    setup,
    doStart: async () => await plugin.start(coreStart, {}),
  };
};
export const bfetchPluginMock = {
  createSetupContract,
  createStartContract,
  createPlugin,
};

View file

@ -1,221 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import {
CoreStart,
PluginInitializerContext,
CoreSetup,
Plugin,
Logger,
KibanaRequest,
StartServicesAccessor,
RequestHandlerContext,
RequestHandler,
KibanaResponseFactory,
AnalyticsServiceStart,
HttpProtocol,
} from '@kbn/core/server';
import { map$ } from '@kbn/std';
import { schema } from '@kbn/config-schema';
import { BFETCH_ROUTE_VERSION_LATEST } from '../common/constants';
import {
StreamingResponseHandler,
BatchRequestData,
BatchResponseItem,
ErrorLike,
removeLeadingSlash,
normalizeError,
} from '../common';
import { createStream } from './streaming';
import { getUiSettings } from './ui_settings';
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface BfetchServerSetupDependencies {}
export interface BfetchServerStartDependencies {
  // Used to report stream-encoding performance metrics when available.
  analytics?: AnalyticsServiceStart;
}
// Per-request handler contract for batched routes: invoked once per item in
// the incoming batch.
export interface BatchProcessingRouteParams<BatchItemData, BatchItemResult> {
  onBatchItem: (data: BatchItemData) => Promise<BatchItemResult>;
}
/** @public */
export interface BfetchServerSetup {
  // Registers a route that accepts a batch of items and streams back one
  // result (or normalized error) per item.
  addBatchProcessingRoute: <BatchItemData extends object, BatchItemResult extends object>(
    path: string,
    handler: (request: KibanaRequest) => BatchProcessingRouteParams<BatchItemData, BatchItemResult>
  ) => void;
  // Registers a route whose handler produces an observable streamed back to
  // the client as (optionally compressed) NDJSON.
  addStreamingResponseRoute: <Payload, Response>(
    path: string,
    params: (
      request: KibanaRequest,
      context: RequestHandlerContext
    ) => StreamingResponseHandler<Payload, Response>,
    method?: 'GET' | 'POST' | 'PUT' | 'DELETE',
    pluginRouter?: ReturnType<CoreSetup['http']['createRouter']>
  ) => void;
}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface BfetchServerStart {}
// Response headers for streaming endpoints. The connection-management
// headers (Connection, Transfer-Encoding) are only emitted for HTTP/1.x —
// presumably because HTTP/2 manages framing itself; confirm against the
// HTTP/2 spec before changing.
const getStreamingHeaders = (protocol: HttpProtocol): Record<string, string> => {
  const common = {
    'Content-Type': 'application/x-ndjson',
    'X-Accel-Buffering': 'no',
  };
  if (protocol === 'http2') {
    return common;
  }
  return {
    ...common,
    Connection: 'keep-alive',
    'Transfer-Encoding': 'chunked',
  };
};
// Shape of the parsed query string accepted by the streaming routes.
interface Query {
  // When true, each streamed message is deflate-compressed and base64-encoded.
  compress: boolean;
}
export class BfetchServerPlugin
  implements
    Plugin<
      BfetchServerSetup,
      BfetchServerStart,
      BfetchServerSetupDependencies,
      BfetchServerStartDependencies
    >
{
  // Captured in start(); forwarded to createStream() for metric reporting.
  private _analyticsService: AnalyticsServiceStart | undefined;
  constructor(private readonly initializerContext: PluginInitializerContext) {}
  /**
   * Registers the bfetch UI settings and exposes the two route-registration
   * APIs. The batching API is layered on top of the streaming one.
   */
  public setup(core: CoreSetup, plugins: BfetchServerSetupDependencies): BfetchServerSetup {
    const logger = this.initializerContext.logger.get();
    const router = core.http.createRouter();
    core.uiSettings.register(getUiSettings());
    const addStreamingResponseRoute = this.addStreamingResponseRoute({
      getStartServices: core.getStartServices,
      router,
      logger,
    });
    const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
    return {
      addBatchProcessingRoute,
      addStreamingResponseRoute,
    };
  }
  public start(core: CoreStart, plugins: BfetchServerStartDependencies): BfetchServerStart {
    this._analyticsService = core.analytics;
    return {};
  }
  public stop() {}
  /**
   * Returns a registrar that mounts a versioned internal route streaming the
   * handler's observable back to the client via createStream(). The route is
   * registered on `pluginRouter` when given, else the plugin's own router.
   */
  private addStreamingResponseRoute =
    ({
      router,
      logger,
    }: {
      getStartServices: StartServicesAccessor;
      router: ReturnType<CoreSetup['http']['createRouter']>;
      logger: Logger;
    }): BfetchServerSetup['addStreamingResponseRoute'] =>
    (path, handler, method = 'POST', pluginRouter) => {
      const httpRouter = pluginRouter || router;
      const routeDefinition = {
        version: BFETCH_ROUTE_VERSION_LATEST,
        validate: {
          request: {
            body: schema.any(),
            // `compress` selects the wire encoding (see createStream()).
            query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
          },
        },
      };
      const routeHandler: RequestHandler<unknown, Query> = async (
        context: RequestHandlerContext,
        request: KibanaRequest<unknown, Query, any>,
        response: KibanaResponseFactory
      ) => {
        const handlerInstance = handler(request, context);
        const data = request.body;
        const compress = request.query.compress;
        return response.ok({
          headers: getStreamingHeaders(request.protocol),
          body: createStream(
            handlerInstance.getResponseStream(data),
            logger,
            compress,
            this._analyticsService
          ),
        });
      };
      // Same definition/handler; only the HTTP verb varies.
      switch (method) {
        case 'GET':
          httpRouter.versioned
            .get({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
            .addVersion(routeDefinition, routeHandler);
          break;
        case 'POST':
          httpRouter.versioned
            .post({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
            .addVersion(routeDefinition, routeHandler);
          break;
        case 'PUT':
          httpRouter.versioned
            .put({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
            .addVersion(routeDefinition, routeHandler);
          break;
        case 'DELETE':
          httpRouter.versioned
            .delete({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
            .addVersion(routeDefinition, routeHandler);
          break;
        default:
          throw new Error(`Handler for method ${method} is not defined`);
      }
    };
  /**
   * Builds the batching registrar on top of the streaming one: each incoming
   * batch is mapped item-by-item via map$, emitting `{ id, result }` on
   * success or `{ id, error }` (normalized) on failure, so one bad item does
   * not abort the rest of the batch.
   */
  private addBatchProcessingRoute =
    (
      addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute']
    ): BfetchServerSetup['addBatchProcessingRoute'] =>
    <BatchItemData extends object, BatchItemResult extends object, E extends ErrorLike = ErrorLike>(
      path: string,
      handler: (
        request: KibanaRequest
      ) => BatchProcessingRouteParams<BatchItemData, BatchItemResult>
    ) => {
      addStreamingResponseRoute<
        BatchRequestData<BatchItemData>,
        BatchResponseItem<BatchItemResult, E>
      >(path, (request) => {
        const handlerInstance = handler(request);
        return {
          getResponseStream: ({ batch }) =>
            map$(batch, async (batchItem, id) => {
              try {
                const result = await handlerInstance.onBatchItem(batchItem);
                return { id, result };
              } catch (error) {
                return { id, error: normalizeError<E>(error) };
              }
            }),
        };
      });
    };
}

View file

@ -1,109 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { promisify } from 'util';
import { Observable } from 'rxjs';
import { catchError, concatMap, finalize } from 'rxjs';
import { AnalyticsServiceStart, Logger } from '@kbn/core/server';
import { Stream, PassThrough } from 'stream';
import { constants, deflate } from 'zlib';
import { reportPerformanceMetricEvent } from '@kbn/ebt-tools';
// Message separator within the streamed response body.
const delimiter = '\n';
const pDeflate = promisify(deflate);
// EBT event name used to report encoding performance for a whole stream.
const BFETCH_SERVER_ENCODING_EVENT_TYPE = 'bfetch_server_encoding';

/**
 * Accumulates (duration, byte-size) pairs for every message written to a
 * stream and aggregates them into a single EBT performance-metric event.
 */
class StreamMetricCollector {
  // Flat interleaved list: [time, messageSize, time, messageSize, ...].
  private readonly _collector: number[] = [];
  addMetric(time: number, messageSize: number) {
    this._collector.push(time);
    this._collector.push(messageSize);
  }
  getEBTPerformanceMetricEvent() {
    let totalTime = 0;
    let totalMessageSize = 0;
    for (let i = 0; i < this._collector.length; i += 2) {
      totalTime += this._collector[i];
      totalMessageSize += this._collector[i + 1];
    }
    return {
      eventName: BFETCH_SERVER_ENCODING_EVENT_TYPE,
      duration: totalTime,
      key1: 'message_count',
      value1: this._collector.length / 2,
      key2: 'total_byte_size',
      value2: totalMessageSize,
      key3: 'stream_type',
      value3: 1, // 1 == 'compressed'. Can always include support for ndjson-type later (e.g. 2 == ndjson)
    };
  }
}

/**
 * Deflates `message`, base64-encodes it, and writes it plus the delimiter to
 * `output`, optionally recording timing/size metrics in `collector`.
 *
 * Fixed here: the original wrapped the body in `new Promise(async (resolve,
 * reject) => ...)` — an async executor is redundant (the work can simply be
 * awaited) and risks swallowing synchronous throws. A plain async function
 * has identical resolution/rejection semantics for callers.
 */
async function zipMessageToStream(
  output: PassThrough,
  message: string,
  collector?: StreamMetricCollector
): Promise<void> {
  const before = performance.now();
  // Z_SYNC_FLUSH is passed through to zlib — presumably so each message is
  // independently decodable by the client-side inflateResponse(); confirm
  // against the Node zlib docs before changing.
  const gzipped = await pDeflate(message, {
    flush: constants.Z_SYNC_FLUSH,
  });
  const base64Compressed = gzipped.toString('base64');
  if (collector) {
    // 1 ASCII character = 1 byte
    collector.addMetric(performance.now() - before, base64Compressed.length);
  }
  output.write(base64Compressed);
  output.write(delimiter);
}
/**
 * Serializes each message from `results` to JSON, deflate+base64 encodes it,
 * and streams it (newline-delimited) through the returned PassThrough. When
 * an analytics service is provided, aggregated encoding metrics are reported
 * once the stream finishes.
 */
export const createCompressedStream = <Response>(
  results: Observable<Response>,
  logger: Logger,
  analyticsStart?: AnalyticsServiceStart
): Stream => {
  const output = new PassThrough();
  const collector = analyticsStart ? new StreamMetricCollector() : undefined;

  const encoded$ = results.pipe(
    // concatMap preserves ordering: each message is fully written before the
    // next one is serialized.
    concatMap((message: Response) => {
      const serialized = JSON.stringify(message);
      return zipMessageToStream(output, serialized, collector);
    }),
    catchError((e) => {
      logger.error('Could not serialize or stream a message.');
      logger.error(e);
      throw e;
    }),
    finalize(() => {
      output.end();
      if (analyticsStart && collector) {
        reportPerformanceMetricEvent(analyticsStart, collector.getEBTPerformanceMetricEvent());
      }
    })
  );
  encoded$.subscribe();

  return output;
};

View file

@ -1,40 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Observable } from 'rxjs';
import { Logger } from '@kbn/core/server';
import { Stream, PassThrough } from 'stream';
// Separator appended after every serialized message.
const delimiter = '\n';

/**
 * Streams each message from `results` as one line of JSON (NDJSON) through
 * the returned PassThrough. A message that fails to serialize is logged and
 * skipped; an upstream error ends the stream after logging.
 */
export const createNDJSONStream = <Response>(
  results: Observable<Response>,
  logger: Logger
): Stream => {
  const output = new PassThrough();

  const writeMessage = (message: Response) => {
    try {
      output.write(JSON.stringify(message) + delimiter);
    } catch (error) {
      // Keep streaming remaining messages; just record the failure.
      logger.error('Could not serialize or stream a message.');
      logger.error(error);
    }
  };

  results.subscribe({
    next: writeMessage,
    error: (error) => {
      output.end();
      logger.error(error);
    },
    complete: () => output.end(),
  });

  return output;
};

View file

@ -1,25 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { AnalyticsServiceStart, Logger } from '@kbn/core/server';
import { Stream } from 'stream';
import { Observable } from 'rxjs';
import { createCompressedStream } from './create_compressed_stream';
import { createNDJSONStream } from './create_ndjson_stream';
/**
 * Selects the wire encoding for a streaming response: deflate+base64 lines
 * when the client asked for compression, plain NDJSON otherwise.
 */
export function createStream<Payload, Response>(
  response$: Observable<Response>,
  logger: Logger,
  compress: boolean,
  analytics?: AnalyticsServiceStart
): Stream {
  if (compress) {
    return createCompressedStream(response$, logger, analytics);
  }
  return createNDJSONStream(response$, logger);
}

View file

@ -1,12 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './create_ndjson_stream';
export * from './create_compressed_stream';
export * from './create_stream';

View file

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { i18n } from '@kbn/i18n';
import { UiSettingsParams } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { DISABLE_BFETCH_COMPRESSION, DISABLE_BFETCH } from '../common';
/**
 * Registers the bfetch advanced settings. Both settings are marked deprecated
 * ahead of the plugin's removal in Kibana 9.0, and both require a page reload
 * to take effect.
 */
export function getUiSettings(): Record<string, UiSettingsParams<unknown>> {
  // Batching defaults to disabled (value: true) so requests can be debugged
  // individually.
  const disableBfetch: UiSettingsParams<unknown> = {
    name: i18n.translate('bfetch.disableBfetch', {
      defaultMessage: 'Disable request batching',
    }),
    value: true,
    description: i18n.translate('bfetch.disableBfetchDesc', {
      defaultMessage:
        'Disables requests batching. This increases number of HTTP requests from Kibana, but allows to debug requests individually.',
    }),
    schema: schema.boolean(),
    deprecation: {
      message: i18n.translate('bfetch.advancedSettings.disableBfetchDeprecation', {
        defaultMessage: 'This setting is deprecated and will be removed in Kibana 9.0.',
      }),
      docLinksKey: 'generalSettings',
    },
    category: [],
    requiresPageReload: true,
  };

  // Compression stays enabled by default (value: false) to keep responses
  // small.
  const disableBfetchCompression: UiSettingsParams<unknown> = {
    name: i18n.translate('bfetch.disableBfetchCompression', {
      defaultMessage: 'Disable batch compression',
    }),
    value: false,
    description: i18n.translate('bfetch.disableBfetchCompressionDesc', {
      defaultMessage:
        'Disable batch compression. This allows you to debug individual requests, but increases response size.',
    }),
    schema: schema.boolean(),
    deprecation: {
      message: i18n.translate('bfetch.advancedSettings.disableBfetchCompressionDeprecation', {
        defaultMessage: 'This setting is deprecated and will be removed in Kibana 9.0.',
      }),
      docLinksKey: 'generalSettings',
    },
    category: [],
    requiresPageReload: true,
  };

  return {
    [DISABLE_BFETCH]: disableBfetch,
    [DISABLE_BFETCH_COMPRESSION]: disableBfetchCompression,
  };
}

View file

@ -1,21 +0,0 @@
{
"extends": "../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
},
"include": ["common/**/*", "public/**/*", "server/**/*", "index.ts"],
"kbn_references": [
"@kbn/core",
"@kbn/kibana-utils-plugin",
"@kbn/i18n",
"@kbn/config-schema",
"@kbn/std",
"@kbn/core-http-common",
"@kbn/bfetch-error",
"@kbn/ebt-tools",
"@kbn/item-buffer",
],
"exclude": [
"target/**/*",
]
}

View file

@ -18,7 +18,6 @@
"browser": true,
"server": true,
"requiredPlugins": [
"bfetch",
"expressions",
"uiActions",
"share",
@ -40,4 +39,4 @@
"common"
]
}
}
}

View file

@ -7,7 +7,6 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { bfetchPluginMock } from '@kbn/bfetch-plugin/public/mocks';
import { CoreSetup, CoreStart } from '@kbn/core/public';
import { coreMock } from '@kbn/core/public/mocks';
import { DataViewsContract } from '@kbn/data-views-plugin/common';
@ -38,10 +37,8 @@ describe('Search service', () => {
describe('setup()', () => {
it('exposes proper contract', async () => {
const bfetch = bfetchPluginMock.createSetupContract();
const setup = searchService.setup(mockCoreSetup, {
packageInfo: { version: '8' },
bfetch,
expressions: { registerFunction: jest.fn(), registerType: jest.fn() },
management: managementPluginMock.createSetupContract(),
} as unknown as SearchServiceSetupDependencies);
@ -55,10 +52,8 @@ describe('Search service', () => {
describe('start()', () => {
let data: ISearchStart;
beforeEach(() => {
const bfetch = bfetchPluginMock.createSetupContract();
searchService.setup(mockCoreSetup, {
packageInfo: { version: '8' },
bfetch,
expressions: { registerFunction: jest.fn(), registerType: jest.fn() },
management: managementPluginMock.createSetupContract(),
} as unknown as SearchServiceSetupDependencies);

View file

@ -7,7 +7,6 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { BfetchPublicSetup } from '@kbn/bfetch-plugin/public';
import { ExpressionsSetup } from '@kbn/expressions-plugin/public';
import { DataViewsPublicPluginStart } from '@kbn/data-views-plugin/public';
import { UiActionsSetup, UiActionsStart } from '@kbn/ui-actions-plugin/public';
@ -32,7 +31,6 @@ import { DataViewsContract } from './data_views';
import { NowProviderPublicContract } from './now_provider';
export interface DataSetupDependencies {
bfetch: BfetchPublicSetup;
expressions: ExpressionsSetup;
uiActions: UiActionsSetup;
inspector: InspectorSetup;

View file

@ -9,7 +9,6 @@
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server';
import { ExpressionsServerSetup } from '@kbn/expressions-plugin/server';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import { PluginStart as DataViewsServerPluginStart } from '@kbn/data-views-plugin/server';
import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
import { FieldFormatsSetup, FieldFormatsStart } from '@kbn/field-formats-plugin/server';
@ -47,7 +46,6 @@ export interface DataPluginStart {
}
export interface DataPluginSetupDependencies {
bfetch: BfetchServerSetup;
expressions: ExpressionsServerSetup;
usageCollection?: UsageCollectionSetup;
fieldFormats: FieldFormatsSetup;
@ -85,7 +83,7 @@ export class DataServerPlugin
public setup(
core: CoreSetup<DataPluginStartDependencies, DataPluginStart>,
{ bfetch, expressions, usageCollection, fieldFormats }: DataPluginSetupDependencies
{ expressions, usageCollection, fieldFormats }: DataPluginSetupDependencies
) {
this.scriptsService.setup(core);
const querySetup = this.queryService.setup(core);
@ -94,7 +92,6 @@ export class DataServerPlugin
core.uiSettings.register(getUiSettings(core.docLinks, this.config.enableUiSettingsValidations));
const searchSetup = this.searchService.setup(core, {
bfetch,
expressions,
usageCollection,
});

View file

@ -16,7 +16,6 @@ import { createFieldFormatsStartMock } from '@kbn/field-formats-plugin/server/mo
import { createIndexPatternsStartMock } from '../data_views/mocks';
import { SearchService, SearchServiceSetupDependencies } from './search_service';
import { bfetchPluginMock } from '@kbn/bfetch-plugin/server/mocks';
import { lastValueFrom, of } from 'rxjs';
import type {
@ -68,10 +67,8 @@ describe('Search service', () => {
describe('setup()', () => {
it('exposes proper contract', async () => {
const bfetch = bfetchPluginMock.createSetupContract();
const setup = plugin.setup(mockCoreSetup, {
packageInfo: { version: '8' },
bfetch,
expressions: {
registerFunction: jest.fn(),
registerType: jest.fn(),
@ -115,7 +112,6 @@ describe('Search service', () => {
mockSessionClient = createSearchSessionsClientMock();
const pluginSetup = plugin.setup(mockCoreSetup, {
bfetch: bfetchPluginMock.createSetupContract(),
expressions: expressionsPluginMock.createSetupContract(),
});
pluginSetup.registerSearchStrategy(ENHANCED_ES_SEARCH_STRATEGY, mockStrategy);

View file

@ -28,7 +28,6 @@ import type {
IEsSearchRequest,
IEsSearchResponse,
} from '@kbn/search-types';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import { ExpressionsServerSetup } from '@kbn/expressions-plugin/server';
import { FieldFormatsStart } from '@kbn/field-formats-plugin/server';
import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
@ -106,7 +105,6 @@ type StrategyMap = Record<string, ISearchStrategy<any, any>>;
/** @internal */
export interface SearchServiceSetupDependencies {
bfetch: BfetchServerSetup;
expressions: ExpressionsServerSetup;
usageCollection?: UsageCollectionSetup;
}
@ -145,7 +143,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
public setup(
core: CoreSetup<DataPluginStartDependencies, DataPluginStart>,
{ bfetch, expressions, usageCollection }: SearchServiceSetupDependencies
{ expressions, usageCollection }: SearchServiceSetupDependencies
): ISearchSetup {
core.savedObjects.registerType(searchSessionSavedObjectType);
const usage = usageCollection ? usageProvider(core) : undefined;

View file

@ -14,7 +14,6 @@
],
"kbn_references": [
"@kbn/core",
"@kbn/bfetch-plugin",
"@kbn/ui-actions-plugin",
"@kbn/share-plugin",
"@kbn/inspector-plugin",

View file

@ -116,10 +116,6 @@
"@kbn/banners-plugin/*": ["x-pack/plugins/banners/*"],
"@kbn/bazel-runner": ["packages/kbn-bazel-runner"],
"@kbn/bazel-runner/*": ["packages/kbn-bazel-runner/*"],
"@kbn/bfetch-error": ["packages/kbn-bfetch-error"],
"@kbn/bfetch-error/*": ["packages/kbn-bfetch-error/*"],
"@kbn/bfetch-plugin": ["src/plugins/bfetch"],
"@kbn/bfetch-plugin/*": ["src/plugins/bfetch/*"],
"@kbn/calculate-auto": ["packages/kbn-calculate-auto"],
"@kbn/calculate-auto/*": ["packages/kbn-calculate-auto/*"],
"@kbn/calculate-width-from-char-count": ["packages/kbn-calculate-width-from-char-count"],

View file

@ -38,7 +38,6 @@
"taskManager",
"triggersActionsUi",
"usageCollection",
"bfetch",
"uiActions",
"unifiedSearch",
"presentationUtil"

View file

@ -22,7 +22,6 @@ import { SharePluginSetup } from '@kbn/share-plugin/server';
import { ObservabilityPluginSetup } from '@kbn/observability-plugin/server';
import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';
import { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import { CloudSetup } from '@kbn/cloud-plugin/server';
import { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import { SecurityPluginStart } from '@kbn/security-plugin/server';
@ -75,7 +74,6 @@ export interface SyntheticsPluginsSetupDependencies {
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
telemetry: TelemetryPluginSetup;
bfetch: BfetchServerSetup;
share: SharePluginSetup;
}

View file

@ -59,7 +59,6 @@
"@kbn/core-saved-objects-api-server",
"@kbn/core-saved-objects-common",
"@kbn/features-plugin",
"@kbn/bfetch-plugin",
"@kbn/actions-plugin",
"@kbn/core-elasticsearch-server",
"@kbn/core-saved-objects-api-server-mocks",

View file

@ -38,7 +38,6 @@
"triggersActionsUi",
"usageCollection",
"unifiedSearch",
"bfetch",
"charts"
],
"optionalPlugins": [

View file

@ -23,7 +23,6 @@ import { SecurityPluginStart } from '@kbn/security-plugin/server';
import { CloudSetup } from '@kbn/cloud-plugin/server';
import { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import { FleetStartContract } from '@kbn/fleet-plugin/server';
import { BfetchServerSetup } from '@kbn/bfetch-plugin/server';
import { SharePluginSetup } from '@kbn/share-plugin/server';
import { UptimeEsClient } from '../../lib';
import { UptimeConfig } from '../../../../../common/config';
@ -59,7 +58,6 @@ export interface UptimeCorePluginsSetup {
ruleRegistry: RuleRegistryPluginSetupContract;
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
bfetch: BfetchServerSetup;
share: SharePluginSetup;
}

View file

@ -58,7 +58,6 @@
"@kbn/features-plugin",
"@kbn/rule-registry-plugin",
"@kbn/security-plugin",
"@kbn/bfetch-plugin",
"@kbn/alerts-as-data-utils",
"@kbn/std",
"@kbn/utility-types",

View file

@ -1,113 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import request from 'superagent';
import { inflateResponse } from '@kbn/bfetch-plugin/public/streaming';
import expect from '@kbn/expect';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import { BFETCH_ROUTE_VERSION_LATEST } from '@kbn/bfetch-plugin/common';
import type { FtrProviderContext } from '../../ftr_provider_context';
/**
 * Splits an NDJSON bfetch response body into one parsed object per line,
 * inflating each line first when the response was compressed.
 */
function parseBfetchResponse(resp: request.Response, compressed: boolean = false) {
  const lines = resp.text.trim().split('\n');
  return lines.map((item) => JSON.parse(compressed ? inflateResponse<any>(item) : item));
}
// FTR suite for the `/internal/bsearch` batched-search endpoint, exercising
// the ES|QL strategy through a single-item batch request.
export default function ({ getService }: FtrProviderContext) {
  const supertest = getService('supertest');

  // Failing: See https://github.com/elastic/kibana/issues/194716
  describe.skip('bsearch', () => {
    describe('ES|QL', () => {
      it(`should return getColumns response in expected shape`, async () => {
        // `limit 0` yields column metadata only, so `values` is expected empty.
        const resp = await supertest
          .post(`/internal/bsearch`)
          .set('kbn-xsrf', 'kibana')
          .set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
          .send({
            batch: [
              {
                request: {
                  params: {
                    query: 'from logstash-* | keep geo.coordinates | limit 0',
                  },
                },
                options: {
                  strategy: 'esql',
                },
              },
            ],
          });
        // Responses arrive as NDJSON, one item per batch entry.
        const jsonBody = parseBfetchResponse(resp);
        expect(resp.status).to.be(200);
        expect(jsonBody[0].result.rawResponse).to.eql({
          columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
          ],
          values: [],
        });
      });
      it(`should return getValues response in expected shape`, async () => {
        // `dropNullColumns` adds an `all_columns` field alongside the
        // non-null `columns` in the raw ES|QL response.
        const resp = await supertest
          .post(`/internal/bsearch`)
          .set('kbn-xsrf', 'kibana')
          .set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
          .send({
            batch: [
              {
                request: {
                  params: {
                    dropNullColumns: true,
                    query:
                      'from logstash-* | keep geo.coordinates, @timestamp | sort @timestamp | limit 1',
                  },
                },
                options: {
                  strategy: 'esql',
                },
              },
            ],
          });
        const jsonBody = parseBfetchResponse(resp);
        expect(resp.status).to.be(200);
        expect(jsonBody[0].result.rawResponse).to.eql({
          all_columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
            {
              name: '@timestamp',
              type: 'date',
            },
          ],
          columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
            {
              name: '@timestamp',
              type: 'date',
            },
          ],
          values: [['POINT (-120.9871642 38.68407028)', '2015-09-20T00:00:00.000Z']],
        });
      });
    });
  });
}

View file

@ -38,7 +38,7 @@ export default function ({ loadTestFile, getService }) {
loadTestFile(require.resolve('./migrations'));
loadTestFile(require.resolve('./get_tile'));
loadTestFile(require.resolve('./get_grid_tile'));
loadTestFile(require.resolve('./bsearch'));
loadTestFile(require.resolve('./search'));
});
});
}

View file

@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import { SEARCH_API_BASE_URL } from '@kbn/data-plugin/server/search/routes';
import { ESQL_SEARCH_STRATEGY } from '@kbn/data-plugin/common';
import type { FtrProviderContext } from '../../ftr_provider_context';
// FTR suite for the per-strategy search endpoint that replaces bsearch,
// exercising the ES|QL strategy directly (no request batching involved).
export default function ({ getService }: FtrProviderContext) {
  const supertest = getService('supertest');
  describe('search', () => {
    describe('ES|QL', () => {
      it(`should return getColumns response in expected shape`, async () => {
        // `limit 0` yields column metadata only, so `values` is expected empty.
        const resp = await supertest
          .post(`${SEARCH_API_BASE_URL}/${ESQL_SEARCH_STRATEGY}`)
          .set('kbn-xsrf', 'kibana')
          .set(ELASTIC_HTTP_VERSION_HEADER, '1')
          .send({
            params: {
              query: 'from logstash-* | keep geo.coordinates | limit 0',
            },
          })
          .expect(200);
        // `took` varies per run, so strip it before comparing the payload.
        const { took, ...response } = resp.body.rawResponse;
        expect(response).to.eql({
          columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
          ],
          values: [],
        });
      });
      it(`should return getValues response in expected shape`, async () => {
        // `dropNullColumns` adds an `all_columns` field alongside the
        // non-null `columns` in the raw ES|QL response.
        const resp = await supertest
          .post(`${SEARCH_API_BASE_URL}/${ESQL_SEARCH_STRATEGY}`)
          .set('kbn-xsrf', 'kibana')
          .set(ELASTIC_HTTP_VERSION_HEADER, '1')
          .send({
            params: {
              dropNullColumns: true,
              query:
                'from logstash-* | keep geo.coordinates, @timestamp | sort @timestamp | limit 1',
            },
          })
          .expect(200);
        // `took` varies per run, so strip it before comparing the payload.
        const { took, ...response } = resp.body.rawResponse;
        expect(response).to.eql({
          all_columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
            {
              name: '@timestamp',
              type: 'date',
            },
          ],
          columns: [
            {
              name: 'geo.coordinates',
              type: 'geo_point',
            },
            {
              name: '@timestamp',
              type: 'date',
            },
          ],
          values: [['POINT (-120.9871642 38.68407028)', '2015-09-20T00:00:00.000Z']],
        });
      });
    });
  });
}

View file

@ -82,8 +82,8 @@ export default function ({ getPageObjects, getService }: FtrProviderContext) {
sectionLinks: [
'dataViews',
'filesManagement',
'aiAssistantManagementSelection',
'objects',
'aiAssistantManagementSelection',
'tags',
'search_sessions',
'spaces',

View file

@ -130,7 +130,6 @@
"@kbn/telemetry-tools",
"@kbn/profiling-plugin",
"@kbn/observability-onboarding-plugin",
"@kbn/bfetch-plugin",
"@kbn/uptime-plugin",
"@kbn/ml-category-validator",
"@kbn/observability-ai-assistant-plugin",

View file

@ -4049,14 +4049,6 @@
version "0.0.0"
uid ""
"@kbn/bfetch-error@link:packages/kbn-bfetch-error":
version "0.0.0"
uid ""
"@kbn/bfetch-plugin@link:src/plugins/bfetch":
version "0.0.0"
uid ""
"@kbn/calculate-auto@link:packages/kbn-calculate-auto":
version "0.0.0"
uid ""