mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
Feat/expression threading (#24598)
Replaces https://github.com/elastic/kibana/pull/23301 Closes https://github.com/elastic/kibana/issues/23080 --- This is a minimal threading implementation for Canvas. There's still a lot to be done to make this concept great, but this is a start. What it does: - Creates a server side abstraction on top of the interpreter - Determines where to send the expression by checking the first function to be run - Loads common functions in a separate worker thread on the server. - Routes to a single forked worker (thread), the main thread (server), or the browser (browser), in that order - Defers back to the router when a function isn't found. Fails if the function isn't found in any of the above 3 environments - Times out the worker if it takes too long, and respawns it as needed. - Simplifies the error dialog to remove the stack. What is does not.: - Round robin a pool of workers - Queue. If one expression in the threaded env fails then anything sent to it in the meantime will fail. The upstream environment handles managing timeouts. I think this would only make sense todo with a pool. - Client side. This doesn't implement web workers, but we could use roughly the same architecture. - Implement a specific, pluggable `worker` environment on the server. Right now it's just common functions, so plugin authors will always end up in a thread if they put their function in the common directory. What I don't like: - The socketProvider code. This was reused across the server & browser, but now that it's only used in the browser there's no good reason for the abstraction - The serialize/deserialize stuff feels messy. Do we really need serialization?
This commit is contained in:
parent
a0543b1aec
commit
b8b0229fd6
21 changed files with 496 additions and 171 deletions
13
x-pack/plugins/canvas/common/interpreter/create_error.js
Normal file
13
x-pack/plugins/canvas/common/interpreter/create_error.js
Normal file
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
export const createError = err => ({
|
||||
type: 'error',
|
||||
error: {
|
||||
stack: process.env.NODE_ENV === 'production' ? undefined : err.stack,
|
||||
message: typeof err === 'string' ? err : err.message,
|
||||
},
|
||||
});
|
|
@ -11,19 +11,7 @@ import { fromExpression } from '../lib/ast';
|
|||
import { getByAlias } from '../lib/get_by_alias';
|
||||
import { typesRegistry } from '../lib/types_registry';
|
||||
import { castProvider } from './cast';
|
||||
|
||||
const createError = (err, { name, context, args }) => ({
|
||||
type: 'error',
|
||||
error: {
|
||||
stack: err.stack,
|
||||
message: typeof err === 'string' ? err : err.message,
|
||||
},
|
||||
info: {
|
||||
context,
|
||||
args,
|
||||
functionName: name,
|
||||
},
|
||||
});
|
||||
import { createError } from './create_error';
|
||||
|
||||
export function interpretProvider(config) {
|
||||
const { functions, onFunctionNotFound, types } = config;
|
||||
|
@ -32,7 +20,7 @@ export function interpretProvider(config) {
|
|||
|
||||
return interpret;
|
||||
|
||||
function interpret(node, context = null) {
|
||||
async function interpret(node, context = null) {
|
||||
switch (getType(node)) {
|
||||
case 'expression':
|
||||
return invokeChain(node.chain, context);
|
||||
|
@ -58,7 +46,11 @@ export function interpretProvider(config) {
|
|||
// in this case, it will try to execute the function in another context
|
||||
if (!fnDef) {
|
||||
chain.unshift(link);
|
||||
return onFunctionNotFound({ type: 'expression', chain: chain }, context);
|
||||
try {
|
||||
return await onFunctionNotFound({ type: 'expression', chain: chain }, context);
|
||||
} catch (e) {
|
||||
return createError(e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -69,16 +61,15 @@ export function interpretProvider(config) {
|
|||
const newContext = await invokeFunction(fnDef, context, resolvedArgs);
|
||||
|
||||
// if something failed, just return the failure
|
||||
if (getType(newContext) === 'error') {
|
||||
console.log('newContext error', newContext);
|
||||
return newContext;
|
||||
}
|
||||
if (getType(newContext) === 'error') return newContext;
|
||||
|
||||
// Continue re-invoking chain until it's empty
|
||||
return await invokeChain(chain, newContext);
|
||||
} catch (err) {
|
||||
console.error(`common/interpret ${fnName}: invokeChain rejected`, err);
|
||||
return createError(err, { name: fnName, context, args: fnArgs });
|
||||
} catch (e) {
|
||||
// Everything that throws from a function will hit this
|
||||
// The interpreter should *never* fail. It should always return a `{type: error}` on failure
|
||||
e.message = `[${fnName}] > ${e.message}`;
|
||||
return createError(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,6 +156,7 @@ export function interpretProvider(config) {
|
|||
return argAsts.map(argAst => {
|
||||
return async (ctx = context) => {
|
||||
const newContext = await interpret(argAst, ctx);
|
||||
// This is why when any sub-expression errors, the entire thing errors
|
||||
if (getType(newContext) === 'error') throw newContext.error;
|
||||
return cast(newContext, argDefs[argName].types);
|
||||
};
|
||||
|
|
|
@ -46,19 +46,14 @@ export function socketInterpreterProvider({
|
|||
// set a unique message ID so the code knows what response to process
|
||||
const id = uuid();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
return new Promise(resolve => {
|
||||
const { serialize, deserialize } = serializeProvider(types);
|
||||
|
||||
const listener = resp => {
|
||||
if (resp.error) {
|
||||
// cast error strings back into error instances
|
||||
const err = resp.error instanceof Error ? resp.error : new Error(resp.error);
|
||||
if (resp.stack) err.stack = resp.stack;
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(deserialize(resp.value));
|
||||
}
|
||||
};
|
||||
// This will receive {type: [msgSuccess || msgError] value: foo}
|
||||
// However it doesn't currently do anything with it. Which means `value`, regardless
|
||||
// of failure or success, needs to be something the interpreters would logically return
|
||||
// er, a primative or a {type: foo} object
|
||||
const listener = resp => resolve(deserialize(resp.value));
|
||||
|
||||
socket.once(`resp:${id}`, listener);
|
||||
|
||||
|
|
|
@ -7,13 +7,9 @@
|
|||
import { routes } from './server/routes';
|
||||
import { functionsRegistry } from './common/lib/functions_registry';
|
||||
import { commonFunctions } from './common/functions';
|
||||
import { loadServerPlugins } from './server/lib/load_server_plugins';
|
||||
import { populateServerRegistries } from './server/lib/server_registries';
|
||||
import { registerCanvasUsageCollector } from './server/usage';
|
||||
import {
|
||||
ecommerceSavedObjects,
|
||||
flightsSavedObjects,
|
||||
webLogsSavedObjects,
|
||||
} from './server/sample_data';
|
||||
import { loadSampleData } from './server/sample_data';
|
||||
|
||||
export default async function(server /*options*/) {
|
||||
server.injectUiAppVars('canvas', () => {
|
||||
|
@ -34,30 +30,10 @@ export default async function(server /*options*/) {
|
|||
// There are some common functions that use private APIs, load them here
|
||||
commonFunctions.forEach(func => functionsRegistry.register(func));
|
||||
|
||||
await loadServerPlugins();
|
||||
routes(server);
|
||||
registerCanvasUsageCollector(server);
|
||||
loadSampleData(server);
|
||||
|
||||
const now = new Date();
|
||||
const nowTimestamp = now.toISOString();
|
||||
function updateCanvasWorkpadTimestamps(savedObjects) {
|
||||
return savedObjects.map(savedObject => {
|
||||
if (savedObject.type === 'canvas-workpad') {
|
||||
savedObject.attributes['@timestamp'] = nowTimestamp;
|
||||
savedObject.attributes['@created'] = nowTimestamp;
|
||||
}
|
||||
|
||||
return savedObject;
|
||||
});
|
||||
}
|
||||
|
||||
server.addSavedObjectsToSampleDataset(
|
||||
'ecommerce',
|
||||
updateCanvasWorkpadTimestamps(ecommerceSavedObjects)
|
||||
);
|
||||
server.addSavedObjectsToSampleDataset(
|
||||
'flights',
|
||||
updateCanvasWorkpadTimestamps(flightsSavedObjects)
|
||||
);
|
||||
server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects));
|
||||
// Do not initialize the app until the registries are populated
|
||||
await populateServerRegistries(['serverFunctions', 'types']);
|
||||
routes(server);
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import { connect } from 'react-redux';
|
|||
import { compose, withProps } from 'recompose';
|
||||
import { createSocket } from '../../socket';
|
||||
import { initialize as initializeInterpreter } from '../../lib/interpreter';
|
||||
import { populateBrowserRegistries } from '../../lib/browser_registries';
|
||||
import { getAppReady, getBasePath } from '../../state/selectors/app';
|
||||
import { appReady, appError } from '../../state/actions/app';
|
||||
import { trackRouteChange } from './track_route_change';
|
||||
|
@ -28,6 +29,7 @@ const mapDispatchToProps = dispatch => ({
|
|||
setAppReady: basePath => async () => {
|
||||
// initialize the socket and interpreter
|
||||
createSocket(basePath);
|
||||
await populateBrowserRegistries();
|
||||
await initializeInterpreter();
|
||||
|
||||
// set app state to ready
|
||||
|
|
|
@ -11,7 +11,6 @@ import { get } from 'lodash';
|
|||
import { ShowDebugging } from './show_debugging';
|
||||
|
||||
export const Error = ({ payload }) => {
|
||||
const functionName = get(payload, 'info.functionName');
|
||||
const message = get(payload, 'error.message');
|
||||
|
||||
return (
|
||||
|
@ -21,10 +20,7 @@ export const Error = ({ payload }) => {
|
|||
iconType="cross"
|
||||
title="Whoops! Expression failed"
|
||||
>
|
||||
<p>
|
||||
The function <strong>"{functionName}"</strong> failed
|
||||
{message ? ' with the following message:' : '.'}
|
||||
</p>
|
||||
<p>{message ? 'Expression failed with the message:' : ''}</p>
|
||||
{message && <p style={{ padding: '0 16px' }}>{message}</p>}
|
||||
|
||||
<ShowDebugging payload={payload} />
|
||||
|
|
74
x-pack/plugins/canvas/public/lib/browser_registries.js
Normal file
74
x-pack/plugins/canvas/public/lib/browser_registries.js
Normal file
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import chrome from 'ui/chrome';
|
||||
import $script from 'scriptjs';
|
||||
import { typesRegistry } from '../../common/lib/types_registry';
|
||||
import {
|
||||
argTypeRegistry,
|
||||
datasourceRegistry,
|
||||
transformRegistry,
|
||||
modelRegistry,
|
||||
viewRegistry,
|
||||
} from '../expression_types';
|
||||
import { elementsRegistry } from './elements_registry';
|
||||
import { renderFunctionsRegistry } from './render_functions_registry';
|
||||
import { functionsRegistry as browserFunctions } from './functions_registry';
|
||||
import { loadPrivateBrowserFunctions } from './load_private_browser_functions';
|
||||
|
||||
const registries = {
|
||||
browserFunctions: browserFunctions,
|
||||
commonFunctions: browserFunctions,
|
||||
elements: elementsRegistry,
|
||||
types: typesRegistry,
|
||||
renderers: renderFunctionsRegistry,
|
||||
transformUIs: transformRegistry,
|
||||
datasourceUIs: datasourceRegistry,
|
||||
modelUIs: modelRegistry,
|
||||
viewUIs: viewRegistry,
|
||||
argumentUIs: argTypeRegistry,
|
||||
};
|
||||
|
||||
let resolve = null;
|
||||
let called = false;
|
||||
|
||||
const populatePromise = new Promise(_resolve => {
|
||||
resolve = _resolve;
|
||||
});
|
||||
|
||||
export const getBrowserRegistries = () => {
|
||||
return populatePromise;
|
||||
};
|
||||
|
||||
export const populateBrowserRegistries = () => {
|
||||
if (called) throw new Error('function should only be called once per process');
|
||||
called = true;
|
||||
|
||||
// loadPrivateBrowserFunctions is sync. No biggie.
|
||||
loadPrivateBrowserFunctions();
|
||||
|
||||
const remainingTypes = Object.keys(registries);
|
||||
const populatedTypes = {};
|
||||
|
||||
function loadType() {
|
||||
const type = remainingTypes.pop();
|
||||
window.canvas = window.canvas || {};
|
||||
window.canvas.register = d => registries[type].register(d);
|
||||
|
||||
// Load plugins one at a time because each needs a different loader function
|
||||
// $script will only load each of these once, we so can call this as many times as we need?
|
||||
const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`);
|
||||
$script(pluginPath, () => {
|
||||
populatedTypes[type] = registries[type];
|
||||
|
||||
if (remainingTypes.length) loadType();
|
||||
else resolve(populatedTypes);
|
||||
});
|
||||
}
|
||||
|
||||
if (remainingTypes.length) loadType();
|
||||
return populatePromise;
|
||||
};
|
|
@ -10,10 +10,11 @@ import { getSocket } from '../socket';
|
|||
import { typesRegistry } from '../../common/lib/types_registry';
|
||||
import { createHandlers } from './create_handlers';
|
||||
import { functionsRegistry } from './functions_registry';
|
||||
import { loadBrowserPlugins } from './load_browser_plugins';
|
||||
import { getBrowserRegistries } from './browser_registries';
|
||||
|
||||
let socket;
|
||||
let functionList;
|
||||
let resolve;
|
||||
const functionList = new Promise(_resolve => (resolve = _resolve));
|
||||
|
||||
export async function initialize() {
|
||||
socket = getSocket();
|
||||
|
@ -29,14 +30,14 @@ export async function initialize() {
|
|||
|
||||
// Create the function list
|
||||
socket.emit('getFunctionList');
|
||||
functionList = new Promise(resolve => socket.once('functionList', resolve));
|
||||
socket.once('functionList', resolve);
|
||||
return functionList;
|
||||
}
|
||||
|
||||
// Use the above promise to seed the interpreter with the functions it can defer to
|
||||
export async function interpretAst(ast, context) {
|
||||
// Load plugins before attempting to get functions, otherwise this gets racey
|
||||
return Promise.all([functionList, loadBrowserPlugins()])
|
||||
return Promise.all([functionList, getBrowserRegistries()])
|
||||
.then(([serverFunctionList]) => {
|
||||
return socketInterpreterProvider({
|
||||
types: typesRegistry.toJS(),
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import chrome from 'ui/chrome';
|
||||
import $script from 'scriptjs';
|
||||
import { typesRegistry } from '../../common/lib/types_registry';
|
||||
import {
|
||||
argTypeRegistry,
|
||||
datasourceRegistry,
|
||||
transformRegistry,
|
||||
modelRegistry,
|
||||
viewRegistry,
|
||||
} from '../expression_types';
|
||||
import { elementsRegistry } from './elements_registry';
|
||||
import { renderFunctionsRegistry } from './render_functions_registry';
|
||||
import { functionsRegistry as browserFunctions } from './functions_registry';
|
||||
import { loadPrivateBrowserFunctions } from './load_private_browser_functions';
|
||||
|
||||
const types = {
|
||||
browserFunctions: browserFunctions,
|
||||
commonFunctions: browserFunctions,
|
||||
elements: elementsRegistry,
|
||||
types: typesRegistry,
|
||||
renderers: renderFunctionsRegistry,
|
||||
transformUIs: transformRegistry,
|
||||
datasourceUIs: datasourceRegistry,
|
||||
modelUIs: modelRegistry,
|
||||
viewUIs: viewRegistry,
|
||||
argumentUIs: argTypeRegistry,
|
||||
};
|
||||
|
||||
export const loadBrowserPlugins = () =>
|
||||
new Promise(resolve => {
|
||||
loadPrivateBrowserFunctions();
|
||||
const remainingTypes = Object.keys(types);
|
||||
function loadType() {
|
||||
const type = remainingTypes.pop();
|
||||
window.canvas = window.canvas || {};
|
||||
window.canvas.register = d => types[type].register(d);
|
||||
// Load plugins one at a time because each needs a different loader function
|
||||
// $script will only load each of these once, we so can call this as many times as we need?
|
||||
const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`);
|
||||
$script(pluginPath, () => {
|
||||
if (remainingTypes.length) loadType();
|
||||
else resolve(true);
|
||||
});
|
||||
}
|
||||
|
||||
loadType();
|
||||
});
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
import io from 'socket.io-client';
|
||||
import { functionsRegistry } from '../common/lib/functions_registry';
|
||||
import { loadBrowserPlugins } from './lib/load_browser_plugins';
|
||||
import { getBrowserRegistries } from './lib/browser_registries';
|
||||
|
||||
let socket;
|
||||
|
||||
|
@ -14,7 +14,7 @@ export function createSocket(basePath) {
|
|||
socket = io(undefined, { path: `${basePath}/socket.io` });
|
||||
|
||||
socket.on('getFunctionList', () => {
|
||||
const pluginsLoaded = loadBrowserPlugins();
|
||||
const pluginsLoaded = getBrowserRegistries();
|
||||
|
||||
pluginsLoaded.then(() => socket.emit('functionList', functionsRegistry.toJS()));
|
||||
});
|
||||
|
|
|
@ -9,7 +9,9 @@ import ss from 'stream-stream';
|
|||
import { getPluginPaths } from './get_plugin_paths';
|
||||
|
||||
export const getPluginStream = type => {
|
||||
const stream = ss();
|
||||
const stream = ss({
|
||||
separator: '\n',
|
||||
});
|
||||
|
||||
getPluginPaths(type).then(files => {
|
||||
files.forEach(file => {
|
||||
|
|
44
x-pack/plugins/canvas/server/lib/route_expression/browser.js
Normal file
44
x-pack/plugins/canvas/server/lib/route_expression/browser.js
Normal file
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import uuid from 'uuid/v4';
|
||||
|
||||
export const browser = ({ socket, serialize, deserialize }) => {
|
||||
// Note that we need to be careful about how many times routeExpressionProvider is called, because of the socket.once below.
|
||||
// It's too bad we can't get a list of browser plugins on the server
|
||||
const getClientFunctions = new Promise(resolve => {
|
||||
socket.emit('getFunctionList');
|
||||
socket.once('functionList', resolve);
|
||||
});
|
||||
|
||||
return getClientFunctions.then(functions => {
|
||||
return {
|
||||
interpret: (ast, context) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const id = uuid();
|
||||
const listener = resp => {
|
||||
if (resp.type === 'msgError') {
|
||||
const { value } = resp;
|
||||
// cast error strings back into error instances
|
||||
const err = value instanceof Error ? value : new Error(value);
|
||||
if (value.stack) err.stack = value.stack;
|
||||
// Reject's with a legit error. Check! Environments should always reject with an error when something bad happens
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(deserialize(resp.value));
|
||||
}
|
||||
};
|
||||
|
||||
// {type: msgSuccess or msgError, value: foo}. Doesn't matter if it's success or error, we do the same thing for now
|
||||
socket.once(`resp:${id}`, listener);
|
||||
|
||||
socket.emit('run', { ast, context: serialize(context), id });
|
||||
});
|
||||
},
|
||||
getFunctions: () => Object.keys(functions),
|
||||
};
|
||||
});
|
||||
};
|
32
x-pack/plugins/canvas/server/lib/route_expression/index.js
Normal file
32
x-pack/plugins/canvas/server/lib/route_expression/index.js
Normal file
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { createError } from '../../../common/interpreter/create_error';
|
||||
|
||||
export const routeExpressionProvider = environments => {
|
||||
async function routeExpression(ast, context = null) {
|
||||
// List of environments in order of preference
|
||||
|
||||
return Promise.all(environments).then(environments => {
|
||||
const environmentFunctions = environments.map(env => env.getFunctions());
|
||||
|
||||
// Grab name of the first function in the chain
|
||||
const fnName = ast.chain[0].function.toLowerCase();
|
||||
|
||||
// Check each environment for that function
|
||||
for (let i = 0; i < environmentFunctions.length; i++) {
|
||||
if (environmentFunctions[i].includes(fnName)) {
|
||||
// If we find it, run in that environment, and only that environment
|
||||
return environments[i].interpret(ast, context).catch(e => createError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// If the function isn't found in any environment, give up
|
||||
throw new Error(`Function not found: [${fnName}]`);
|
||||
});
|
||||
}
|
||||
|
||||
return routeExpression;
|
||||
};
|
33
x-pack/plugins/canvas/server/lib/route_expression/server.js
Normal file
33
x-pack/plugins/canvas/server/lib/route_expression/server.js
Normal file
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { getServerRegistries } from '../server_registries';
|
||||
import { interpretProvider } from '../../../common/interpreter/interpret';
|
||||
import { createHandlers } from '../create_handlers';
|
||||
import { getRequest } from '../../lib/get_request';
|
||||
|
||||
export const server = ({ onFunctionNotFound, server, socket }) => {
|
||||
const pluginsReady = getServerRegistries(['serverFunctions', 'types']);
|
||||
|
||||
return Promise.all([pluginsReady, getRequest(server, socket.handshake)]).then(
|
||||
([{ serverFunctions, types }, request]) => {
|
||||
// 'request' is the modified hapi request object
|
||||
return {
|
||||
interpret: (ast, context) => {
|
||||
const interpret = interpretProvider({
|
||||
types: types.toJS(),
|
||||
functions: serverFunctions.toJS(),
|
||||
handlers: createHandlers(request, server),
|
||||
onFunctionNotFound,
|
||||
});
|
||||
|
||||
return interpret(ast, context);
|
||||
},
|
||||
getFunctions: () => Object.keys(serverFunctions.toJS()),
|
||||
};
|
||||
}
|
||||
);
|
||||
};
|
|
@ -0,0 +1,9 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
// The babel-register below uses .babelrc by default.
|
||||
require('babel-register');
|
||||
require('./worker');
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { fork } from 'child_process';
|
||||
import { resolve } from 'path';
|
||||
import uuid from 'uuid/v4';
|
||||
|
||||
// If the worker doesn't response in 10s, kill it.
|
||||
const WORKER_TIMEOUT = 20000;
|
||||
const workerPath = resolve(__dirname, 'babeled.js');
|
||||
const heap = {};
|
||||
let worker = null;
|
||||
|
||||
export function getWorker() {
|
||||
if (worker) return worker;
|
||||
worker = fork(workerPath, {});
|
||||
|
||||
// 'exit' happens whether we kill the worker or it just dies.
|
||||
// No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue
|
||||
worker.on('exit', () => {
|
||||
// Heads up: there is no worker.off
|
||||
worker = null;
|
||||
// Restart immediately on exit since node takes a couple seconds to spin up
|
||||
worker = getWorker();
|
||||
});
|
||||
|
||||
worker.on('message', msg => {
|
||||
const { type, value, id } = msg;
|
||||
if (type === 'run') {
|
||||
const { threadId } = msg;
|
||||
const { ast, context } = value;
|
||||
heap[threadId]
|
||||
.onFunctionNotFound(ast, context)
|
||||
.then(value => {
|
||||
worker.send({ type: 'msgSuccess', id, value: value });
|
||||
})
|
||||
.catch(e => heap[threadId].reject(e));
|
||||
}
|
||||
|
||||
if (type === 'msgSuccess' && heap[id]) heap[id].resolve(value);
|
||||
|
||||
// TODO: I don't think it is even possible to hit this
|
||||
if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value));
|
||||
});
|
||||
|
||||
return worker;
|
||||
}
|
||||
|
||||
// All serialize/deserialize must occur in here. We should not return serialized stuff to the expressionRouter
|
||||
export const thread = ({ onFunctionNotFound, serialize, deserialize }) => {
|
||||
const getWorkerFunctions = new Promise(resolve => {
|
||||
const worker = getWorker();
|
||||
worker.send({ type: 'getFunctions' });
|
||||
worker.on('message', msg => {
|
||||
if (msg.type === 'functionList') resolve(msg.value);
|
||||
});
|
||||
});
|
||||
|
||||
return getWorkerFunctions.then(functions => {
|
||||
return {
|
||||
interpret: (ast, context) => {
|
||||
const worker = getWorker();
|
||||
const id = uuid();
|
||||
worker.send({ type: 'run', id, value: { ast, context: serialize(context) } });
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
heap[id] = {
|
||||
time: new Date().getTime(),
|
||||
resolve: value => {
|
||||
delete heap[id];
|
||||
resolve(deserialize(value));
|
||||
},
|
||||
reject: e => {
|
||||
delete heap[id];
|
||||
reject(e);
|
||||
},
|
||||
onFunctionNotFound: (ast, context) =>
|
||||
onFunctionNotFound(ast, deserialize(context)).then(serialize),
|
||||
};
|
||||
|
||||
//
|
||||
setTimeout(() => {
|
||||
if (!heap[id]) return; // Looks like this has already been cleared from the heap.
|
||||
if (worker) worker.kill();
|
||||
|
||||
// The heap will be cleared because the reject on heap will delete its own id
|
||||
heap[id].reject(new Error('Request timed out'));
|
||||
}, WORKER_TIMEOUT);
|
||||
});
|
||||
},
|
||||
|
||||
getFunctions: () => functions,
|
||||
};
|
||||
});
|
||||
};
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import uuid from 'uuid/v4';
|
||||
import { populateServerRegistries } from '../../server_registries';
|
||||
import { interpretProvider } from '../../../../common/interpreter/interpret';
|
||||
import { serializeProvider } from '../../../../common/lib/serialize';
|
||||
|
||||
// We actually DO need populateServerRegistries here since this is a different node process
|
||||
const pluginsReady = populateServerRegistries(['commonFunctions', 'types']);
|
||||
const heap = {};
|
||||
|
||||
process.on('message', msg => {
|
||||
const { type, id, value } = msg;
|
||||
const threadId = id;
|
||||
|
||||
pluginsReady.then(({ commonFunctions, types }) => {
|
||||
types = types.toJS();
|
||||
const { serialize, deserialize } = serializeProvider(types);
|
||||
const interpret = interpretProvider({
|
||||
types,
|
||||
functions: commonFunctions.toJS(),
|
||||
handlers: { environment: 'serverThreaded' },
|
||||
onFunctionNotFound: (ast, context) => {
|
||||
const id = uuid();
|
||||
// This needs to send a message to the main thread, and receive a response. Uhg.
|
||||
process.send({
|
||||
type: 'run',
|
||||
threadId,
|
||||
id,
|
||||
value: {
|
||||
ast,
|
||||
context: serialize(context),
|
||||
},
|
||||
});
|
||||
|
||||
// Note that there is no facility to reject here. That's because this would only occur as the result of something that happens in the main thread, and we reject there
|
||||
return new Promise(resolve => {
|
||||
heap[id] = { resolve };
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
if (type === 'getFunctions')
|
||||
process.send({ type: 'functionList', value: Object.keys(commonFunctions.toJS()) });
|
||||
|
||||
if (type === 'msgSuccess') {
|
||||
heap[id].resolve(deserialize(value));
|
||||
delete heap[id];
|
||||
}
|
||||
|
||||
if (type === 'run') {
|
||||
const { ast, context } = msg.value;
|
||||
|
||||
interpret(ast, deserialize(context))
|
||||
.then(value => {
|
||||
process.send({ type: 'msgSuccess', value: serialize(value), id });
|
||||
})
|
||||
// TODO: I don't think it is even possible to hit this
|
||||
.catch(value => {
|
||||
process.send({ type: 'msgError', value, id });
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
|
@ -8,32 +8,48 @@ import { typesRegistry } from '../../common/lib/types_registry';
|
|||
import { functionsRegistry as serverFunctions } from '../../common/lib/functions_registry';
|
||||
import { getPluginPaths } from './get_plugin_paths';
|
||||
|
||||
const types = {
|
||||
const registries = {
|
||||
serverFunctions: serverFunctions,
|
||||
commonFunctions: serverFunctions,
|
||||
types: typesRegistry,
|
||||
};
|
||||
|
||||
const loaded = new Promise(resolve => {
|
||||
const remainingTypes = Object.keys(types);
|
||||
let resolve = null;
|
||||
let called = false;
|
||||
|
||||
const populatePromise = new Promise(_resolve => {
|
||||
resolve = _resolve;
|
||||
});
|
||||
|
||||
export const getServerRegistries = () => {
|
||||
return populatePromise;
|
||||
};
|
||||
|
||||
export const populateServerRegistries = types => {
|
||||
if (called) throw new Error('function should only be called once per process');
|
||||
called = true;
|
||||
if (!types || !types.length) throw new Error('types is required');
|
||||
|
||||
const remainingTypes = types;
|
||||
const populatedTypes = {};
|
||||
|
||||
const loadType = () => {
|
||||
const type = remainingTypes.pop();
|
||||
getPluginPaths(type).then(paths => {
|
||||
global.canvas = global.canvas || {};
|
||||
global.canvas.register = d => types[type].register(d);
|
||||
global.canvas.register = d => registries[type].register(d);
|
||||
|
||||
paths.forEach(path => {
|
||||
require(path);
|
||||
});
|
||||
|
||||
global.canvas = undefined;
|
||||
populatedTypes[type] = registries[type];
|
||||
if (remainingTypes.length) loadType();
|
||||
else resolve(true);
|
||||
else resolve(populatedTypes);
|
||||
});
|
||||
};
|
||||
|
||||
loadType();
|
||||
});
|
||||
|
||||
export const loadServerPlugins = () => loaded;
|
||||
if (remainingTypes.length) loadType();
|
||||
return populatePromise;
|
||||
};
|
|
@ -5,51 +5,45 @@
|
|||
*/
|
||||
|
||||
import socket from 'socket.io';
|
||||
import { createHandlers } from '../lib/create_handlers';
|
||||
import { socketInterpreterProvider } from '../../common/interpreter/socket_interpret';
|
||||
import { serializeProvider } from '../../common/lib/serialize';
|
||||
import { functionsRegistry } from '../../common/lib/functions_registry';
|
||||
import { typesRegistry } from '../../common/lib/types_registry';
|
||||
import { loadServerPlugins } from '../lib/load_server_plugins';
|
||||
import { getRequest } from '../lib/get_request';
|
||||
import { getServerRegistries } from '../lib/server_registries';
|
||||
import { routeExpressionProvider } from '../lib/route_expression';
|
||||
import { browser } from '../lib/route_expression/browser';
|
||||
import { thread } from '../lib/route_expression/thread';
|
||||
import { server as serverEnv } from '../lib/route_expression/server';
|
||||
|
||||
export function socketApi(server) {
|
||||
const io = socket(server.listener, { path: '/socket.io' });
|
||||
|
||||
io.on('connection', socket => {
|
||||
// Create the function list
|
||||
socket.emit('getFunctionList');
|
||||
const getClientFunctions = new Promise(resolve => socket.once('functionList', resolve));
|
||||
const types = typesRegistry.toJS();
|
||||
const { serialize, deserialize } = serializeProvider(types);
|
||||
|
||||
// I'd love to find a way to generalize all of these, but they each need a different set of things
|
||||
// Note that ORDER MATTERS here. The environments will be tried in this order. Do not reorder this array.
|
||||
const routeExpression = routeExpressionProvider([
|
||||
thread({ onFunctionNotFound, serialize, deserialize }),
|
||||
serverEnv({ onFunctionNotFound, socket, server }),
|
||||
browser({ onFunctionNotFound, socket, serialize, deserialize }),
|
||||
]);
|
||||
|
||||
function onFunctionNotFound(ast, context) {
|
||||
return routeExpression(ast, context);
|
||||
}
|
||||
|
||||
socket.on('getFunctionList', () => {
|
||||
loadServerPlugins().then(() => socket.emit('functionList', functionsRegistry.toJS()));
|
||||
getServerRegistries().then(({ serverFunctions }) =>
|
||||
socket.emit('functionList', serverFunctions.toJS())
|
||||
);
|
||||
});
|
||||
|
||||
const handler = ({ ast, context, id }) => {
|
||||
Promise.all([getClientFunctions, getRequest(server, socket.handshake)]).then(
|
||||
([clientFunctions, request]) => {
|
||||
// request is the modified hapi request object
|
||||
const types = typesRegistry.toJS();
|
||||
const interpret = socketInterpreterProvider({
|
||||
types,
|
||||
functions: functionsRegistry.toJS(),
|
||||
handlers: createHandlers(request, server),
|
||||
referableFunctions: clientFunctions,
|
||||
socket: socket,
|
||||
});
|
||||
|
||||
const { serialize, deserialize } = serializeProvider(types);
|
||||
return interpret(ast, deserialize(context))
|
||||
.then(value => {
|
||||
socket.emit(`resp:${id}`, { value: serialize(value) });
|
||||
})
|
||||
.catch(e => {
|
||||
socket.emit(`resp:${id}`, {
|
||||
error: e.message,
|
||||
stack: e.stack,
|
||||
});
|
||||
});
|
||||
}
|
||||
return (
|
||||
routeExpression(ast, deserialize(context))
|
||||
.then(value => socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) }))
|
||||
// TODO: I don't think it is possible to hit this right now? Maybe ever?
|
||||
.catch(e => socket.emit(`resp:${id}`, { type: 'msgError', value: e }))
|
||||
);
|
||||
};
|
||||
|
||||
|
|
|
@ -7,5 +7,6 @@
|
|||
import ecommerceSavedObjects from './ecommerce_saved_objects.json';
|
||||
import flightsSavedObjects from './flights_saved_objects.json';
|
||||
import webLogsSavedObjects from './web_logs_saved_objects.json';
|
||||
import { loadSampleData } from './load_sample_data';
|
||||
|
||||
export { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects };
|
||||
export { loadSampleData, ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects };
|
||||
|
|
32
x-pack/plugins/canvas/server/sample_data/load_sample_data.js
Normal file
32
x-pack/plugins/canvas/server/sample_data/load_sample_data.js
Normal file
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects } from './index';
|
||||
|
||||
export function loadSampleData(server) {
|
||||
const now = new Date();
|
||||
const nowTimestamp = now.toISOString();
|
||||
function updateCanvasWorkpadTimestamps(savedObjects) {
|
||||
return savedObjects.map(savedObject => {
|
||||
if (savedObject.type === 'canvas-workpad') {
|
||||
savedObject.attributes['@timestamp'] = nowTimestamp;
|
||||
savedObject.attributes['@created'] = nowTimestamp;
|
||||
}
|
||||
|
||||
return savedObject;
|
||||
});
|
||||
}
|
||||
|
||||
server.addSavedObjectsToSampleDataset(
|
||||
'ecommerce',
|
||||
updateCanvasWorkpadTimestamps(ecommerceSavedObjects)
|
||||
);
|
||||
server.addSavedObjectsToSampleDataset(
|
||||
'flights',
|
||||
updateCanvasWorkpadTimestamps(flightsSavedObjects)
|
||||
);
|
||||
server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects));
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue