add createStreamingRequestHandler API (#66411)

This commit is contained in:
Pierre Gayvallet 2020-05-14 22:32:59 +02:00 committed by GitHub
parent 5333a046c1
commit 10e34fc86d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 12 deletions

View file

@ -21,6 +21,7 @@ import { PluginInitializerContext } from '../../../core/server';
import { BfetchServerPlugin } from './plugin';
export { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin';
export { StreamingRequestHandler } from './types';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchServerPlugin(initializerContext);

View file

@ -28,6 +28,7 @@ const createSetupContract = (): Setup => {
const setupContract: Setup = {
addBatchProcessingRoute: jest.fn(),
addStreamingResponseRoute: jest.fn(),
createStreamingRequestHandler: jest.fn(),
};
return setupContract;
};

View file

@ -24,6 +24,8 @@ import {
Plugin,
Logger,
KibanaRequest,
RouteMethod,
RequestHandler,
} from 'src/core/server';
import { schema } from '@kbn/config-schema';
import { Subject } from 'rxjs';
@ -35,6 +37,7 @@ import {
removeLeadingSlash,
normalizeError,
} from '../common';
import { StreamingRequestHandler } from './types';
import { createNDJSONStream } from './streaming';
// eslint-disable-next-line
@ -47,6 +50,7 @@ export interface BatchProcessingRouteParams<BatchItemData, BatchItemResult> {
onBatchItem: (data: BatchItemData) => Promise<BatchItemResult>;
}
/** @public */
export interface BfetchServerSetup {
addBatchProcessingRoute: <BatchItemData extends object, BatchItemResult extends object>(
path: string,
@ -56,11 +60,48 @@ export interface BfetchServerSetup {
path: string,
params: (request: KibanaRequest) => StreamingResponseHandler<Payload, Response>
) => void;
/**
* Create a streaming request handler to be able to use an Observable to return chunked content to the client.
* This is meant to be used with the `fetchStreaming` API of the `bfetch` client-side plugin.
*
* @example
* ```ts
* setup({ http }: CoreStart, { bfetch }: SetupDeps) {
* const router = http.createRouter();
* router.post(
* {
* path: '/api/my-plugin/stream-endpoint,
* validate: {
* body: schema.object({
* term: schema.string(),
* }),
* }
* },
* bfetch.createStreamingResponseHandler(async (ctx, req) => {
* const { term } = req.body;
* const results$ = await myApi.getResults$(term);
* return results$;
* })
* )}
*
* ```
*
* @param streamHandler
*/
createStreamingRequestHandler: <Response, P, Q, B, Method extends RouteMethod = any>(
streamHandler: StreamingRequestHandler<Response, P, Q, B, Method>
) => RequestHandler<P, Q, B, Method>;
}
// eslint-disable-next-line
export interface BfetchServerStart {}
const streamingHeaders = {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
};
export class BfetchServerPlugin
implements
Plugin<
@ -76,10 +117,12 @@ export class BfetchServerPlugin
const router = core.http.createRouter();
const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger });
const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
const createStreamingRequestHandler = this.createStreamingRequestHandler({ logger });
return {
addBatchProcessingRoute,
addStreamingResponseRoute,
createStreamingRequestHandler,
};
}
@ -106,19 +149,30 @@ export class BfetchServerPlugin
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const headers = {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
};
return response.ok({
headers,
body: createNDJSONStream(data, handlerInstance, logger),
headers: streamingHeaders,
body: createNDJSONStream(handlerInstance.getResponseStream(data), logger),
});
}
);
};
private createStreamingRequestHandler = ({
logger,
}: {
logger: Logger;
}): BfetchServerSetup['createStreamingRequestHandler'] => streamHandler => async (
context,
request,
response
) => {
const response$ = await streamHandler(context, request);
return response.ok({
headers: streamingHeaders,
body: createNDJSONStream(response$, logger),
});
};
private addBatchProcessingRoute = (
addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute']
): BfetchServerSetup['addBatchProcessingRoute'] => <

View file

@ -17,19 +17,17 @@
* under the License.
*/
import { Observable } from 'rxjs';
import { Logger } from 'src/core/server';
import { Stream, PassThrough } from 'stream';
import { StreamingResponseHandler } from '../../common/types';
const delimiter = '\n';
export const createNDJSONStream = <Payload, Response>(
payload: Payload,
handler: StreamingResponseHandler<Payload, Response>,
export const createNDJSONStream = <Response>(
results: Observable<Response>,
logger: Logger
): Stream => {
const stream = new PassThrough();
const results = handler.getResponseStream(payload);
results.subscribe({
next: (message: Response) => {

View file

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from 'rxjs';
import { KibanaRequest, RequestHandlerContext, RouteMethod } from 'kibana/server';
/**
* Request handler modified to allow to return an observable.
*
* See {@link BfetchServerSetup.createStreamingRequestHandler} for usage example.
* @public
*/
export type StreamingRequestHandler<
Response = unknown,
P = unknown,
Q = unknown,
B = unknown,
Method extends RouteMethod = any
> = (
context: RequestHandlerContext,
request: KibanaRequest<P, Q, B, Method>
) => Observable<Response> | Promise<Observable<Response>>;