bfetch (2) (#53711)

* feat: 🎸 implement ItemBuffer

* test: 💍 add tests for ItemBuffer

* feat: 🎸 add TimedItemBuffer

* test: 💍 add TimedItemBuffer tests

* feat: 🎸 add createBatchedFunction

* chore: 🤖 save wip on higher level batching

* test: 💍 add createBatchedFunction tests

* feat: 🎸 implement createStreamingBatchedFunction() method

* refactor: 💡 rename "data" key to "result"

* feat: 🎸 return error in "error" key in legacy protocol

* feat: 🎸 add server-side to Expressions plugin

* refactor: 💡 move interpreter server-side registries to NP

* feat: 🎸 implement bfetch.addBatchProcessingRoute

* feat: 🎸 improve streaming and batching func to pass request

* feat: 🎸 initial setup of new expressions batching endpoint

* feat: 🎸 expose bfetch.batchedFunction() function

* feat: 🎸 add of() function

of() function awaits a promise and converts it to a 3-tuple representing
its state.

* refactor: 💡 move normalizeError() to /common

* feat: 🎸 improve createStreamingBatchedFunction() function

* refactor: 💡 move GET /api/interpreter/fns to the New Platform

* feat: 🎸 move batched_fetch to the New Platform

* feat: 🎸 implement legacy interpreter batching on server in NP

* feat: 🎸 switch legacy interpreter server functions to NP

* chore: 🤖 remove unused import

* fix: 🐛 correct expressions mocks

* test: 💍 fix batching tests after refactor

* test: 💍 stub bfetch plugin explorer

* test: 💍 add routing and app structure to bfetch_explorer

* test: 💍 add server-side to bfetch_explorer

* test: 💍 create <DoubleInteger> component in bfetch_explorer

* test: 💍 improve bfetch_explorer

* test: 💍 add <CountUntil> demo to bfetch_explorer

* test: 💍 by default redirect to first bfetch_explorer example

* test: 💍 add error example to bfetch_explorer

* docs: ✏️ improve bfetch docs

* docs: ✏️ improve bfetch server-side docs

* chore: 🤖 address self-review comments

* fix: 🐛 use new core ES data client, remove unuseed import

* fix: 🐛 remove unused interface import

* Update examples/bfetch_explorer/public/components/count_until/index.tsx

Co-Authored-By: Lukas Olson <olson.lukas@gmail.com>

* Update examples/bfetch_explorer/public/components/double_integers/index.tsx

Co-Authored-By: Lukas Olson <olson.lukas@gmail.com>

* Update src/plugins/bfetch/common/buffer/item_buffer.ts

Co-Authored-By: Lukas Olson <olson.lukas@gmail.com>

* Update src/plugins/kibana_utils/common/of.ts

Co-Authored-By: Lukas Olson <olson.lukas@gmail.com>

* docs: ✏️ add batchedFunction params to README

* refactor: 💡 rename onflush to onFlush

* feat: 🎸 make maxItemAge optional in TimedItemBuffer

* refactor: 💡 remove promise from fetchStreaming

* test: 💍 add bfetch_explorer functional tests

* test: 💍 rename test plugin to kbn_tp_bfetch_explorer

* fix: 🐛 use stream instead of removed promise

* fix: 🐛 use correct tsconfig.json in bfetch test plugin

* feat: 🎸 add kbn_tp_bfetch_explorer server-side files to tsconfi

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Vadim Dalecky 2020-01-16 05:33:52 -08:00 committed by GitHub
parent 2acb42662c
commit 5c19c82d4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
72 changed files with 3044 additions and 400 deletions

View file

@ -5,4 +5,3 @@ This folder contains example plugins. To run the plugins in this folder, use th
```
yarn start --run-examples
```

View file

@ -0,0 +1,10 @@
{
"id": "bfetchExplorer",
"version": "0.0.1",
"kibanaVersion": "kibana",
"configPath": ["bfetch_explorer"],
"server": true,
"ui": true,
"requiredPlugins": ["bfetch"],
"optionalPlugins": []
}

View file

@ -0,0 +1,17 @@
{
"name": "bfetch_explorer",
"version": "1.0.0",
"main": "target/examples/bfetch_explorer",
"kibana": {
"version": "kibana",
"templateVersion": "1.0.0"
},
"license": "Apache-2.0",
"scripts": {
"kbn": "node ../../scripts/kbn.js",
"build": "rm -rf './target' && tsc"
},
"devDependencies": {
"typescript": "3.7.2"
}
}

View file

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React, { useState } from 'react';
import useMountedState from 'react-use/lib/useMountedState';
import useList from 'react-use/lib/useList';
import { EuiForm, EuiSpacer, EuiFieldNumber, EuiFormRow, EuiButton } from '@elastic/eui';
import { BfetchPublicSetup } from '../../../../../src/plugins/bfetch/public';
export interface Props {
fetchStreaming: BfetchPublicSetup['fetchStreaming'];
}
export const CountUntil: React.FC<Props> = ({ fetchStreaming }) => {
const isMounted = useMountedState();
const [data, setData] = useState(5);
const [showingResults, setShowingResults] = useState(false);
const [results, { push: pushResult, clear: clearList }] = useList<string>([]);
const [completed, setCompleted] = useState(false);
const [error, setError] = useState<any>(null);
const handleSubmit = () => {
setShowingResults(true);
const { stream } = fetchStreaming({
url: '/bfetch_explorer/count',
body: JSON.stringify({ data }),
});
stream.subscribe({
next: (next: string) => {
if (!isMounted()) return;
pushResult(next);
},
error: (nextError: any) => {
if (!isMounted()) return;
setError(nextError);
},
complete: () => {
if (!isMounted()) return;
setCompleted(true);
},
});
};
const handleReset = () => {
setShowingResults(false);
clearList();
setError(null);
setCompleted(false);
};
if (showingResults) {
return (
<EuiForm data-test-subj="CountUntil">
<pre>{JSON.stringify(error || results, null, 4)}</pre>
<EuiSpacer size="l" />
<EuiButton disabled={!completed} onClick={handleReset}>
Reset
</EuiButton>
</EuiForm>
);
}
return (
<EuiForm data-test-subj="CountUntil">
<EuiFormRow label="Some integer" fullWidth>
<EuiFieldNumber
placeholder="Some integer"
value={data}
onChange={e => setData(Number(e.target.value))}
/>
</EuiFormRow>
<EuiButton type="submit" fill onClick={handleSubmit}>
Start
</EuiButton>
</EuiForm>
);
};

View file

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React, { useState } from 'react';
import useMountedState from 'react-use/lib/useMountedState';
import useList from 'react-use/lib/useList';
import useCounter from 'react-use/lib/useCounter';
import { EuiForm, EuiSpacer, EuiTextArea, EuiFormRow, EuiButton } from '@elastic/eui';
import { ExplorerService } from '../../plugin';
interface ResultItem {
num: number;
result?: {
num: number;
};
error?: any;
}
const defaultNumbers = [2000, 300, -1, 1000].join('\n');
export interface Props {
double: ExplorerService['double'];
}
export const DoubleIntegers: React.FC<Props> = ({ double }) => {
const isMounted = useMountedState();
const [numbers, setNumbers] = useState(defaultNumbers);
const [showingResults, setShowingResults] = useState(false);
const [numberOfResultsAwaiting, counter] = useCounter(0);
const [results, { push: pushResult, clear: clearList }] = useList<ResultItem>([]);
const handleSubmit = () => {
setShowingResults(true);
const nums = numbers
.split('\n')
.map(num => num.trim())
.filter(Boolean)
.map(Number);
counter.set(nums.length);
nums.forEach(num => {
double({ num }).then(
result => {
if (!isMounted()) return;
counter.dec();
pushResult({ num, result });
},
error => {
if (!isMounted()) return;
counter.dec();
pushResult({ num, error });
}
);
});
};
const handleReset = () => {
setShowingResults(false);
counter.reset();
clearList();
};
if (showingResults) {
return (
<EuiForm data-test-subj="DoubleIntegers">
<pre>{JSON.stringify(results, null, 4)}</pre>
<EuiSpacer size="l" />
<EuiButton disabled={!!numberOfResultsAwaiting} onClick={handleReset}>
Reset
</EuiButton>
</EuiForm>
);
}
return (
<EuiForm data-test-subj="DoubleIntegers">
<EuiFormRow label="Numbers in ms separated by new line" fullWidth>
<EuiTextArea
fullWidth
placeholder="Enter numbers in milliseconds separated by new line"
value={numbers}
onChange={e => setNumbers(e.target.value)}
/>
</EuiFormRow>
<EuiButton type="submit" fill onClick={handleSubmit}>
Send
</EuiButton>
</EuiForm>
);
};

View file

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as React from 'react';
import {
EuiPageBody,
EuiPageContent,
EuiPageContentBody,
EuiPageHeader,
EuiPageHeaderSection,
EuiTitle,
} from '@elastic/eui';
export interface PageProps {
title?: React.ReactNode;
}
export const Page: React.FC<PageProps> = ({ title = 'Untitled', children }) => {
return (
<EuiPageBody>
<EuiPageHeader>
<EuiPageHeaderSection>
<EuiTitle size="l">
<h1>{title}</h1>
</EuiTitle>
</EuiPageHeaderSection>
</EuiPageHeader>
<EuiPageContent>
<EuiPageContentBody style={{ maxWidth: 800, margin: '0 auto' }}>
{children}
</EuiPageContentBody>
</EuiPageContent>
</EuiPageBody>
);
};

View file

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React from 'react';
import { BrowserRouter as Router, Route, Redirect, Switch } from 'react-router-dom';
import { EuiPage } from '@elastic/eui';
import { useDeps } from '../../hooks/use_deps';
import { Sidebar } from './sidebar';
import { routes } from '../../routes';
export const App: React.FC = () => {
const { appBasePath } = useDeps();
const routeElements: React.ReactElement[] = [];
for (const { items } of routes) {
for (const { id, component } of items) {
routeElements.push(<Route key={id} path={`/${id}`} render={props => component} />);
}
}
return (
<Router basename={appBasePath}>
<EuiPage>
<Sidebar />
<Switch>
{routeElements}
<Redirect to="/count-until" />
</Switch>
</EuiPage>
</Router>
);
};

View file

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as React from 'react';
import { EuiPanel, EuiText } from '@elastic/eui';
import { CountUntil } from '../../../../components/count_until';
import { Page } from '../../../../components/page';
import { useDeps } from '../../../../hooks/use_deps';
// eslint-disable-next-line
export interface Props {}
export const PageCountUntil: React.FC<Props> = () => {
const { plugins } = useDeps();
return (
<Page title={'Count Until'}>
<EuiText>
This demo sends a single number N using <code>fetchStreaming</code> to the server. The
server will stream back N number of messages with 1 second delay each containing a number
from 1 to N, after which it will close the stream.
</EuiText>
<br />
<EuiPanel paddingSize="l">
<CountUntil fetchStreaming={plugins.bfetch.fetchStreaming} />
</EuiPanel>
</Page>
);
};

View file

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as React from 'react';
import { EuiPanel, EuiText } from '@elastic/eui';
import { DoubleIntegers } from '../../../../components/double_integers';
import { Page } from '../../../../components/page';
import { useDeps } from '../../../../hooks/use_deps';
// eslint-disable-next-line
export interface Props {}
export const PageDoubleIntegers: React.FC<Props> = () => {
const { explorer } = useDeps();
return (
<Page title={'Double Integers'}>
<EuiText>
Below is a list of numbers in milliseconds. They are sent as a batch to the server. For each
number server waits given number of milliseconds then doubles the number and streams it
back.
</EuiText>
<br />
<EuiPanel paddingSize="l">
<DoubleIntegers double={explorer.double} />
</EuiPanel>
</Page>
);
};

View file

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React from 'react';
import { EuiPageSideBar, EuiSideNav } from '@elastic/eui';
import { useHistory } from 'react-router-dom';
import { routes } from '../../../routes';
// eslint-disable-next-line
interface SidebarProps {}
export const Sidebar: React.FC<SidebarProps> = () => {
const history = useHistory();
return (
<EuiPageSideBar>
<EuiSideNav
items={[
{
name: 'bfetch explorer',
id: 'home',
items: routes.map(({ id, title, items }) => ({
id,
name: title,
isSelected: true,
items: items.map(route => ({
id: route.id,
name: route.title,
onClick: () => history.push(`/${route.id}`),
'data-test-subj': route.id,
})),
})),
},
]}
/>
</EuiPageSideBar>
);
};

View file

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { useKibana } from '../../../../src/plugins/kibana_react/public';
import { BfetchDeps } from '../mount';
export const useDeps = () => useKibana().services as BfetchDeps;

View file

@ -17,5 +17,6 @@
* under the License.
*/
// The server endpoint for retrieiving and running Canvas functions.
export const FUNCTIONS_URL = '/api/interpreter/fns';
import { BfetchExplorerPlugin } from './plugin';
export const plugin = () => new BfetchExplorerPlugin();

View file

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as React from 'react';
import { render, unmountComponentAtNode } from 'react-dom';
import { CoreSetup, CoreStart, AppMountParameters } from 'kibana/public';
import { KibanaContextProvider } from '../../../src/plugins/kibana_react/public';
import { BfetchExplorerStartPlugins, ExplorerService } from './plugin';
import { App } from './containers/app';
export interface BfetchDeps {
appBasePath: string;
core: CoreStart;
plugins: BfetchExplorerStartPlugins;
explorer: ExplorerService;
}
export const mount = (
coreSetup: CoreSetup<BfetchExplorerStartPlugins>,
explorer: ExplorerService
) => async ({ appBasePath, element }: AppMountParameters) => {
const [core, plugins] = await coreSetup.getStartServices();
const deps: BfetchDeps = { appBasePath, core, plugins, explorer };
const reactElement = (
<KibanaContextProvider services={deps}>
<App />
</KibanaContextProvider>
);
render(reactElement, element);
return () => unmountComponentAtNode(element);
};

View file

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Plugin, CoreSetup } from 'kibana/public';
import { BfetchPublicSetup, BfetchPublicStart } from '../../../src/plugins/bfetch/public';
import { mount } from './mount';
export interface ExplorerService {
double: (number: { num: number }) => Promise<{ num: number }>;
}
export interface BfetchExplorerSetupPlugins {
bfetch: BfetchPublicSetup;
}
export interface BfetchExplorerStartPlugins {
bfetch: BfetchPublicStart;
}
export class BfetchExplorerPlugin implements Plugin {
public setup(core: CoreSetup<BfetchExplorerStartPlugins>, plugins: BfetchExplorerSetupPlugins) {
const double = plugins.bfetch.batchedFunction<{ num: number }, { num: number }>({
url: '/bfetch_explorer/double',
});
const explorer: ExplorerService = {
double,
};
core.application.register({
id: 'bfetch-explorer',
title: 'bfetch explorer',
mount: mount(core, explorer),
});
}
public start() {}
public stop() {}
}

View file

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React from 'react';
import { PageDoubleIntegers } from './containers/app/pages/page_double_integers';
import { PageCountUntil } from './containers/app/pages/page_count_until';
interface RouteSectionDef {
title: string;
id: string;
items: RouteDef[];
}
interface RouteDef {
title: string;
id: string;
component: React.ReactNode;
}
export const routes: RouteSectionDef[] = [
{
title: 'fetchStreaming',
id: 'fetchStreaming',
items: [
{
title: 'Count until',
id: 'count-until',
component: <PageCountUntil />,
},
],
},
{
title: 'batchedFunction',
id: 'batchedFunction',
items: [
{
title: 'Double integers',
id: 'double-integers',
component: <PageDoubleIntegers />,
},
],
},
];

View file

@ -17,8 +17,6 @@
* under the License.
*/
import { registerServerFunctions } from './server_functions';
import { BfetchExplorerPlugin } from './plugin';
export function routes(server: any) {
registerServerFunctions(server);
}
export const plugin = () => new BfetchExplorerPlugin();

View file

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Subject } from 'rxjs';
import { Plugin, CoreSetup, CoreStart } from '../../../src/core/server';
import { BfetchServerSetup, BfetchServerStart } from '../../../src/plugins/bfetch/server';
export interface BfetchExplorerSetupPlugins {
bfetch: BfetchServerSetup;
}
export interface BfetchExplorerStartPlugins {
bfetch: BfetchServerStart;
}
export class BfetchExplorerPlugin implements Plugin {
public setup(core: CoreSetup, plugins: BfetchExplorerSetupPlugins) {
plugins.bfetch.addStreamingResponseRoute<string, string>('/bfetch_explorer/count', () => ({
getResponseStream: ({ data }: any) => {
const subject = new Subject<string>();
const countTo = Number(data);
for (let cnt = 1; cnt <= countTo; cnt++) {
setTimeout(() => {
subject.next(String(cnt));
}, cnt * 1000);
}
setTimeout(() => {
subject.complete();
}, countTo * 1000);
return subject;
},
}));
plugins.bfetch.addBatchProcessingRoute<{ num: number }, { num: number }>(
'/bfetch_explorer/double',
() => ({
onBatchItem: async ({ num }) => {
// Validate inputs.
if (num < 0) throw new Error('Invalid number');
// Wait number of specified milliseconds.
await new Promise(r => setTimeout(r, num));
// Double the number and send it back.
return { num: 2 * num };
},
})
);
}
public start(core: CoreStart, plugins: BfetchExplorerStartPlugins) {}
public stop() {}
}

View file

@ -0,0 +1,15 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "./target",
"skipLibCheck": true
},
"include": [
"index.ts",
"public/**/*.ts",
"public/**/*.tsx",
"server/**/*.ts",
"../../typings/**/*",
],
"exclude": []
}

View file

@ -22,35 +22,10 @@
// @ts-ignore
import { register, registryFactory, Registry, Fn } from '@kbn/interpreter/common';
// @ts-ignore
import { routes } from './server/routes';
import { typeSpecs as types, Type } from '../../../plugins/expressions/common';
import { Legacy } from '../../../../kibana';
export class TypesRegistry extends Registry<any, any> {
wrapper(obj: any) {
return new (Type as any)(obj);
}
}
export class FunctionsRegistry extends Registry<any, any> {
wrapper(obj: any) {
return new Fn(obj);
}
}
export const registries = {
types: new TypesRegistry(),
serverFunctions: new FunctionsRegistry(),
};
export async function init(server: Legacy.Server /* options */) {
server.injectUiAppVars('canvas', () => {
register(registries, {
types,
});
const config = server.config();
const basePath = config.get('server.basePath');
const reportingBrowserType = (() => {
@ -63,7 +38,9 @@ export async function init(server: Legacy.Server /* options */) {
return {
kbnIndex: config.get('kibana.index'),
serverFunctions: registries.serverFunctions.toArray(),
serverFunctions: (server.newPlatform.setup.plugins.expressions as any).__LEGACY
.registries()
.serverFunctions.toArray(),
basePath,
reportingBrowserType,
};
@ -71,7 +48,5 @@ export async function init(server: Legacy.Server /* options */) {
// Expose server.plugins.interpreter.register(specs) and
// server.plugins.interpreter.registries() (a getter).
server.expose(registryFactory(registries));
routes(server);
server.expose((server.newPlatform.setup.plugins.expressions as any).__LEGACY);
}

View file

@ -28,62 +28,6 @@
* server side, it should be respective function's internal implementation detail.
*/
import { get, identity } from 'lodash';
// @ts-ignore
import { npSetup, npStart } from 'ui/new_platform';
import { FUNCTIONS_URL } from './consts';
import { batchedFetch } from './batched_fetch';
import { npSetup } from 'ui/new_platform';
export function getType(node: any) {
if (node == null) return 'null';
if (typeof node === 'object') {
if (!node.type) throw new Error('Objects must have a type property');
return node.type;
}
return typeof node;
}
export function serializeProvider(types: any) {
return {
serialize: provider('serialize'),
deserialize: provider('deserialize'),
};
function provider(key: any) {
return (context: any) => {
const type = getType(context);
const typeDef = types[type];
const fn: any = get(typeDef, key) || identity;
return fn(context);
};
}
}
let cached: Promise<void> | null = null;
export const loadLegacyServerFunctionWrappers = async () => {
if (!cached) {
cached = (async () => {
const serverFunctionList = await npSetup.core.http.get(FUNCTIONS_URL);
const types = npSetup.plugins.expressions.__LEGACY.types.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({
fetchStreaming: npStart.plugins.bfetch.fetchStreaming,
serialize,
});
// For every sever-side function, register a client-side
// function that matches its definition, but which simply
// calls the server-side function endpoint.
Object.keys(serverFunctionList).forEach(functionName => {
const fn = () => ({
...serverFunctionList[functionName],
fn: (context: any, args: any) => batch({ functionName, args, context }),
});
npSetup.plugins.expressions.registerFunction(fn);
});
})();
}
return cached;
};
export const { loadLegacyServerFunctionWrappers } = npSetup.plugins.expressions.__LEGACY;

View file

@ -26,6 +26,7 @@ export const registries = {
browserFunctions: functionsRegistry,
renderers: renderersRegistry,
types: typesRegistry,
loadLegacyServerFunctionWrappers: () => Promise.resolve(),
};
const resetRegistry = (registry: any) => {

View file

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import expect from '@kbn/expect';
import { createHandlers } from '../create_handlers';
const mockRequest = {
headers: 'i can haz headers',
};
const mockServer = {
plugins: {
elasticsearch: {
getCluster: () => ({
callWithRequest: (...args: any) => Promise.resolve(args),
}),
},
},
config: () => ({
has: () => false,
get: (val: any) => val,
}),
info: {
uri: 'serveruri',
},
};
describe('server createHandlers', () => {
it('provides helper methods and properties', () => {
const handlers = createHandlers(mockRequest, mockServer);
expect(handlers).to.have.property('environment', 'server');
expect(handlers).to.have.property('serverUri');
expect(handlers).to.have.property('elasticsearchClient');
});
describe('elasticsearchClient', () => {
it('executes callWithRequest', async () => {
const handlers = createHandlers(mockRequest, mockServer);
const [request, endpoint, payload] = await handlers.elasticsearchClient(
'endpoint',
'payload'
);
expect(request).to.equal(mockRequest);
expect(endpoint).to.equal('endpoint');
expect(payload).to.equal('payload');
});
});
});

View file

@ -1,166 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import Boom from 'boom';
import Joi from 'joi';
import { serializeProvider } from '../../../../../plugins/expressions/common';
import { createHandlers } from '../lib/create_handlers';
const API_ROUTE = '/api/interpreter';
/**
* Register the Canvas function endopints.
*
* @param {*} server - The Kibana server
*/
export function registerServerFunctions(server: any) {
getServerFunctions(server);
runServerFunctions(server);
}
/**
* Register the endpoint that executes a batch of functions, and sends the result back as a single response.
*
* @param {*} server - The Kibana server
*/
function runServerFunctions(server: any) {
server.route({
method: 'POST',
path: `${API_ROUTE}/fns`,
options: {
payload: {
allow: 'application/json',
maxBytes: 26214400, // 25MB payload limit
},
validate: {
payload: Joi.object({
functions: Joi.array()
.items(
Joi.object().keys({
id: Joi.number().required(),
functionName: Joi.string().required(),
args: Joi.object().default({}),
context: Joi.any().default(null),
})
)
.required(),
}).required(),
},
},
async handler(req: any) {
const handlers = await createHandlers(req, server);
const { functions } = req.payload;
// Grab the raw Node response object.
const res = req.raw.res;
// Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884
req._isReplied = true;
// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});
// Write a length-delimited response
const streamResult = (result: any) => {
res.write(JSON.stringify(result) + '\n');
};
// Tries to run an interpreter function, and ensures a consistent error payload on failure.
const tryFunction = async (id: any, fnCall: any) => {
try {
const result = await runFunction(server, handlers, fnCall);
if (typeof result === 'undefined') {
return batchError(id, `Function ${fnCall.functionName} did not return anything.`);
}
return { id, statusCode: 200, result };
} catch (err) {
if (Boom.isBoom(err)) {
return batchError(id, err.output.payload, (err as any).statusCode);
} else if (err instanceof Error) {
return batchError(id, err.message);
}
server.log(['interpreter', 'error'], err);
return batchError(id, 'See server logs for details.');
}
};
// Process each function individually, and stream the responses back to the client
await Promise.all(
functions.map(({ id, ...fnCall }: any) => tryFunction(id, fnCall).then(streamResult))
);
// All of the responses have been written, so we can close the response.
res.end();
},
});
}
/**
* A helper function for bundling up errors.
*/
function batchError(id: any, message: any, statusCode = 500) {
return {
id,
statusCode,
result: { statusCode, message },
};
}
/**
* Register the endpoint that returns the list of server-only functions.
* @param {*} server - The Kibana server
*/
function getServerFunctions(server: any) {
server.route({
method: 'GET',
path: `${API_ROUTE}/fns`,
handler() {
return server.plugins.interpreter.registries().serverFunctions.toJS();
},
});
}
/**
* Run a single Canvas function.
*
* @param {*} server - The Kibana server object
* @param {*} handlers - The Canvas handlers
* @param {*} fnCall - Describes the function being run `{ functionName, args, context }`
*/
async function runFunction(server: any, handlers: any, fnCall: any) {
const registries = server.plugins.interpreter.registries();
const { functionName, args, context } = fnCall;
const types = registries.types.toJS();
const { deserialize } = serializeProvider(types);
const fnDef = registries.serverFunctions.toJS()[functionName];
if (!fnDef) {
throw Boom.notFound(`Function "${functionName}" could not be found.`);
}
return fnDef.fn(deserialize(context), args, handlers);
}

View file

@ -3,7 +3,54 @@
`bfetch` allows to batch HTTP requests and streams responses back.
# Example
We will create a batch processing endpoint that receives a number then doubles it
and streams it back. We will also consider the number to be time in milliseconds
and before streaming the number back the server will wait for the specified number of
milliseconds.
To do that, first create server-side batch processing route using [`addBatchProcessingRoute`](./docs/server/reference.md#addBatchProcessingRoute).
```ts
plugins.bfetch.addBatchProcessingRoute<{ num: number }, { num: number }>(
'/my-plugin/double',
() => ({
onBatchItem: async ({ num }) => {
// Validate inputs.
if (num < 0) throw new Error('Invalid number');
// Wait number of specified milliseconds.
await new Promise(r => setTimeout(r, num));
// Double the number and send it back.
return { num: 2 * num };
},
})
);
```
Now on client-side create `double` function using [`batchedFunction`](./docs/browser/reference.md#batchedFunction).
The newly created `double` function can be called many times and it
will package individual calls into batches and send them to the server.
```ts
const double = plugins.bfetch.batchedFunction<{ num: number }, { num: number }>({
url: '/my-plugin/double',
});
```
Note: the created `double` must accept a single object argument (`{ num: number }` in this case)
and it will return a promise that resolves into an object, too (also `{ num: number }` in this case).
Use the `double` function.
```ts
double({ num: 1 }).then(console.log, console.error); // { num: 2 }
double({ num: 2 }).then(console.log, console.error); // { num: 4 }
double({ num: 3 }).then(console.log, console.error); // { num: 6 }
```
## Reference
- [Browser](./docs/browser/reference.md)
- Server
- [Server](./docs/server/reference.md)

View file

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export interface ErrorLike {
message: string;
}
export interface BatchRequestData<Item> {
batch: Item[];
}
export interface BatchResponseItem<Result extends object, Error extends ErrorLike = ErrorLike> {
id: number;
result?: Result;
error?: Error;
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ItemBufferParams } from './item_buffer';
import { TimedItemBufferParams, TimedItemBuffer } from './timed_item_buffer';
type Fn = (...args: any) => any;
export interface BatchedFunctionParams<Func extends Fn, BatchEntry> {
onCall: (...args: Parameters<Func>) => [ReturnType<Func>, BatchEntry];
onBatch: (items: BatchEntry[]) => void;
flushOnMaxItems?: ItemBufferParams<any>['flushOnMaxItems'];
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];
}
export const createBatchedFunction = <Func extends Fn, BatchEntry>(
params: BatchedFunctionParams<Func, BatchEntry>
): [Func, TimedItemBuffer<BatchEntry>] => {
const { onCall, onBatch, maxItemAge = 10, flushOnMaxItems = 25 } = params;
const buffer = new TimedItemBuffer<BatchEntry>({
onFlush: onBatch,
maxItemAge,
flushOnMaxItems,
});
const fn: Func = ((...args) => {
const [result, batchEntry] = onCall(...args);
buffer.write(batchEntry);
return result;
}) as Func;
return [fn, buffer];
};

View file

@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './item_buffer';
export * from './timed_item_buffer';
export * from './create_batched_function';

View file

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export interface ItemBufferParams<Item> {
/**
* Flushes buffer automatically if number of items in the buffer reaches
* this number. Omit it or set to `Infinity` to never flush on max buffer
* size automatically.
*/
flushOnMaxItems?: number;
/**
* Callback that is called every time buffer is flushed. It receives a single
* argument which is a list of all buffered items. If `.flush()` is called
* when buffer is empty, `.onflush` is called with empty array.
*/
onFlush: (items: Item[]) => void;
}
/**
* A simple buffer that collects items. Can be cleared or flushed; and can
* automatically flush when specified number of items is reached.
*/
export class ItemBuffer<Item> {
private list: Item[] = [];
constructor(public readonly params: ItemBufferParams<Item>) {}
/**
* Get current buffer size.
*/
public get length(): number {
return this.list.length;
}
/**
* Add item to the buffer.
*/
public write(item: Item) {
this.list.push(item);
const { flushOnMaxItems } = this.params;
if (flushOnMaxItems) {
if (this.list.length >= flushOnMaxItems) {
this.flush();
}
}
}
/**
* Remove all items from the buffer.
*/
public clear() {
this.list = [];
}
/**
* Call `.onflush` method and clear buffer.
*/
public flush() {
let list;
[list, this.list] = [this.list, []];
this.params.onFlush(list);
}
}

View file

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createBatchedFunction } from '../create_batched_function';
describe('createBatchedFunction', () => {
test('calls onCall every time fn is called, calls onBatch once flushOnMaxItems reached', async () => {
const onBatch = jest.fn();
const onCall = jest.fn(() => [1, 2] as any);
const [fn] = createBatchedFunction({
onBatch,
onCall,
flushOnMaxItems: 2,
maxItemAge: 10,
});
expect(onCall).toHaveBeenCalledTimes(0);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(123);
expect(onCall).toHaveBeenCalledTimes(1);
expect(onCall).toHaveBeenCalledWith(123);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(456);
expect(onCall).toHaveBeenCalledTimes(2);
expect(onCall).toHaveBeenCalledWith(456);
expect(onBatch).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledWith([2, 2]);
});
test('calls onBatch once timeout is reached', async () => {
const onBatch = jest.fn();
const onCall = jest.fn(() => [4, 3] as any);
const [fn] = createBatchedFunction({
onBatch,
onCall,
flushOnMaxItems: 2,
maxItemAge: 10,
});
expect(onCall).toHaveBeenCalledTimes(0);
expect(onBatch).toHaveBeenCalledTimes(0);
fn(123);
expect(onCall).toHaveBeenCalledTimes(1);
expect(onCall).toHaveBeenCalledWith(123);
expect(onBatch).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 15));
expect(onCall).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledTimes(1);
expect(onBatch).toHaveBeenCalledWith([3]);
});
});

View file

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ItemBuffer } from '../item_buffer';
import { runItemBufferTests } from './run_item_buffer_tests';
runItemBufferTests(ItemBuffer);

View file

@ -0,0 +1,239 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ItemBuffer, ItemBufferParams } from '../item_buffer';
export const runItemBufferTests = (
Buffer: new <Params extends ItemBufferParams<any>>(params: Params) => ItemBuffer<any>
) => {
describe('ItemBuffer', () => {
test('can create with or without optional "flushOnMaxItems" param', () => {
new Buffer({
onFlush: () => {},
});
new Buffer({
onFlush: () => {},
flushOnMaxItems: 123,
});
});
test('can add items to the buffer', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.write('a');
buf.write('b');
buf.write('c');
});
test('returns number of items in the buffer', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
expect(buf.length).toBe(0);
buf.write('a');
expect(buf.length).toBe(1);
buf.write('b');
expect(buf.length).toBe(2);
buf.write('c');
expect(buf.length).toBe(3);
});
test('returns correct number of items after .clear() was called', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
expect(buf.length).toBe(0);
buf.write('a');
expect(buf.length).toBe(1);
buf.clear();
buf.write('b');
expect(buf.length).toBe(1);
buf.write('c');
expect(buf.length).toBe(2);
});
test('returns correct number of items after .flush() was called', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
expect(buf.length).toBe(0);
buf.write('a');
expect(buf.length).toBe(1);
buf.flush();
buf.write('b');
expect(buf.length).toBe(1);
buf.write('c');
expect(buf.length).toBe(2);
});
test('can flush buffer and receive items in chronological order', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.write('a');
buf.write('b');
buf.write('c');
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(1);
expect(onFlush.mock.calls[0][0]).toEqual(['a', 'b', 'c']);
});
test('clears buffer after flush', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.write('a');
buf.write('b');
buf.write('c');
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(1);
expect(onFlush.mock.calls[0][0]).toEqual(['a', 'b', 'c']);
buf.write('d');
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(2);
expect(onFlush.mock.calls[1][0]).toEqual(['d']);
});
test('can call .flush() any time as many times as needed', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.flush();
buf.write(123);
buf.flush();
buf.flush();
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(4);
expect(onFlush.mock.calls[0][0]).toEqual([]);
expect(onFlush.mock.calls[1][0]).toEqual([123]);
expect(onFlush.mock.calls[2][0]).toEqual([]);
expect(onFlush.mock.calls[3][0]).toEqual([]);
});
test('calling .clear() before .flush() cases to return empty list', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.write(1);
buf.write(2);
buf.clear();
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(1);
expect(onFlush.mock.calls[0][0]).toEqual([]);
});
test('can call .clear() any time as many times as needed', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
});
buf.clear();
buf.flush();
buf.write(123);
buf.clear();
buf.flush();
buf.clear();
buf.clear();
buf.flush();
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(4);
expect(onFlush.mock.calls[0][0]).toEqual([]);
expect(onFlush.mock.calls[1][0]).toEqual([]);
expect(onFlush.mock.calls[2][0]).toEqual([]);
expect(onFlush.mock.calls[3][0]).toEqual([]);
});
describe('when `flushOnMaxItems` is set', () => {
test('does not flush automatically before `flushOnMaxItems` is reached', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
flushOnMaxItems: 2,
});
buf.write(1);
expect(onFlush).toHaveBeenCalledTimes(0);
});
test('automatically flushes buffer when `flushOnMaxItems` is reached', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
flushOnMaxItems: 2,
});
buf.write(1);
buf.write(2);
expect(onFlush).toHaveBeenCalledTimes(1);
expect(onFlush.mock.calls[0][0]).toEqual([1, 2]);
});
test('flushes again when `flushOnMaxItems` limit is reached the second time', () => {
const onFlush = jest.fn();
const buf = new Buffer({
onFlush,
flushOnMaxItems: 2,
});
buf.write(1);
buf.write(2);
buf.write(3);
buf.write(4);
buf.write(5);
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(3);
expect(onFlush.mock.calls[0][0]).toEqual([1, 2]);
expect(onFlush.mock.calls[1][0]).toEqual([3, 4]);
expect(onFlush.mock.calls[2][0]).toEqual([5]);
});
});
});
};

View file

@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { TimedItemBuffer } from '../timed_item_buffer';
import { runItemBufferTests } from './run_item_buffer_tests';
describe('TimedItemBuffer', () => {
runItemBufferTests(TimedItemBuffer);
test('does not do unnecessary flushes', async () => {
const onFlush = jest.fn();
const buf = new TimedItemBuffer({
onFlush,
maxItemAge: 3,
});
expect(onFlush).toHaveBeenCalledTimes(0);
buf.write(0);
expect(onFlush).toHaveBeenCalledTimes(0);
buf.flush();
expect(onFlush).toHaveBeenCalledTimes(1);
});
test('does not do extra flush after timeout if buffer was flushed during timeout wait', async () => {
const onFlush = jest.fn();
const buf = new TimedItemBuffer({
onFlush,
maxItemAge: 10,
});
buf.write(0);
await new Promise(r => setTimeout(r, 3));
buf.flush();
await new Promise(r => setTimeout(r, 11));
expect(onFlush).toHaveBeenCalledTimes(1);
});
test('flushes buffer automatically after timeout reached', async () => {
const onFlush = jest.fn();
const buf = new TimedItemBuffer({
onFlush,
maxItemAge: 2,
});
buf.write(1);
buf.write(2);
expect(onFlush).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 3));
expect(onFlush).toHaveBeenCalledTimes(1);
expect(onFlush).toHaveBeenCalledWith([1, 2]);
});
test('does not call flush after timeout if flush was triggered because buffer size reached', async () => {
const onFlush = jest.fn();
const buf = new TimedItemBuffer({
onFlush,
flushOnMaxItems: 2,
maxItemAge: 2,
});
buf.write(1);
buf.write(2);
expect(onFlush).toHaveBeenCalledTimes(1);
await new Promise(r => setTimeout(r, 3));
expect(onFlush).toHaveBeenCalledTimes(1);
});
test('does not automatically flush if `.clear()` was called', async () => {
const onFlush = jest.fn();
const buf = new TimedItemBuffer({
onFlush,
flushOnMaxItems: 25,
maxItemAge: 5,
});
buf.write(1);
buf.write(2);
await new Promise(r => setImmediate(r));
buf.clear();
expect(onFlush).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 6));
expect(onFlush).toHaveBeenCalledTimes(0);
});
});

View file

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ItemBuffer, ItemBufferParams } from './item_buffer';
export interface TimedItemBufferParams<Item> extends ItemBufferParams<Item> {
/**
* Flushes buffer when oldest item reaches age specified by this parameter,
* in milliseconds.
*/
maxItemAge?: number;
}
export class TimedItemBuffer<Item> extends ItemBuffer<Item> {
private timer: any;
constructor(public readonly params: TimedItemBufferParams<Item>) {
super(params);
}
public write(item: Item) {
super.write(item);
if (this.params.maxItemAge && this.length === 1) {
this.timer = setTimeout(this.onTimeout, this.params.maxItemAge);
}
}
public clear() {
clearTimeout(this.timer);
super.clear();
}
public flush() {
clearTimeout(this.timer);
super.flush();
}
private onTimeout = () => {
this.flush();
};
}

View file

@ -19,3 +19,5 @@
export * from './util';
export * from './streaming';
export * from './buffer';
export * from './batch';

View file

@ -20,5 +20,5 @@
import { Observable } from 'rxjs';
export interface StreamingResponseHandler<Payload, Response> {
onRequest(payload: Payload): Observable<Response>;
getResponseStream(payload: Payload): Observable<Response>;
}

View file

@ -17,4 +17,5 @@
* under the License.
*/
export * from './normalize_error';
export * from './remove_leading_slash';

View file

@ -17,16 +17,24 @@
* under the License.
*/
export const createHandlers = (request: any, server: any) => {
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
const config = server.config();
import { ErrorLike } from '../batch';
export const normalizeError = <E extends ErrorLike = ErrorLike>(err: any): E => {
if (!err) {
return {
message: 'Unknown error.',
} as E;
}
if (err instanceof Error) {
return { message: err.message } as E;
}
if (typeof err === 'object') {
return {
...err,
message: err.message || 'Unknown error.',
} as E;
}
return {
environment: 'server',
serverUri:
config.has('server.rewriteBasePath') && config.get('server.rewriteBasePath')
? `${server.info.uri}${config.get('server.basePath')}`
: server.info.uri,
elasticsearchClient: async (...args: any) => callWithRequest(request, ...args),
};
message: String(err),
} as E;
};

View file

@ -1,8 +1,37 @@
# `bfetch` browser reference
- [`batchedFunction`](#batchedFunction)
- [`fetchStreaming`](#fetchStreaming)
## `batchedFunction`
Creates a function that will buffer its calls (until timeout&mdash;10ms default&mdash; or capacity reached&mdash;25 default)
and send all calls in one batch to the specified endpoint. The endpoint is expected
to stream results back in ND-JSON format using `Transfer-Encoding: chunked`, which is
implemented by `addBatchProcessingRoute` server-side method of `bfetch` plugin.
The created function is expected to be called with a single object argument and will
return a promise that will resolve to an object.
```ts
const fn = bfetch.batchedFunction({ url: '/my-plugin/something' });
const result = await fn({ foo: 'bar' });
```
Options:
- `url` &mdash; URL endpoint that will receive a batch of requests. This endpoint is expected
to receive batch as a serialized JSON array. It should stream responses back
in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming.
- `fetchStreaming` &mdash; The instance of `fetchStreaming` function that will perform ND-JSON handling.
There should be a version of this function available in setup contract of `bfetch` plugin.
- `flushOnMaxItems` &mdash; The maximum size of function call buffer before sending the batch request.
- `maxItemAge` &mdash; The maximum timeout in milliseconds of the oldest item in the batch
before sending the batch request.
## `fetchStreaming`
Executes an HTTP request and expects that server streams back results using
@ -12,4 +41,4 @@ HTTP/1 `Transfer-Encoding: chunked`.
const { stream } = bfetch.fetchStreaming({ url: 'http://elastic.co' });
stream.subscribe(value => {});
```
```

View file

@ -0,0 +1,54 @@
# `bfetch` server reference
- [`addBatchProcessingRoute`](#addBatchProcessingRoute)
- [`addStreamingResponseRoute`](#addStreamingResponseRoute)
## `addBatchProcessingRoute`
Sets up a server endpoint that expects to work with [`batchedFunction`](../browser/reference.md#batchedFunction).
The endpoint receives a batch of requests, processes each request and streams results
back immediately as they become available. You only need to implement the
processing of each request (`onBatchItem` function), everything else is handled.
`onBatchItem` function is called for each individual request in the batch.
`onBatchItem` function receives a single object argument which is the payload
of one request; and it must return a promise that resolves to an object, too.
`onBatchItem` function is allowed to throw, in that case the error will be forwarded
to the browser only to the individual request, the rest of the batch will still continue
executing.
```ts
plugins.bfetch.addBatchProcessingRoute<object, object>(
'/my-plugin/double',
request => ({
onBatchItem: async (payload) => {
// ...
return {};
},
})
);
```
`request` is the `KibanaRequest` object. `addBatchProcessingRoute` together with `batchedFunction`
ensure that errors are handled and that all items in the batch get executed.
## `addStreamingResponseRoute`
`addStreamingResponseRoute` is a lower-level interface that receives and `payload`
message returns and observable which results are streamed back as ND-JSON messages
until the observable completes. `addStreamingResponseRoute` does not know about the
type of the messages, it does not handle errors, and it does not have a concept of
batch size&mdash;observable can stream any number of messages until it completes.
```ts
plugins.bfetch.addStreamingResponseRoute('/my-plugin/foo', request => ({
getResponseStream: (payload) => {
const subject = new Subject();
setTimeout(() => { subject.next('123'); }, 100);
setTimeout(() => { subject.complete(); }, 200);
return subject;
},
}));
```

View file

@ -0,0 +1,521 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';
const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Promise.race<'resolved' | 'rejected' | 'pending'>([
new Promise<any>(resolve =>
promise.then(
() => resolve('resolved'),
() => resolve('rejected')
)
),
new Promise<'pending'>(resolve => resolve()).then(() => 'pending'),
]);
const isPending = (promise: Promise<unknown>): Promise<boolean> =>
getPromiseState(promise).then(state => state === 'pending');
const setup = () => {
const xhr = ({} as unknown) as XMLHttpRequest;
const { promise, resolve, reject } = defer<void>();
const stream = new Subject<any>();
const fetchStreaming = (jest.fn(() => ({
xhr,
promise,
stream,
})) as unknown) as jest.SpyInstance & typeof fetchStreamingReal;
return {
fetchStreaming,
xhr,
promise,
resolve,
reject,
stream,
};
};
describe('createStreamingBatchedFunction()', () => {
test('returns a function', () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
});
expect(typeof fn).toBe('function');
});
test('returned function is async', () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
});
const res = fn({});
expect(typeof res.then).toBe('function');
});
describe('when timeout is reached', () => {
test('dispatches batch', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
test('does nothing is buffer is empty', async () => {
const { fetchStreaming } = setup();
createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(0);
});
test('sends POST request to correct endpoint', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
fn({ foo: 'bar' });
await new Promise(r => setTimeout(r, 6));
expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
method: 'POST',
});
});
test('collects calls into an array batch ordered by in same order as calls', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
fn({ foo: 'bar' });
fn({ baz: 'quix' });
await new Promise(r => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ foo: 'bar' }, { baz: 'quix' }],
});
});
});
describe('when buffer becomes full', () => {
test('dispatches batch request', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ full: 'yep' });
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
method: 'POST',
});
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ a: '1' }, { b: '2' }, { c: '3' }],
});
});
test('dispatches batch on full buffer and also on timeout', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
expect(fetchStreaming).toHaveBeenCalledTimes(1);
fn({ d: '4' });
await new Promise(r => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
});
describe('when receiving results', () => {
test('does not resolve call promises until request finishes', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
await new Promise(r => setTimeout(r, 6));
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(true);
});
test('resolves only promise of result that was streamed back', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
await new Promise(r => setTimeout(r, 6));
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(true);
expect(await isPending(promise3)).toBe(true);
stream.next(
JSON.stringify({
id: 1,
result: { foo: 'bar' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(true);
stream.next(
JSON.stringify({
id: 0,
result: { foo: 'bar 2' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(false);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(true);
});
test('resolves each promise with correct data', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
await new Promise(r => setTimeout(r, 6));
stream.next(
JSON.stringify({
id: 1,
result: { foo: 'bar' },
}) + '\n'
);
stream.next(
JSON.stringify({
id: 2,
result: { foo: 'bar 2' },
}) + '\n'
);
expect(await isPending(promise1)).toBe(true);
expect(await isPending(promise2)).toBe(false);
expect(await isPending(promise3)).toBe(false);
expect(await promise2).toEqual({ foo: 'bar' });
expect(await promise3).toEqual({ foo: 'bar 2' });
});
test('rejects promise on error response', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise = fn({ a: '1' });
await new Promise(r => setTimeout(r, 6));
expect(await isPending(promise)).toBe(true);
stream.next(
JSON.stringify({
id: 0,
error: { message: 'oops' },
}) + '\n'
);
expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toEqual({
message: 'oops',
});
});
test('resolves successful requests even after rejected ones', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
const promise3 = of(fn({ a: '3' }));
await new Promise(r => setTimeout(r, 6));
stream.next(
JSON.stringify({
id: 2,
result: { b: '3' },
}) + '\n'
);
await new Promise(r => setTimeout(r, 1));
stream.next(
JSON.stringify({
id: 1,
error: { b: '2' },
}) + '\n'
);
await new Promise(r => setTimeout(r, 1));
stream.next(
JSON.stringify({
id: 0,
result: { b: '1' },
}) + '\n'
);
await new Promise(r => setTimeout(r, 1));
const [result1] = await promise1;
const [, error2] = await promise2;
const [result3] = await promise3;
expect(result1).toEqual({ b: '1' });
expect(error2).toEqual({ b: '2' });
expect(result3).toEqual({ b: '3' });
});
describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
await new Promise(r => setTimeout(r, 6));
stream.complete();
await new Promise(r => setTimeout(r, 1));
const [, error1] = await promise1;
const [, error2] = await promise2;
expect(error1).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
expect(error2).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
});
test('rejects with CONNECTION error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
await new Promise(r => setTimeout(r, 6));
stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);
stream.complete();
await new Promise(r => setTimeout(r, 1));
const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
describe('when stream errors', () => {
test('rejects pending promises with STREAM error code', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
await new Promise(r => setTimeout(r, 6));
stream.error({
message: 'something went wrong',
});
await new Promise(r => setTimeout(r, 1));
const [, error1] = await promise1;
const [, error2] = await promise2;
expect(error1).toMatchObject({
message: 'something went wrong',
code: 'STREAM',
});
expect(error2).toMatchObject({
message: 'something went wrong',
code: 'STREAM',
});
});
test('rejects with STREAM error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
await new Promise(r => setTimeout(r, 6));
stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);
stream.error('oops');
await new Promise(r => setTimeout(r, 1));
const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: 'oops',
code: 'STREAM',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
});
});

View file

@ -0,0 +1,140 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { defer, Defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
createBatchedFunction,
BatchResponseItem,
ErrorLike,
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}
export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
}
export interface StreamingBatchedFunctionParams<Payload, Result> {
/**
* URL endpoint that will receive a batch of requests. This endpoint is expected
* to receive batch as a serialized JSON array. It should stream responses back
* in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming.
*/
url: string;
/**
* The instance of `fetchStreaming` function that will perform ND-JSON handling.
* There should be a version of this function available in setup contract of `bfetch`
* plugin.
*/
fetchStreaming?: typeof fetchStreaming;
/**
* The maximum size of function call buffer before sending the batch request.
*/
flushOnMaxItems?: ItemBufferParams<any>['flushOnMaxItems'];
/**
* The maximum timeout in milliseconds of the oldest item in the batch
* before sending the batch request.
*/
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];
}
/**
* Returns a function that does not execute immediately but buffers the call internally until
* `params.flushOnMaxItems` is reached or after `params.maxItemAge` timeout in milliseconds is reached. Once
* one of those thresholds is reached all buffered calls are sent in one batch to the
* server using `params.fetchStreaming` in a POST request. Responses are streamed back
* and each batch item is resolved once corresponding response is received.
*/
export const createStreamingBatchedFunction = <Payload, Result extends object>(
params: StreamingBatchedFunctionParams<Payload, Result>
): BatchedFunc<Payload, Result> => {
const {
url,
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
};
return [future.promise, entry];
},
onBatch: async items => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
error: error => {
const normalizedError = normalizeError<BatchedFunctionProtocolError>(error);
normalizedError.code = 'STREAM';
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
};
for (const { future } of items) future.reject(error);
}
},
});
await stream.toPromise();
} catch (error) {
for (const item of items) item.future.reject(error);
}
},
flushOnMaxItems,
maxItemAge,
});
return fn;
};

View file

@ -20,7 +20,8 @@
import { PluginInitializerContext } from '../../../core/public';
import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicApi } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);

View file

@ -27,6 +27,7 @@ export type Start = jest.Mocked<BfetchPublicStart>;
const createSetupContract = (): Setup => {
const setupContract: Setup = {
fetchStreaming: jest.fn(),
batchedFunction: jest.fn(),
};
return setupContract;
};
@ -34,6 +35,7 @@ const createSetupContract = (): Setup => {
const createStartContract = (): Start => {
const startContract: Start = {
fetchStreaming: jest.fn(),
batchedFunction: jest.fn(),
};
return startContract;
@ -56,7 +58,7 @@ const createPlugin = async () => {
};
};
export const uiActionsPluginMock = {
export const bfetchPluginMock = {
createSetupContract,
createStartContract,
createPlugin,

View file

@ -20,6 +20,11 @@
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
@ -27,12 +32,15 @@ export interface BfetchPublicSetupDependencies {}
// eslint-disable-next-line
export interface BfetchPublicStartDependencies {}
export interface BfetchPublicApi {
export interface BfetchPublicContract {
fetchStreaming: (params: FetchStreamingParams) => ReturnType<typeof fetchStreamingStatic>;
batchedFunction: <Payload, Result extends object>(
params: StreamingBatchedFunctionParams<Payload, Result>
) => BatchedFunc<Payload, Result>;
}
export type BfetchPublicSetup = BfetchPublicApi;
export type BfetchPublicStart = BfetchPublicApi;
export type BfetchPublicSetup = BfetchPublicContract;
export type BfetchPublicStart = BfetchPublicContract;
export class BfetchPublicPlugin
implements
@ -42,7 +50,7 @@ export class BfetchPublicPlugin
BfetchPublicSetupDependencies,
BfetchPublicStartDependencies
> {
private api!: BfetchPublicApi;
private contract!: BfetchPublicContract;
constructor(private readonly initializerContext: PluginInitializerContext) {}
@ -51,16 +59,18 @@ export class BfetchPublicPlugin
const basePath = core.http.basePath.get();
const fetchStreaming = this.fetchStreaming(version, basePath);
const batchedFunction = this.batchedFunction(fetchStreaming);
this.api = {
this.contract = {
fetchStreaming,
batchedFunction,
};
return this.api;
return this.contract;
}
public start(core: CoreStart, plugins: BfetchPublicStartDependencies): BfetchPublicStart {
return this.api;
return this.contract;
}
public stop() {}
@ -78,4 +88,12 @@ export class BfetchPublicPlugin
...(params.headers || {}),
},
});
private batchedFunction = (
fetchStreaming: BfetchPublicContract['fetchStreaming']
): BfetchPublicContract['batchedFunction'] => params =>
createStreamingBatchedFunction({
...params,
fetchStreaming: params.fetchStreaming || fetchStreaming,
});
}

View file

@ -36,14 +36,6 @@ test('returns XHR request', () => {
expect(typeof xhr.readyState).toBe('number');
});
test('returns promise', () => {
setup();
const { promise } = fetchStreaming({
url: 'http://example.com',
});
expect(typeof promise.then).toBe('function');
});
test('returns stream', () => {
setup();
const { stream } = fetchStreaming({
@ -54,12 +46,12 @@ test('returns stream', () => {
test('promise resolves when request completes', async () => {
const env = setup();
const { promise } = fetchStreaming({
const { stream } = fetchStreaming({
url: 'http://example.com',
});
let resolved = false;
promise.then(() => (resolved = true));
stream.toPromise().then(() => (resolved = true));
await tick();
expect(resolved).toBe(false);
@ -142,12 +134,12 @@ test('completes stream observable when request finishes', async () => {
test('promise throws when request errors', async () => {
const env = setup();
const { promise } = fetchStreaming({
const { stream } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
promise.catch(spy);
stream.toPromise().catch(spy);
await tick();
expect(spy).toHaveBeenCalledTimes(0);
@ -168,12 +160,11 @@ test('promise throws when request errors', async () => {
test('stream observable errors when request errors', async () => {
const env = setup();
const { promise, stream } = fetchStreaming({
const { stream } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
promise.catch(() => {});
stream.subscribe({
error: spy,
});

View file

@ -17,7 +17,6 @@
* under the License.
*/
import { defer } from '../../../kibana_utils/common';
import { fromStreamingXhr } from './from_streaming_xhr';
export interface FetchStreamingParams {
@ -38,7 +37,6 @@ export function fetchStreaming({
body = '',
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
const { promise, resolve, reject } = defer<void>();
// Begin the request
xhr.open(method, url);
@ -49,17 +47,11 @@ export function fetchStreaming({
const stream = fromStreamingXhr(xhr);
stream.subscribe({
complete: () => resolve(),
error: error => reject(error),
});
// Send the payload to the server
xhr.send(body);
return {
xhr,
promise,
stream,
};
}

View file

@ -20,7 +20,7 @@
import { PluginInitializerContext } from '../../../core/server';
import { BfetchServerPlugin } from './plugin';
export { BfetchServerSetup, BfetchServerStart } from './plugin';
export { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchServerPlugin(initializerContext);

View file

@ -26,6 +26,7 @@ export type Start = jest.Mocked<BfetchServerStart>;
const createSetupContract = (): Setup => {
const setupContract: Setup = {
addBatchProcessingRoute: jest.fn(),
addStreamingResponseRoute: jest.fn(),
};
return setupContract;
@ -54,7 +55,7 @@ const createPlugin = async () => {
};
};
export const uiActionsPluginMock = {
export const bfetchPluginMock = {
createSetupContract,
createStartContract,
createPlugin,

View file

@ -17,9 +17,24 @@
* under the License.
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin, Logger } from 'src/core/server';
import {
CoreStart,
PluginInitializerContext,
CoreSetup,
Plugin,
Logger,
KibanaRequest,
} from 'src/core/server';
import { schema } from '@kbn/config-schema';
import { StreamingResponseHandler, removeLeadingSlash } from '../common';
import { Subject } from 'rxjs';
import {
StreamingResponseHandler,
BatchRequestData,
BatchResponseItem,
ErrorLike,
removeLeadingSlash,
normalizeError,
} from '../common';
import { createNDJSONStream } from './streaming';
// eslint-disable-next-line
@ -28,8 +43,19 @@ export interface BfetchServerSetupDependencies {}
// eslint-disable-next-line
export interface BfetchServerStartDependencies {}
export interface BatchProcessingRouteParams<BatchItemData, BatchItemResult> {
onBatchItem: (data: BatchItemData) => Promise<BatchItemResult>;
}
export interface BfetchServerSetup {
addStreamingResponseRoute: (path: string, handler: StreamingResponseHandler<any, any>) => void;
addBatchProcessingRoute: <BatchItemData extends object, BatchItemResult extends object>(
path: string,
handler: (request: KibanaRequest) => BatchProcessingRouteParams<BatchItemData, BatchItemResult>
) => void;
addStreamingResponseRoute: <Payload, Response>(
path: string,
params: (request: KibanaRequest) => StreamingResponseHandler<Payload, Response>
) => void;
}
// eslint-disable-next-line
@ -49,8 +75,10 @@ export class BfetchServerPlugin
const logger = this.initializerContext.logger.get();
const router = core.http.createRouter();
const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger });
const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
return {
addBatchProcessingRoute,
addStreamingResponseRoute,
};
}
@ -76,17 +104,56 @@ export class BfetchServerPlugin
},
},
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const headers = {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
};
return response.ok({
headers: {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
},
body: createNDJSONStream(data, handler, logger),
headers,
body: createNDJSONStream(data, handlerInstance, logger),
});
}
);
};
private addBatchProcessingRoute = (
addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute']
): BfetchServerSetup['addBatchProcessingRoute'] => <
BatchItemData extends object,
BatchItemResult extends object,
E extends ErrorLike = ErrorLike
>(
path: string,
handler: (request: KibanaRequest) => BatchProcessingRouteParams<BatchItemData, BatchItemResult>
) => {
addStreamingResponseRoute<
BatchRequestData<BatchItemData>,
BatchResponseItem<BatchItemResult, E>
>(path, request => {
const handlerInstance = handler(request);
return {
getResponseStream: ({ batch }) => {
const subject = new Subject<BatchResponseItem<BatchItemResult, E>>();
let cnt = batch.length;
batch.forEach(async (batchItem, id) => {
try {
const result = await handlerInstance.onBatchItem(batchItem);
subject.next({ id, result });
} catch (err) {
const error = normalizeError<E>(err);
subject.next({ id, error });
} finally {
cnt--;
if (!cnt) subject.complete();
}
});
return subject;
},
};
});
};
}

View file

@ -29,7 +29,7 @@ export const createNDJSONStream = <Payload, Response>(
logger: Logger
): Stream => {
const stream = new PassThrough();
const results = handler.onRequest(payload);
const results = handler.getResponseStream(payload);
results.subscribe({
next: (message: Response) => {

View file

@ -30,11 +30,6 @@ export function getType(node: any) {
}
export function serializeProvider(types: any) {
return {
serialize: provider('serialize'),
deserialize: provider('deserialize'),
};
function provider(key: any) {
return (context: any) => {
const type = getType(context);
@ -43,6 +38,11 @@ export function serializeProvider(types: any) {
return fn(context);
};
}
return {
serialize: provider('serialize'),
deserialize: provider('deserialize'),
};
}
export class Type {

View file

@ -1,9 +1,10 @@
{
"id": "expressions",
"version": "kibana",
"server": false,
"server": true,
"ui": true,
"requiredPlugins": [
"bfetch",
"inspector"
]
}

View file

@ -18,7 +18,7 @@
*/
import { batchedFetch, Request } from './batched_fetch';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { defer } from '../../kibana_utils/public';
import { Subject } from 'rxjs';
const serialize = (o: any) => JSON.stringify(o);

View file

@ -20,13 +20,11 @@
import _ from 'lodash';
import { filter, map } from 'rxjs/operators';
// eslint-disable-next-line
import { split } from '../../../../../plugins/bfetch/public/streaming';
import { BfetchPublicApi } from '../../../../../plugins/bfetch/public';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { FUNCTIONS_URL } from './consts';
import { split, BfetchPublicContract } from '../../bfetch/public';
import { defer } from '../../kibana_utils/public';
export interface Options {
fetchStreaming: BfetchPublicApi['fetchStreaming'];
fetchStreaming: BfetchPublicContract['fetchStreaming'];
serialize: any;
ms?: number;
}
@ -111,9 +109,9 @@ export function batchedFetch({ fetchStreaming, serialize, ms = 10 }: Options) {
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], batch: Batch) {
const { stream, promise } = fetchStreaming({
url: FUNCTIONS_URL,
async function processBatch(fetchStreaming: BfetchPublicContract['fetchStreaming'], batch: Batch) {
const { stream } = fetchStreaming({
url: `/api/interpreter/fns`,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
@ -137,7 +135,7 @@ async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], b
});
try {
await promise;
await stream.toPromise();
} catch (error) {
Object.values(batch).forEach(({ future }) => {
future.reject(error);

View file

@ -23,6 +23,7 @@ import { ExpressionsSetup, ExpressionsStart, plugin as pluginInitializer } from
/* eslint-disable */
import { coreMock } from '../../../core/public/mocks';
import { inspectorPluginMock } from '../../inspector/public/mocks';
import { bfetchPluginMock } from '../../bfetch/public/mocks';
/* eslint-enable */
export type Setup = jest.Mocked<ExpressionsSetup>;
@ -48,6 +49,7 @@ const createSetupContract = (): Setup => {
interpretAst: () => {},
},
}),
loadLegacyServerFunctionWrappers: () => Promise.resolve(),
},
};
return setupContract;
@ -71,6 +73,7 @@ const createPlugin = async () => {
const coreStart = coreMock.createStart();
const plugin = pluginInitializer(pluginInitializerContext);
const setup = await plugin.setup(coreSetup, {
bfetch: bfetchPluginMock.createSetupContract(),
inspector: inspectorPluginMock.createSetupContract(),
});
@ -82,6 +85,7 @@ const createPlugin = async () => {
setup,
doStart: async () =>
await plugin.start(coreStart, {
bfetch: bfetchPluginMock.createStartContract(),
inspector: inspectorPluginMock.createStartContract(),
}),
};

View file

@ -20,6 +20,7 @@
import { PluginInitializerContext, CoreSetup, CoreStart, Plugin } from '../../../core/public';
import { ExpressionInterpretWithHandlers, ExpressionExecutor } from './types';
import { FunctionsRegistry, RenderFunctionsRegistry, TypesRegistry } from './registries';
import { BfetchPublicSetup, BfetchPublicStart } from '../../bfetch/public';
import { Setup as InspectorSetup, Start as InspectorStart } from '../../inspector/public';
import {
setCoreStart,
@ -58,12 +59,15 @@ import { ExpressionLoader, loader } from './loader';
import { ExpressionDataHandler, execute } from './execute';
import { render, ExpressionRenderHandler } from './render';
import { AnyExpressionFunction, AnyExpressionType } from '../common/types';
import { serializeProvider } from '../common';
export interface ExpressionsSetupDeps {
bfetch: BfetchPublicSetup;
inspector: InspectorSetup;
}
export interface ExpressionsStartDeps {
bfetch: BfetchPublicStart;
inspector: InspectorStart;
}
@ -76,6 +80,7 @@ export interface ExpressionsSetup {
renderers: RenderFunctionsRegistry;
types: TypesRegistry;
getExecutor: () => ExpressionExecutor;
loadLegacyServerFunctionWrappers: () => Promise<void>;
};
}
@ -98,7 +103,7 @@ export class ExpressionsPublicPlugin
constructor(initializerContext: PluginInitializerContext) {}
public setup(core: CoreSetup, { inspector }: ExpressionsSetupDeps): ExpressionsSetup {
public setup(core: CoreSetup, { inspector, bfetch }: ExpressionsSetupDeps): ExpressionsSetup {
const { functions, renderers, types } = this;
setRenderersRegistry(renderers);
@ -146,6 +151,31 @@ export class ExpressionsPublicPlugin
setInterpreter(getExecutor().interpreter);
let cached: Promise<void> | null = null;
const loadLegacyServerFunctionWrappers = async () => {
if (!cached) {
cached = (async () => {
const serverFunctionList = await core.http.get(`/api/interpreter/fns`);
const batchedFunction = bfetch.batchedFunction({ url: `/api/interpreter/fns` });
const { serialize } = serializeProvider(types.toJS());
// For every sever-side function, register a client-side
// function that matches its definition, but which simply
// calls the server-side function endpoint.
Object.keys(serverFunctionList).forEach(functionName => {
const fn = () => ({
...serverFunctionList[functionName],
fn: (context: any, args: any) => {
return batchedFunction({ functionName, args, context: serialize(context) });
},
});
registerFunction(fn);
});
})();
}
return cached;
};
const setup: ExpressionsSetup = {
registerFunction,
registerRenderer: (renderer: any) => {
@ -159,6 +189,7 @@ export class ExpressionsPublicPlugin
renderers,
types,
getExecutor,
loadLegacyServerFunctionWrappers,
},
};

View file

@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { PluginInitializerContext } from '../../../core/server';
import { ExpressionsServerPlugin } from './plugin';
export { ExpressionsServerSetup, ExpressionsServerStart } from './plugin';
export function plugin(initializerContext: PluginInitializerContext) {
return new ExpressionsServerPlugin(initializerContext);
}

View file

@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* eslint-disable max-classes-per-file */
// TODO: Remove this file once https://github.com/elastic/kibana/issues/46906 is complete.
// @ts-ignore
import { register, registryFactory, Registry, Fn } from '@kbn/interpreter/common';
import Boom from 'boom';
import { schema } from '@kbn/config-schema';
import { CoreSetup, Logger } from 'src/core/server';
import { ExpressionsServerSetupDependencies } from './plugin';
import { typeSpecs as types, Type } from '../common';
import { serializeProvider } from '../common';
export class TypesRegistry extends Registry<any, any> {
wrapper(obj: any) {
return new (Type as any)(obj);
}
}
export class FunctionsRegistry extends Registry<any, any> {
wrapper(obj: any) {
return new Fn(obj);
}
}
export const registries = {
types: new TypesRegistry(),
serverFunctions: new FunctionsRegistry(),
};
export interface LegacyInterpreterServerApi {
registries(): typeof registries;
register(specs: Record<keyof typeof registries, any[]>): typeof registries;
}
export const createLegacyServerInterpreterApi = (): LegacyInterpreterServerApi => {
const api = registryFactory(registries);
register(registries, {
types,
});
return api;
};
export const createLegacyServerEndpoints = (
api: LegacyInterpreterServerApi,
logger: Logger,
core: CoreSetup,
plugins: ExpressionsServerSetupDependencies
) => {
const router = core.http.createRouter();
/**
* Register the endpoint that returns the list of server-only functions.
*/
router.get(
{
path: `/api/interpreter/fns`,
validate: {
body: schema.any(),
},
},
async (context, request, response) => {
const functions = api.registries().serverFunctions.toJS();
const body = JSON.stringify(functions);
return response.ok({
body,
});
}
);
/**
* Run a single Canvas function.
*
* @param {*} server - The Kibana server object
* @param {*} handlers - The Canvas handlers
* @param {*} fnCall - Describes the function being run `{ functionName, args, context }`
*/
async function runFunction(handlers: any, fnCall: any) {
const { functionName, args, context } = fnCall;
const { deserialize } = serializeProvider(registries.types.toJS());
const fnDef = registries.serverFunctions.toJS()[functionName];
if (!fnDef) throw Boom.notFound(`Function "${functionName}" could not be found.`);
const deserialized = deserialize(context);
const result = fnDef.fn(deserialized, args, handlers);
return result;
}
/**
* Register an endpoint that executes a batch of functions, and streams the
* results back using ND-JSON.
*/
plugins.bfetch.addBatchProcessingRoute(`/api/interpreter/fns`, request => {
const scopedClient = core.elasticsearch.dataClient.asScoped(request);
const handlers = {
environment: 'server',
elasticsearchClient: async (
endpoint: string,
clientParams: Record<string, any> = {},
options?: any
) => scopedClient.callAsCurrentUser(endpoint, clientParams, options),
};
return {
onBatchItem: async (fnCall: any) => {
const result = await runFunction(handlers, fnCall);
if (typeof result === 'undefined') {
throw new Error(`Function ${fnCall.functionName} did not return anything.`);
}
return result;
},
};
});
};

View file

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ExpressionsServerSetup, ExpressionsServerStart } from '.';
import { plugin as pluginInitializer } from '.';
import { coreMock } from '../../../core/server/mocks';
/* eslint-disable */
import { bfetchPluginMock } from '../../bfetch/server/mocks';
/* eslint-enable */
export type Setup = jest.Mocked<ExpressionsServerSetup>;
export type Start = jest.Mocked<ExpressionsServerStart>;
const createSetupContract = (): Setup => {
const setupContract: Setup = {
__LEGACY: {
register: jest.fn(),
registries: jest.fn(),
},
};
return setupContract;
};
const createStartContract = (): Start => {
const startContract: Start = {};
return startContract;
};
const createPlugin = async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext();
const coreSetup = coreMock.createSetup();
const coreStart = coreMock.createStart();
const plugin = pluginInitializer(pluginInitializerContext);
const setup = await plugin.setup(coreSetup, {
bfetch: bfetchPluginMock.createSetupContract(),
});
return {
pluginInitializerContext,
coreSetup,
coreStart,
plugin,
setup,
doStart: async () =>
await plugin.start(coreStart, {
bfetch: bfetchPluginMock.createStartContract(),
}),
};
};
export const expressionsPluginMock = {
createSetupContract,
createStartContract,
createPlugin,
};

View file

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/server';
import { BfetchServerSetup, BfetchServerStart } from '../../bfetch/server';
import {
LegacyInterpreterServerApi,
createLegacyServerInterpreterApi,
createLegacyServerEndpoints,
} from './legacy';
// eslint-disable-next-line
export interface ExpressionsServerSetupDependencies {
bfetch: BfetchServerSetup;
}
// eslint-disable-next-line
export interface ExpressionsServerStartDependencies {
bfetch: BfetchServerStart;
}
export interface ExpressionsServerSetup {
__LEGACY: LegacyInterpreterServerApi;
}
// eslint-disable-next-line
export interface ExpressionsServerStart {}
export class ExpressionsServerPlugin
implements
Plugin<
ExpressionsServerSetup,
ExpressionsServerStart,
ExpressionsServerSetupDependencies,
ExpressionsServerStartDependencies
> {
constructor(private readonly initializerContext: PluginInitializerContext) {}
public setup(
core: CoreSetup,
plugins: ExpressionsServerSetupDependencies
): ExpressionsServerSetup {
const logger = this.initializerContext.logger.get();
const legacyApi = createLegacyServerInterpreterApi();
createLegacyServerEndpoints(legacyApi, logger, core, plugins);
return {
__LEGACY: legacyApi,
};
}
public start(
core: CoreStart,
plugins: ExpressionsServerStartDependencies
): ExpressionsServerStart {
return {};
}
public stop() {}
}

View file

@ -18,4 +18,5 @@
*/
export * from './defer';
export * from './of';
export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value';

View file

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { of } from './of';
describe('of()', () => {
describe('when promise resolves', () => {
const promise = new Promise(resolve => resolve()).then(() => 123);
test('first member of 3-tuple is the promise value', async () => {
const [result] = await of(promise);
expect(result).toBe(123);
});
test('second member of 3-tuple is undefined', async () => {
const [, error] = await of(promise);
expect(error).toBe(undefined);
});
test('third, flag member, of 3-tuple is true', async () => {
const [, , resolved] = await of(promise);
expect(resolved).toBe(true);
});
});
describe('when promise rejects', () => {
const promise = new Promise(resolve => resolve()).then(() => {
// eslint-disable-next-line no-throw-literal
throw 123;
});
test('first member of 3-tuple is undefined', async () => {
const [result] = await of(promise);
expect(result).toBe(undefined);
});
test('second member of 3-tuple is thrown error', async () => {
const [, error] = await of(promise);
expect(error).toBe(123);
});
test('third, flag member, of 3-tuple is false', async () => {
const [, , resolved] = await of(promise);
expect(resolved).toBe(false);
});
});
});

View file

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Given a promise awaits it and returns a 3-tuple, with the following members:
*
* - First entry is either the resolved value of the promise or `undefined`.
* - Second entry is either the error thrown by promise or `undefined`.
* - Third entry is a boolean, truthy if promise was resolved and falsy if rejected.
*
* @param promise Promise to convert to 3-tuple.
*/
export const of = async <T, E = any>(
promise: Promise<T>
): Promise<[T | undefined, E | undefined, boolean]> => {
try {
return [await promise, undefined, true];
} catch (error) {
return [undefined, error, false];
}
};

View file

@ -17,7 +17,7 @@
* under the License.
*/
export { defer } from '../common';
export { defer, Defer, of } from '../common';
export * from './core';
export * from './errors';
export * from './field_mapping';

View file

@ -38,6 +38,7 @@ export default async function({ readConfigFile }) {
require.resolve('./test_suites/embeddable_explorer'),
require.resolve('./test_suites/core_plugins'),
require.resolve('./test_suites/management'),
require.resolve('./test_suites/bfetch_explorer'),
],
services: {
...functionalConfig.get('services'),

View file

@ -0,0 +1,10 @@
{
"id": "kbn_tp_bfetch_explorer",
"version": "0.0.1",
"kibanaVersion": "kibana",
"configPath": ["kbn_tp_bfetch_explorer"],
"server": true,
"ui": true,
"requiredPlugins": ["bfetch"],
"optionalPlugins": []
}

View file

@ -0,0 +1,17 @@
{
"name": "kbn_tp_bfetch_explorer",
"version": "1.0.0",
"main": "target/examples/kbn_tp_bfetch_explorer",
"kibana": {
"version": "kibana",
"templateVersion": "1.0.0"
},
"license": "Apache-2.0",
"scripts": {
"kbn": "node ../../scripts/kbn.js",
"build": "rm -rf './target' && tsc"
},
"devDependencies": {
"typescript": "3.7.2"
}
}

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from '../../../../../examples/bfetch_explorer/public';

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from '../../../../../examples/bfetch_explorer/server';

View file

@ -0,0 +1,21 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"outDir": "./target",
"skipLibCheck": true,
"types": [
"node",
"jest",
"react"
]
},
"include": [
"index.ts",
"public/**/*.ts",
"public/**/*.tsx",
"server/**/*.ts",
"server/**/*.tsx",
"../../../../typings/**/*",
],
"exclude": []
}

View file

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../functional/ftr_provider_context';
export default function({ getService }: FtrProviderContext) {
const testSubjects = getService('testSubjects');
const appsMenu = getService('appsMenu');
describe('batchedFunction', () => {
beforeEach(async () => {
await appsMenu.clickLink('bfetch explorer');
await testSubjects.click('count-until');
await testSubjects.click('double-integers');
});
it('executes all requests in a batch', async () => {
const form = await testSubjects.find('DoubleIntegers');
const btn = await form.findByCssSelector('button');
await btn.click();
await new Promise(r => setTimeout(r, 4000));
const pre = await form.findByCssSelector('pre');
const text = await pre.getVisibleText();
const json = JSON.parse(text);
expect(json).to.eql([
{
num: -1,
error: {
message: 'Invalid number',
},
},
{
num: 300,
result: {
num: 600,
},
},
{
num: 1000,
result: {
num: 2000,
},
},
{
num: 2000,
result: {
num: 4000,
},
},
]);
});
it('streams results back', async () => {
const form = await testSubjects.find('DoubleIntegers');
const btn = await form.findByCssSelector('button');
await btn.click();
await new Promise(r => setTimeout(r, 500));
const pre = await form.findByCssSelector('pre');
const text1 = await pre.getVisibleText();
const json1 = JSON.parse(text1);
expect(json1.length > 0).to.be(true);
expect(json1.length < 4).to.be(true);
await new Promise(r => setTimeout(r, 3500));
const text2 = await pre.getVisibleText();
const json2 = JSON.parse(text2);
expect(json2.length).to.be(4);
});
});
}

View file

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { FtrProviderContext } from '../../../functional/ftr_provider_context';
export default function({ getService, getPageObjects, loadTestFile }: FtrProviderContext) {
const browser = getService('browser');
const appsMenu = getService('appsMenu');
const PageObjects = getPageObjects(['common', 'header']);
describe('bfetch explorer', function() {
before(async () => {
await browser.setWindowSize(1300, 900);
await PageObjects.common.navigateToApp('settings');
await appsMenu.clickLink('bfetch explorer');
});
loadTestFile(require.resolve('./batched_function'));
});
}