mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
Optimize the expression interpreter by introducing batching of remote function calls.
This commit is contained in:
parent
8b5ec47720
commit
659e0b0a4c
6 changed files with 242 additions and 40 deletions
111
packages/kbn-interpreter/src/public/batched_fetch.js
Normal file
111
packages/kbn-interpreter/src/public/batched_fetch.js
Normal file
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { FUNCTIONS_URL } from './consts';
|
||||
|
||||
/**
|
||||
* Create a function which executes an Expression function on the
|
||||
* server as part of a larger batch of executions.
|
||||
*/
|
||||
export function batchedFetch({ kfetch, serialize, ms = 10 }) {
|
||||
// Uniquely identifies each function call in a batch operation
|
||||
// so that the appropriate promise can be resolved / rejected later.
|
||||
let id = 0;
|
||||
|
||||
// A map like { id: { future, request } }, which is used to
|
||||
// track all of the function calls in a batch operation.
|
||||
let batch = {};
|
||||
let timeout;
|
||||
|
||||
const nextId = () => ++id;
|
||||
|
||||
const reset = () => {
|
||||
id = 0;
|
||||
batch = {};
|
||||
timeout = undefined;
|
||||
};
|
||||
|
||||
const runBatch = () => {
|
||||
processBatch(kfetch, batch);
|
||||
reset();
|
||||
};
|
||||
|
||||
return ({ functionName, context, args }) => {
|
||||
if (!timeout) {
|
||||
timeout = setTimeout(runBatch, ms);
|
||||
}
|
||||
|
||||
const id = nextId();
|
||||
const future = createFuture();
|
||||
|
||||
batch[id] = {
|
||||
future,
|
||||
request: { id, functionName, args, context: serialize(context) },
|
||||
};
|
||||
|
||||
return future.promise;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* An externally resolvable / rejectable promise, used to make sure
|
||||
* individual batch responses go to the correct caller.
|
||||
*/
|
||||
function createFuture() {
|
||||
let resolve;
|
||||
let reject;
|
||||
|
||||
return {
|
||||
resolve(val) { return resolve(val); },
|
||||
reject(val) { return reject(val); },
|
||||
promise: new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the specified batch of functions on the server, then resolves
|
||||
* the related promises.
|
||||
*/
|
||||
async function processBatch(kfetch, batch) {
|
||||
try {
|
||||
const { results } = await kfetch({
|
||||
pathname: FUNCTIONS_URL,
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
functions: Object.values(batch).map(({ request }) => request),
|
||||
}),
|
||||
});
|
||||
|
||||
results.forEach(({ id, result }) => {
|
||||
const { future } = batch[id];
|
||||
if (result.statusCode && result.err) {
|
||||
future.reject(result);
|
||||
} else {
|
||||
future.resolve(result);
|
||||
}
|
||||
});
|
||||
} catch (err) {
|
||||
Object.values(batch).forEach(({ future }) => {
|
||||
future.reject(err);
|
||||
});
|
||||
}
|
||||
}
|
21
packages/kbn-interpreter/src/public/consts.js
Normal file
21
packages/kbn-interpreter/src/public/consts.js
Normal file
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
// The server endpoint for retrieiving and running Canvas functions.
|
||||
export const FUNCTIONS_URL = '/api/canvas/fns';
|
|
@ -20,11 +20,14 @@
|
|||
import { interpreterProvider } from '../common/interpreter/interpret';
|
||||
import { serializeProvider } from '../common/lib/serialize';
|
||||
import { createHandlers } from './create_handlers';
|
||||
|
||||
export const FUNCTIONS_URL = '/api/canvas/fns';
|
||||
import { batchedFetch } from './batched_fetch';
|
||||
import { FUNCTIONS_URL } from './consts';
|
||||
|
||||
export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
|
||||
const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
|
||||
const types = typesRegistry.toJS();
|
||||
const { serialize } = serializeProvider(types);
|
||||
const batch = batchedFetch({ kfetch, serialize });
|
||||
|
||||
// For every sever-side function, register a client-side
|
||||
// function that matches its definition, but which simply
|
||||
|
@ -32,20 +35,7 @@ export async function initializeInterpreter(kfetch, typesRegistry, functionsRegi
|
|||
Object.keys(serverFunctionList).forEach(functionName => {
|
||||
functionsRegistry.register(() => ({
|
||||
...serverFunctionList[functionName],
|
||||
async fn(context, args) {
|
||||
const types = typesRegistry.toJS();
|
||||
const { serialize } = serializeProvider(types);
|
||||
const result = await kfetch({
|
||||
pathname: `${FUNCTIONS_URL}/${functionName}`,
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
args,
|
||||
context: serialize(context),
|
||||
}),
|
||||
});
|
||||
|
||||
return result;
|
||||
},
|
||||
fn: (context, args) => batch({ functionName, args, context }),
|
||||
}));
|
||||
});
|
||||
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { initializeInterpreter, FUNCTIONS_URL } from './interpreter';
|
||||
import { FUNCTIONS_URL } from './consts';
|
||||
import { initializeInterpreter } from './interpreter';
|
||||
|
||||
jest.mock('../common/interpreter/interpret', () => ({
|
||||
interpreterProvider: () => () => ({}),
|
||||
|
@ -42,10 +43,23 @@ describe('kbn-interpreter/interpreter', () => {
|
|||
});
|
||||
|
||||
it('registers client-side functions that pass through to the server', async () => {
|
||||
const kfetch = jest.fn(async () => ({
|
||||
hello: { name: 'hello' },
|
||||
world: { name: 'world' },
|
||||
}));
|
||||
const kfetch = jest.fn(async ({ method }) => {
|
||||
if (method === 'POST') {
|
||||
return {
|
||||
results: [{
|
||||
id: 1,
|
||||
result: {
|
||||
hello: 'world',
|
||||
},
|
||||
}],
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
hello: { name: 'hello' },
|
||||
world: { name: 'world' },
|
||||
};
|
||||
});
|
||||
|
||||
const register = jest.fn();
|
||||
|
||||
|
@ -63,12 +77,21 @@ describe('kbn-interpreter/interpreter', () => {
|
|||
const context = {};
|
||||
const args = { quote: 'All we have to decide is what to do with the time that is given us.' };
|
||||
|
||||
await hello.fn(context, args);
|
||||
const result = await hello.fn(context, args);
|
||||
|
||||
expect(result).toEqual({ hello: 'world' });
|
||||
|
||||
expect(kfetch).toHaveBeenCalledWith({
|
||||
pathname: `${FUNCTIONS_URL}/hello`,
|
||||
pathname: FUNCTIONS_URL,
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ args, context }),
|
||||
body: JSON.stringify({
|
||||
functions: [{
|
||||
id: 1,
|
||||
functionName: 'hello',
|
||||
args,
|
||||
context,
|
||||
}]
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -74,7 +74,6 @@ export const DEV_ONLY_LICENSE_WHITELIST = [
|
|||
|
||||
// Globally overrides a license for a given package@version
|
||||
export const LICENSE_OVERRIDES = {
|
||||
'scriptjs@2.5.8': ['MIT'], // license header appended in the dist
|
||||
'react-lib-adler32@1.0.1': ['BSD'], // adler32 extracted from react source,
|
||||
'cycle@1.0.3': ['CC0-1.0'], // conversion to a public-domain like license
|
||||
'jsts@1.1.2': ['Eclipse Distribution License - v 1.0'], //cf. https://github.com/bjornharrtell/jsts
|
||||
|
|
|
@ -21,31 +21,69 @@ import Boom from 'boom';
|
|||
import { serializeProvider } from '@kbn/interpreter/common';
|
||||
import { API_ROUTE } from '../../common/constants';
|
||||
import { createHandlers } from '../lib/create_handlers';
|
||||
import Joi from 'joi';
|
||||
|
||||
/**
|
||||
* Register the Canvas function endopints.
|
||||
*
|
||||
* @param {*} server - The Kibana server
|
||||
*/
|
||||
export function registerServerFunctions(server) {
|
||||
// Execute functions, kind of RPC like.
|
||||
getServerFunctions(server);
|
||||
runServerFunctions(server);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the endpoint that executes a batch of functions, and sends the result back as a single response.
|
||||
*
|
||||
* @param {*} server - The Kibana server
|
||||
*/
|
||||
function runServerFunctions(server) {
|
||||
server.route({
|
||||
method: 'POST',
|
||||
path: `${API_ROUTE}/fns/{functionName}`,
|
||||
path: `${API_ROUTE}/fns`,
|
||||
options: {
|
||||
validate: {
|
||||
payload: Joi.object({
|
||||
functions: Joi.array().items(
|
||||
Joi.object()
|
||||
.keys({
|
||||
id: Joi.number().required(),
|
||||
functionName: Joi.string().required(),
|
||||
args: Joi.object().default({}),
|
||||
context: Joi.object().default({}),
|
||||
}),
|
||||
).required(),
|
||||
}).required(),
|
||||
},
|
||||
},
|
||||
async handler(req) {
|
||||
const types = server.plugins.interpreter.types.toJS();
|
||||
const { deserialize } = serializeProvider(types);
|
||||
const { functionName } = req.params;
|
||||
const { args, context } = req.payload;
|
||||
const fnDef = server.plugins.interpreter.serverFunctions.toJS()[functionName];
|
||||
|
||||
if (!fnDef) {
|
||||
throw Boom.notFound(`Function "${functionName}" could not be found.`);
|
||||
}
|
||||
|
||||
const handlers = await createHandlers(req, server);
|
||||
const result = await fnDef.fn(deserialize(context), args, handlers);
|
||||
const { functions } = req.payload;
|
||||
|
||||
return result;
|
||||
// Process each function individually, and bundle up respones / errors into
|
||||
// the format expected by the front-end batcher.
|
||||
const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => {
|
||||
const result = await runFunction(server, handlers, fnCall)
|
||||
.catch(err => {
|
||||
if (Boom.isBoom(err)) {
|
||||
return { err, statusCode: err.statusCode, message: err.output.payload };
|
||||
}
|
||||
return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' };
|
||||
});
|
||||
return { id, result };
|
||||
}));
|
||||
|
||||
return { results };
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Give the client the list of server-functions.
|
||||
/**
|
||||
* Register the endpoint that returns the list of server-only functions.
|
||||
* @param {*} server - The Kibana server
|
||||
*/
|
||||
function getServerFunctions(server) {
|
||||
server.route({
|
||||
method: 'GET',
|
||||
path: `${API_ROUTE}/fns`,
|
||||
|
@ -54,3 +92,23 @@ export function registerServerFunctions(server) {
|
|||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single Canvas function.
|
||||
*
|
||||
* @param {*} server - The Kibana server object
|
||||
* @param {*} handlers - The Canvas handlers
|
||||
* @param {*} fnCall - Describes the function being run `{ functionName, args, context }`
|
||||
*/
|
||||
async function runFunction(server, handlers, fnCall) {
|
||||
const { functionName, args, context } = fnCall;
|
||||
const types = server.plugins.interpreter.types.toJS();
|
||||
const { deserialize } = serializeProvider(types);
|
||||
const fnDef = server.plugins.interpreter.serverFunctions.toJS()[functionName];
|
||||
|
||||
if (!fnDef) {
|
||||
throw Boom.notFound(`Function "${functionName}" could not be found.`);
|
||||
}
|
||||
|
||||
return fnDef.fn(deserialize(context), args, handlers);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue