[bfetch] Use versioned router (#161317)

## Summary

Part of https://github.com/elastic/kibana/issues/157095.

Uses the new versioned router capabilities for the bfetch plugin.

### Checklist

- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Lukas Olson 2023-07-07 16:48:02 -07:00 committed by GitHub
parent 86fa655990
commit 4b7d18b5c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 158 additions and 110 deletions

View file

@ -8,3 +8,4 @@
export const DISABLE_BFETCH_COMPRESSION = 'bfetch:disableCompression';
export const DISABLE_BFETCH = 'bfetch:disable';
export const BFETCH_ROUTE_VERSION_LATEST = '1';

View file

@ -11,5 +11,9 @@ export type { StreamingResponseHandler } from './streaming';
export type { ItemBufferParams, TimedItemBufferParams, BatchedFunctionParams } from './buffer';
export { ItemBuffer, TimedItemBuffer, createBatchedFunction } from './buffer';
export type { ErrorLike, BatchRequestData, BatchResponseItem, BatchItemWrapper } from './batch';
export { DISABLE_BFETCH_COMPRESSION, DISABLE_BFETCH } from './constants';
export {
DISABLE_BFETCH_COMPRESSION,
DISABLE_BFETCH,
BFETCH_ROUTE_VERSION_LATEST,
} from './constants';
export { BfetchRequestError } from './bfetch_error';

View file

@ -8,11 +8,15 @@
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '@kbn/core/public';
import { createStartServicesGetter } from '@kbn/kibana-utils-plugin/public';
import { X_ELASTIC_INTERNAL_ORIGIN_REQUEST } from '@kbn/core-http-common';
import {
ELASTIC_HTTP_VERSION_HEADER,
X_ELASTIC_INTERNAL_ORIGIN_REQUEST,
} from '@kbn/core-http-common';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { DISABLE_BFETCH_COMPRESSION, removeLeadingSlash } from '../common';
import { createStreamingBatchedFunction, StreamingBatchedFunctionParams } from './batching';
import { BatchedFunc } from './batching/types';
import { BFETCH_ROUTE_VERSION_LATEST } from '../common/constants';
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface BfetchPublicSetupDependencies {}
@ -47,14 +51,19 @@ export class BfetchPublicPlugin
core: CoreSetup<any, any>,
plugins: BfetchPublicSetupDependencies
): BfetchPublicSetup {
const { version } = this.initializerContext.env.packageInfo;
const { version: kibanaVersion } = this.initializerContext.env.packageInfo;
const basePath = core.http.basePath.get();
const startServices = createStartServicesGetter(core.getStartServices);
const getIsCompressionDisabled = () =>
startServices().core.uiSettings.get<boolean>(DISABLE_BFETCH_COMPRESSION);
const fetchStreaming = this.fetchStreaming(version, basePath, getIsCompressionDisabled);
const fetchStreaming = this.fetchStreaming(
BFETCH_ROUTE_VERSION_LATEST,
kibanaVersion,
basePath,
getIsCompressionDisabled
);
const batchedFunction = this.batchedFunction(fetchStreaming, getIsCompressionDisabled);
this.contract = {
@ -74,6 +83,7 @@ export class BfetchPublicPlugin
private fetchStreaming =
(
version: string,
kibanaVersion: string,
basePath: string,
getIsCompressionDisabled: () => boolean
): BfetchPublicSetup['fetchStreaming'] =>
@ -83,8 +93,9 @@ export class BfetchPublicPlugin
url: `${basePath}/${removeLeadingSlash(params.url)}`,
headers: {
'Content-Type': 'application/json',
'kbn-version': version,
'kbn-version': kibanaVersion,
[X_ELASTIC_INTERNAL_ORIGIN_REQUEST]: 'Kibana',
[ELASTIC_HTTP_VERSION_HEADER]: version,
...(params.headers || {}),
},
getIsCompressionDisabled,

View file

@ -18,9 +18,9 @@ import {
RequestHandler,
KibanaResponseFactory,
} from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { map$ } from '@kbn/std';
import { RouteConfigOptions } from '@kbn/core-http-server';
import { schema } from '@kbn/config-schema';
import { BFETCH_ROUTE_VERSION_LATEST } from '../common/constants';
import {
StreamingResponseHandler,
BatchRequestData,
@ -55,8 +55,7 @@ export interface BfetchServerSetup {
context: RequestHandlerContext
) => StreamingResponseHandler<Payload, Response>,
method?: 'GET' | 'POST' | 'PUT' | 'DELETE',
pluginRouter?: ReturnType<CoreSetup['http']['createRouter']>,
options?: RouteConfigOptions<'get' | 'post' | 'put' | 'delete'>
pluginRouter?: ReturnType<CoreSetup['http']['createRouter']>
) => void;
}
@ -119,17 +118,18 @@ export class BfetchServerPlugin
router: ReturnType<CoreSetup['http']['createRouter']>;
logger: Logger;
}): BfetchServerSetup['addStreamingResponseRoute'] =>
(path, handler, method = 'POST', pluginRouter, options) => {
(path, handler, method = 'POST', pluginRouter) => {
const httpRouter = pluginRouter || router;
const routeDefinition = {
path: `/${removeLeadingSlash(path)}`,
version: BFETCH_ROUTE_VERSION_LATEST,
validate: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
request: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
},
},
options,
};
const routeHandler: RequestHandler<unknown, Query> = async (
context: RequestHandlerContext,
request: KibanaRequest<unknown, Query, any>,
@ -146,16 +146,24 @@ export class BfetchServerPlugin
switch (method) {
case 'GET':
httpRouter.get(routeDefinition, routeHandler);
httpRouter.versioned
.get({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
.addVersion(routeDefinition, routeHandler);
break;
case 'POST':
httpRouter.post(routeDefinition, routeHandler);
httpRouter.versioned
.post({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
.addVersion(routeDefinition, routeHandler);
break;
case 'PUT':
httpRouter.put(routeDefinition, routeHandler);
httpRouter.versioned
.put({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
.addVersion(routeDefinition, routeHandler);
break;
case 'DELETE':
httpRouter.delete(routeDefinition, routeHandler);
httpRouter.versioned
.delete({ access: 'internal', path: `/${removeLeadingSlash(path)}` })
.addVersion(routeDefinition, routeHandler);
break;
default:
throw new Error(`Handler for method ${method} is not defined`);

View file

@ -10,7 +10,6 @@
"@kbn/i18n",
"@kbn/config-schema",
"@kbn/std",
"@kbn/core-http-server",
"@kbn/core-http-common",
],
"exclude": [

View file

@ -9,6 +9,8 @@
import expect from '@kbn/expect';
import request from 'superagent';
import { inflateResponse } from '@kbn/bfetch-plugin/public/streaming';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import { BFETCH_ROUTE_VERSION_LATEST } from '@kbn/bfetch-plugin/common';
import { FtrProviderContext } from '../../ftr_provider_context';
import { painlessErrReq } from './painless_err_req';
import { verifyErrorResponse } from './verify_error';
@ -29,25 +31,28 @@ export default function ({ getService }: FtrProviderContext) {
describe('bsearch', () => {
describe('post', () => {
it('should return 200 a single response', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'es',
},
},
options: {
strategy: 'es',
},
},
],
});
],
});
const jsonBody = parseBfetchResponse(resp);
@ -59,25 +64,28 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return 200 a single response from compressed', async () => {
const resp = await supertest.post(`/internal/bsearch?compress=true`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch?compress=true`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'es',
},
},
options: {
strategy: 'es',
},
},
],
});
],
});
const jsonBody = parseBfetchResponse(resp, true);
@ -89,34 +97,37 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return a batch of successful responses', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
},
},
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
},
},
],
});
],
});
expect(resp.status).to.be(200);
const parsedResponse = parseBfetchResponse(resp);
@ -129,25 +140,28 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return error for not found strategy', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'wtf',
},
},
options: {
strategy: 'wtf',
},
},
],
});
],
});
expect(resp.status).to.be(200);
parseBfetchResponse(resp).forEach((responseJson, i) => {
@ -157,26 +171,29 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return 400 when index type is provided in "es" strategy', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
index: '.kibana',
indexType: 'baad',
params: {
body: {
query: {
match_all: {},
const resp = await supertest
.post(`/internal/bsearch`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: {
index: '.kibana',
indexType: 'baad',
params: {
body: {
query: {
match_all: {},
},
},
},
},
options: {
strategy: 'es',
},
},
options: {
strategy: 'es',
},
},
],
});
],
});
expect(resp.status).to.be(200);
parseBfetchResponse(resp).forEach((responseJson, i) => {
@ -194,16 +211,19 @@ export default function ({ getService }: FtrProviderContext) {
await esArchiver.unload('test/functional/fixtures/es_archiver/logstash_functional');
});
it('should return 400 "search_phase_execution_exception" for Painless error in "es" strategy', async () => {
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: painlessErrReq,
options: {
strategy: 'es',
const resp = await supertest
.post(`/internal/bsearch`)
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{
request: painlessErrReq,
options: {
strategy: 'es',
},
},
},
],
});
],
});
expect(resp.status).to.be(200);
parseBfetchResponse(resp).forEach((responseJson, i) => {

View file

@ -11,6 +11,7 @@ import request from 'superagent';
import type SuperTest from 'supertest';
import { IEsSearchResponse } from '@kbn/data-plugin/common';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import { BFETCH_ROUTE_VERSION_LATEST } from '@kbn/bfetch-plugin/common';
import { FtrService } from '../ftr_provider_context';
/**
@ -84,6 +85,7 @@ export class BsearchService extends FtrService {
const resp = await supertest
.post(`${spaceUrl}/internal/bsearch`)
.set('kbn-xsrf', 'true')
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{

View file

@ -13,6 +13,7 @@ import request from 'superagent';
import type SuperTest from 'supertest';
import { IEsSearchResponse } from '@kbn/data-plugin/common';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import { BFETCH_ROUTE_VERSION_LATEST } from '@kbn/bfetch-plugin/common';
import { FtrService } from '../ftr_provider_context';
const parseBfetchResponse = (resp: request.Response): Array<Record<string, any>> => {
@ -112,6 +113,7 @@ export class BsearchSecureService extends FtrService {
.auth(auth.username, auth.password)
.set('kbn-xsrf', 'true')
.set('x-elastic-internal-origin', 'Kibana')
.set(ELASTIC_HTTP_VERSION_HEADER, BFETCH_ROUTE_VERSION_LATEST)
.send({
batch: [
{

View file

@ -134,6 +134,7 @@
"@kbn/telemetry-tools",
"@kbn/profiling-plugin",
"@kbn/observability-onboarding-plugin",
"@kbn/bfetch-plugin",
"@kbn/uptime-plugin"
]
}