[7.x] Add http/1 response streaming to Canvas batches (#32027) (#32309)

This commit is contained in:
Chris Davies 2019-03-04 10:24:20 -05:00 committed by GitHub
parent a2eab3e9a8
commit c52abfd863
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 570 additions and 62 deletions

View file

@ -6,6 +6,7 @@ bower_components
/plugins
/optimize
/built_assets
/html_docs
/src/fixtures/vislib/mock_data
/src/legacy/ui/public/angular-bootstrap
/src/legacy/ui/public/flot-charts

View file

@ -23,7 +23,7 @@ import { FUNCTIONS_URL } from './consts';
* Create a function which executes an Expression function on the
* server as part of a larger batch of executions.
*/
export function batchedFetch({ kfetch, serialize, ms = 10 }) {
export function batchedFetch({ ajaxStream, serialize, ms = 10 }) {
// Uniquely identifies each function call in a batch operation
// so that the appropriate promise can be resolved / rejected later.
let id = 0;
@ -42,7 +42,7 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
};
const runBatch = () => {
processBatch(kfetch, batch);
processBatch(ajaxStream, batch);
reset();
};
@ -70,14 +70,15 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
function createFuture() {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return {
resolve(val) { return resolve(val); },
reject(val) { return reject(val); },
promise: new Promise((res, rej) => {
resolve = res;
reject = rej;
}),
resolve,
reject,
promise,
};
}
@ -85,22 +86,21 @@ function createFuture() {
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(kfetch, batch) {
async function processBatch(ajaxStream, batch) {
try {
const { results } = await kfetch({
pathname: FUNCTIONS_URL,
method: 'POST',
await ajaxStream({
url: FUNCTIONS_URL,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
});
onResponse({ id, statusCode, result }) {
const { future } = batch[id];
results.forEach(({ id, result }) => {
const { future } = batch[id];
if (result.statusCode && result.err) {
future.reject(result);
} else {
future.resolve(result);
if (statusCode >= 400) {
future.reject(result);
} else {
future.resolve(result);
}
}
});
} catch (err) {

View file

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { batchedFetch } from './batched_fetch';
const serialize = (o) => JSON.stringify(o);
describe('batchedFetch', () => {
it('resolves the correct promise', async () => {
const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }) => onResponse({
id,
statusCode: 200,
result: `${functionName}${context}${args}`,
}));
});
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
ajax({ functionName: 'b', context: 2, args: 'bbb' }),
]);
expect(result).toEqual([
'a1aaa',
'b2bbb',
]);
});
it('rejects responses whose statusCode is >= 300', async () => {
const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }) => onResponse({
id,
statusCode: context,
result: context >= 400 ? { err: {} } : `${functionName}${context}${args}`,
}));
});
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const result = await Promise.all([
ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'),
ajax({ functionName: 'b', context: 400, args: 'bbb' }).catch(() => 'fail'),
ajax({ functionName: 'c', context: 200, args: 'ccc' }),
]);
expect(result).toEqual([
'fail',
'fail',
'c200ccc'
]);
});
});

View file

@ -23,11 +23,11 @@ import { createHandlers } from './create_handlers';
import { batchedFetch } from './batched_fetch';
import { FUNCTIONS_URL } from './consts';
export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
export async function initializeInterpreter({ kfetch, ajaxStream, typesRegistry, functionsRegistry }) {
const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
const types = typesRegistry.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({ kfetch, serialize });
const batch = batchedFetch({ ajaxStream, serialize });
// For every sever-side function, register a client-side
// function that matches its definition, but which simply

View file

@ -35,26 +35,21 @@ jest.mock('./create_handlers', () => ({
describe('kbn-interpreter/interpreter', () => {
it('loads server-side functions', async () => {
const kfetch = jest.fn(async () => ({}));
const ajaxStream = jest.fn(async () => ({}));
await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register: () => {} }));
await initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: { toJS: () => ({}) },
functionsRegistry: ({ register: () => {} }),
});
expect(kfetch).toHaveBeenCalledTimes(1);
expect(kfetch).toHaveBeenCalledWith({ pathname: FUNCTIONS_URL });
});
it('registers client-side functions that pass through to the server', async () => {
const kfetch = jest.fn(async ({ method }) => {
if (method === 'POST') {
return {
results: [{
id: 1,
result: {
hello: 'world',
},
}],
};
}
const kfetch = jest.fn(async () => {
return {
hello: { name: 'hello' },
world: { name: 'world' },
@ -62,8 +57,16 @@ describe('kbn-interpreter/interpreter', () => {
});
const register = jest.fn();
const ajaxStream = jest.fn(async ({ onResponse }) => {
onResponse({ id: 1, result: { hello: 'world' } });
});
await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register }));
await initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: { toJS: () => ({}) },
functionsRegistry: ({ register }),
});
expect(register).toHaveBeenCalledTimes(2);
@ -81,9 +84,9 @@ describe('kbn-interpreter/interpreter', () => {
expect(result).toEqual({ hello: 'world' });
expect(kfetch).toHaveBeenCalledWith({
pathname: FUNCTIONS_URL,
method: 'POST',
expect(ajaxStream).toHaveBeenCalledWith({
url: FUNCTIONS_URL,
onResponse: expect.any(Function),
body: JSON.stringify({
functions: [{
id: 1,

View file

@ -20,6 +20,7 @@
import { register } from '@kbn/interpreter/common';
import { initializeInterpreter, registries } from '@kbn/interpreter/public';
import { kfetch } from 'ui/kfetch';
import { ajaxStream } from 'ui/ajax_stream';
import { functions } from './functions';
import { visualization } from './renderers/visualization';
@ -32,7 +33,12 @@ let _resolve;
let _interpreterPromise;
const initialize = async () => {
initializeInterpreter(kfetch, registries.types, registries.browserFunctions).then(interpreter => {
initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: registries.types,
functionsRegistry: registries.browserFunctions,
}).then(interpreter => {
_resolve({ interpreter });
});
};

View file

@ -65,37 +65,64 @@ function runServerFunctions(server) {
const handlers = await createHandlers(req, server);
const { functions } = req.payload;
// Process each function individually, and bundle up respones / errors into
// the format expected by the front-end batcher.
const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => {
const result = await runFunction(server, handlers, fnCall)
.catch(err => {
if (Boom.isBoom(err)) {
return { err, statusCode: err.statusCode, message: err.output.payload };
}
return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' };
});
// Grab the raw Node response object.
const res = req.raw.res;
if (result == null) {
const { functionName } = fnCall;
return {
id,
result: {
err: `No result from '${functionName}'`,
statusCode: 500,
message: `Function '${functionName}' did not return anything`
}
};
// Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884
req._isReplied = true;
// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'text/plain',
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});
// Write a length-delimited response
const streamResult = (result) => {
const payload = JSON.stringify(result) + '\n';
res.write(`${payload.length}:${payload}`);
};
// Tries to run an interpreter function, and ensures a consistent error payload on failure.
const tryFunction = async (id, fnCall) => {
try {
const result = await runFunction(server, handlers, fnCall);
if (result != null) {
return { id, statusCode: 200, result };
}
return batchError(id, `Function ${fnCall.functionName} did not return anything.`);
} catch (err) {
if (Boom.isBoom(err)) {
return batchError(id, err.output.payload, err.statusCode);
}
return batchError(id, 'See server logs for details.');
}
};
return { id, result };
}));
// Process each function individually, and stream the responses back to the client
await Promise.all(functions.map(({ id, ...fnCall }) => tryFunction(id, fnCall).then(streamResult)));
return { results };
// All of the responses have been written, so we can close the response.
res.end();
},
});
}
/**
* A helper function for bundling up errors.
*/
function batchError(id, message, statusCode = 500) {
return {
id,
statusCode,
result: { statusCode, message },
};
}
/**
* Register the endpoint that returns the list of server-only functions.
* @param {*} server - The Kibana server

View file

@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ajaxStream, XMLHttpRequestLike } from './ajax_stream';
// tslint:disable-next-line:no-empty
function noop() {}
describe('ajaxStream', () => {
it('pulls items from the stream and calls the handler', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages[0]);
sendText(messages[1]);
done();
await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'world' });
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});
it('handles partial messages', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
for (const s of messages) {
sendText(s);
}
done();
await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'world' });
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});
it('sends the request', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('mehBasePath', { a: 'b' }, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
headers: { foo: 'bar' },
});
done();
await promise;
expect(req.open).toHaveBeenCalledWith('POST', 'mehBasePath/test/endpoint');
expect(req.setRequestHeader).toHaveBeenCalledWith('foo', 'bar');
expect(req.setRequestHeader).toHaveBeenCalledWith('a', 'b');
expect(req.send).toHaveBeenCalledWith('whatup');
});
it('rejects if network failure', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
});
done(0);
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if http status error', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
});
done(400);
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if the payload contains invalid JSON', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages);
done();
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if the delim is invalid', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = '{ "hi": "there" }';
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages);
done();
expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
/invalid stream response/i
);
});
it('rejects if the handler throws', async () => {
const handler = jest.fn(() => {
throw new Error('DOH!');
});
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages);
done();
expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(/doh!/i);
});
});
function mockRequest() {
const req: XMLHttpRequestLike = {
onprogress: noop,
onreadystatechange: noop,
open: jest.fn(),
readyState: 0,
responseText: '',
send: jest.fn(),
setRequestHeader: jest.fn(),
abort: jest.fn(),
status: 0,
withCredentials: false,
};
return {
req,
sendText(text: string) {
req.responseText += text;
req.onreadystatechange();
req.onprogress();
},
done(status = 200) {
req.status = status;
req.readyState = 4;
req.onreadystatechange();
},
};
}

View file

@ -0,0 +1,167 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { once } from 'lodash';
/**
* This file contains the client-side logic for processing a streaming AJAX response.
* This allows things like request batching to process individual batch item results
* as soon as the server sends them, instead of waiting for the entire response before
* client-side processing can begin.
*
* The server sends responses in this format: {length}:{json}, for example:
*
* 18:{"hello":"world"}\n16:{"hello":"you"}\n
*/
// T is the response payload (the JSON), and we don't really
// care what it's type / shape is.
export type BatchResponseHandler<T> = (result: T) => void;
export interface BatchOpts<T> {
url: string;
onResponse: BatchResponseHandler<T>;
method?: string;
body?: string;
headers?: { [k: string]: string };
}
// The subset of XMLHttpRequest that we use
export interface XMLHttpRequestLike {
abort: () => void;
onreadystatechange: any;
onprogress: any;
open: (method: string, url: string) => void;
readyState: number;
responseText: string;
send: (body?: string) => void;
setRequestHeader: (header: string, value: string) => void;
status: number;
withCredentials: boolean;
}
// Create a function which, when successively passed streaming response text,
// calls a handler callback with each response in the batch.
function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
let index = 0;
return (text: string) => {
// While there's text to process...
while (index < text.length) {
// Our messages are delimited by colon: len:json
const delim = ':';
const delimIndex = text.indexOf(delim, index);
const payloadStart = delimIndex + delim.length;
// We've got an incomplete batch length
if (delimIndex < 0) {
return;
}
const rawLen = text.slice(index, delimIndex);
const payloadLen = parseInt(rawLen, 10);
const payloadEnd = payloadStart + payloadLen;
// We've got an invalid batch message (e.g. one without a numeric length: prefix)
if (isNaN(payloadLen)) {
throw new Error(`Invalid stream response length: ${rawLen}`);
}
// We've got an incomplete batch message
if (text.length < payloadEnd) {
return;
}
const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
handler(payload);
index = payloadEnd;
}
};
}
/**
* Sends an AJAX request to the server, and processes the result as a
* streaming HTTP/1 response.
*
* @param basePath - The Kibana basepath
* @param defaultHeaders - The default HTTP headers to be sent with each request
* @param req - The XMLHttpRequest
* @param opts - The request options
* @returns A promise which resolves when the entire batch response has been processed.
*/
export function ajaxStream<T>(
basePath: string,
defaultHeaders: { [k: string]: string },
req: XMLHttpRequestLike,
opts: BatchOpts<T>
) {
return new Promise((resolve, reject) => {
const { url, method, headers } = opts;
// There are several paths by which the promise may resolve or reject. We wrap this
// in "once" as a safeguard against cases where we attempt more than one call. (e.g.
// a batch handler fails, so we reject the promise, but then new data comes in for
// a subsequent batch item)
const complete = once((err: Error | undefined = undefined) =>
err ? reject(err) : resolve(req)
);
// Begin the request
req.open(method || 'POST', `${basePath}/${url.replace(/^\//, '')}`);
req.withCredentials = true;
// Set the HTTP headers
Object.entries(Object.assign({}, defaultHeaders, headers)).forEach(([k, v]) =>
req.setRequestHeader(k, v)
);
const batchHandler = processBatchResponseStream(opts.onResponse);
const processBatch = () => {
try {
batchHandler(req.responseText);
} catch (err) {
req.abort();
complete(err);
}
};
req.onprogress = processBatch;
req.onreadystatechange = () => {
// Older browsers don't support onprogress, so we need
// to call this here, too. It's safe to call this multiple
// times even for the same progress event.
processBatch();
// 4 is the magic number that means the request is done
if (req.readyState === 4) {
// 0 indicates a network failure. 400+ messages are considered server errors
if (req.status === 0 || req.status >= 400) {
complete(new Error(`Batch request failed with status ${req.status}`));
} else {
complete();
}
}
};
// Send the payload to the server
req.send(opts.body);
});
}

View file

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import chrome from 'ui/chrome';
import { metadata } from 'ui/metadata';
import { ajaxStream as ajax, BatchOpts } from './ajax_stream';
const defaultHeaders = {
'Content-Type': 'application/json',
'kbn-version': metadata.version,
};
export { BatchOpts } from './ajax_stream';
export function ajaxStream<T>(opts: BatchOpts<T>) {
return ajax(chrome.getBasePath(), defaultHeaders, new XMLHttpRequest(), opts);
}