[kbn-performance-testing-dataset-extractor] organise kibana concurrent calls into streams (#138263)

* use streams for both es and kibana calls

* improve run speed

* update comments

* fixes for review

* update test

* use msearch to get all ES requests

* remove unused method

* Update packages/kbn-performance-testing-dataset-extractor/src/es_client.ts

Co-authored-by: Spencer <email@spalger.com>

* fix code

* specify index

* check span is defined

Co-authored-by: Spencer <email@spalger.com>
This commit is contained in:
Dzmitry Lemechko 2022-08-11 12:58:21 +02:00 committed by GitHub
parent a182b8e9be
commit ca75531de2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 367 additions and 268 deletions

View file

@ -7,7 +7,7 @@
*/
module.exports = {
preset: '@kbn/test/jest_node',
preset: '@kbn/test',
rootDir: '../..',
roots: ['<rootDir>/packages/kbn-performance-testing-dataset-extractor'],
};

View file

@ -16,7 +16,8 @@ import { run } from '@kbn/dev-cli-runner';
import { createFlagError } from '@kbn/dev-cli-errors';
import { EsVersion, readConfigFile } from '@kbn/test';
import path from 'path';
import { extractor, ScalabilitySetup } from './extractor';
import { extractor } from './extractor';
import { ScalabilitySetup } from './types';
interface Vars {
[key: string]: string;

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const DATE_FORMAT = `YYYY-MM-DD'T'HH:mm:ss.SSS'Z'`;

View file

@ -8,7 +8,7 @@
import { Client } from '@elastic/elasticsearch';
import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { SearchRequest, MsearchRequestItem } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ToolingLog } from '@kbn/tooling-log';
interface ClientOptions {
@ -36,6 +36,7 @@ interface Transaction {
name: string;
type: string;
duration: { us: number };
span_count: { started: number };
}
export interface Document {
@ -98,6 +99,7 @@ const addRangeFilter = (range: { startTime: string; endTime: string }): QueryDsl
export class ESClient {
client: Client;
log: ToolingLog;
tracesIndex: string = '.ds-traces-apm-default*';
constructor(options: ClientOptions, log: ToolingLog) {
this.client = new Client({
@ -112,6 +114,7 @@ export class ESClient {
async getTransactions<T>(queryFilters: QueryDslQueryContainer[]) {
const searchRequest: SearchRequest = {
index: this.tracesIndex,
body: {
sort: [
{
@ -172,9 +175,44 @@ export class ESClient {
return await this.getTransactions<TransactionDocument>(queryFilters);
}
async getSpans(transactionId: string) {
const filters = [{ field: 'parent.id', value: transactionId }];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
return await this.getTransactions<SpanDocument>(queryFilters);
getMsearchRequestItem = (queryFilters: QueryDslQueryContainer[]): MsearchRequestItem => {
return {
query: {
bool: {
filter: [
{
bool: {
filter: queryFilters,
},
},
],
},
},
};
};
async getSpans(transactionIds: string[]) {
const searches = new Array<MsearchRequestItem>();
for (const transactionId of transactionIds) {
const filters = [{ field: 'parent.id', value: transactionId }];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
const requestItem = this.getMsearchRequestItem(queryFilters);
searches.push({ index: this.tracesIndex }, requestItem);
}
this.log.debug(`Msearch request: ${JSON.stringify(searches)}`);
const result = await this.client.msearch<SpanDocument>({
searches,
});
this.log.debug(`Msearch result: ${JSON.stringify(result)}`);
return result.responses.flatMap((response) => {
if ('error' in response) {
throw new Error(`Msearch failure: ${JSON.stringify(response.error)}`);
} else if (response.hits.hits.length > 0) {
return response.hits.hits;
} else {
return [];
}
});
}
}

View file

@ -1,139 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ESClient, SpanDocument } from './es_client';
import { KibanaRequest } from './server_request';
const httpMethodRegExp = /(GET|POST|DELETE|HEAD|PUT|OPTIONS)/;
const httpPathRegExp = /(?<=GET|POST|DELETE|HEAD|PUT|OPTIONS).*/;
interface Request {
id: string;
transactionId: string;
name: string;
action: string;
request: {
method?: string;
path?: string;
params?: string;
body?: JSON;
};
date: string;
duration: number;
}
interface Stream {
startTime: number;
endTime: number;
requests: Request[];
}
const strToJSON = (str: string): JSON | undefined => {
try {
return JSON.parse(str);
} catch (e) {
return;
}
};
const findFirstMatch = (regExp: RegExp, testString: string) => {
const found = regExp.exec(testString);
return found ? found[0] : undefined;
};
const parseQueryStatement = (statement: string): { params?: string; body?: JSON } => {
// github.com/elastic/apm-agent-nodejs/blob/5ba1b2609d18b12a64e1e559236717dd38d64a51/lib/instrumentation/elasticsearch-shared.js#L27-L29
// Some ES endpoints support both query params and a body, statement string might contain both of it
const split = statement.split('\n\n');
if (split.length === 2) {
return { params: split[0], body: strToJSON(split[1]) };
} else {
const body = strToJSON(split[0]);
return body ? { body } : { params: split[0] };
}
};
export const fetchRequests = async (esClient: ESClient, requests: KibanaRequest[]) => {
const esRequests = new Array<Request>();
for (const request of requests) {
const transactionId = request.transaction.id;
const hits = await esClient.getSpans(transactionId);
const spans = hits
.map((hit) => hit!._source as SpanDocument)
.map((hit) => {
const query = hit?.span.db?.statement ? parseQueryStatement(hit?.span.db?.statement) : {};
return {
id: hit.span.id,
transactionId: hit.transaction.id,
name: hit.span.name,
action: hit.span?.action,
request: {
method: findFirstMatch(httpMethodRegExp, hit.span.name),
path: findFirstMatch(httpPathRegExp, hit.span.name.replace(/\s+/g, '')),
params: query?.params,
body: query?.body,
},
date: hit['@timestamp'],
duration: hit.span?.duration?.us,
};
})
// filter out requests without method, path and POST/PUT/DELETE without body
.filter(
(hit) =>
hit &&
hit.request?.method &&
hit.request?.path &&
(hit.request?.method === 'GET' || hit.request?.body)
);
esRequests.push(...spans);
}
return esRequests;
};
export const requestsToStreams = (requests: Request[]) => {
const sorted = requests.sort((a, b) => new Date(a.date).getTime() - new Date(b.date).getTime());
const streams = new Map<string, Stream>();
for (const request of sorted) {
const startTime = new Date(request.date).getTime();
const endTime = new Date(request.date).getTime() + request.duration / 1000;
// searching if query starts before any existing stream ended
const match = Array.from(streams.keys()).filter((key) => {
const streamEndTime = streams.get(key)?.endTime;
return streamEndTime ? startTime < streamEndTime : false;
});
const stream = streams.get(match[0]);
if (stream) {
// adding query to the existing stream
stream.requests.push(request);
// updating the stream endTime if needed
if (endTime > stream.endTime) {
stream.endTime = endTime;
}
// saving updated stream
streams.set(match[0], stream);
} else {
// add a new stream
streams.set(request.date, {
startTime,
endTime,
requests: [request],
});
}
}
const values = Array.from(streams.values());
return values.map((stream) => {
return {
startTime: new Date(stream.startTime).toISOString(),
endTime: new Date(stream.endTime).toISOString(),
requests: stream.requests,
};
});
};

View file

@ -13,38 +13,10 @@ import path from 'path';
import { ToolingLog } from '@kbn/tooling-log';
import { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { ESClient, Document, TransactionDocument } from './es_client';
import { getRequests } from './server_request';
import { fetchRequests, requestsToStreams } from './es_request';
const DATE_FORMAT = `YYYY-MM-DD'T'HH:mm:ss.SSS'Z'`;
interface CLIParams {
param: {
journeyName: string;
scalabilitySetup: ScalabilitySetup;
buildId: string;
withoutStaticResources: boolean;
};
client: {
baseURL: string;
username: string;
password: string;
};
log: ToolingLog;
}
interface InjectionStep {
action: string;
minUsersCount?: number;
maxUsersCount: number;
duration: string;
}
export interface ScalabilitySetup {
warmup: InjectionStep[];
test: InjectionStep[];
maxDuration: string;
}
import { getESRequests, getKibanaRequests } from './request';
import { requestsToStreams } from './stream';
import { CLIParams, Request } from './types';
import { DATE_FORMAT } from './constants';
const calculateTransactionTimeRage = (hit: SearchHit<Document>) => {
const trSource = hit._source as Document;
@ -100,12 +72,13 @@ export const extractor = async ({ param, client, log }: CLIParams) => {
const source = hits[0]!._source as TransactionDocument;
const kibanaVersion = source.service.version;
const kibanaRequests = getRequests(hits, withoutStaticResources, log);
const esRequests = await fetchRequests(esClient, kibanaRequests);
const kibanaRequests = getKibanaRequests(hits, withoutStaticResources);
const esRequests = await getESRequests(esClient, kibanaRequests);
log.info(
`Found ${kibanaRequests.length} Kibana server and ${esRequests.length} Elasticsearch requests`
);
const streams = requestsToStreams(esRequests);
const esStreams = requestsToStreams<Request>(esRequests);
const kibanaStreams = requestsToStreams<Request>(kibanaRequests);
const outputDir = path.resolve('target/scalability_traces');
const fileName = `${journeyName.replace(/ /g, '')}-${buildId}.json`;
@ -116,7 +89,7 @@ export const extractor = async ({ param, client, log }: CLIParams) => {
journeyName,
kibanaVersion,
scalabilitySetup,
requests: kibanaRequests,
streams: kibanaStreams,
},
path.resolve(outputDir, 'server'),
fileName,
@ -128,7 +101,7 @@ export const extractor = async ({ param, client, log }: CLIParams) => {
{
journeyName,
kibanaVersion,
streams: Array.from(streams.values()),
streams: esStreams,
},
path.resolve(outputDir, 'es'),
fileName,

View file

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { ESClient, TransactionDocument, Headers, SpanDocument } from './es_client';
import { Request } from './types';
const httpMethodRegExp = /(GET|POST|DELETE|HEAD|PUT|OPTIONS)/;
const httpPathRegExp = /(?<=GET|POST|DELETE|HEAD|PUT|OPTIONS).*/;
const staticResourcesRegExp = /\.(css|ico|js|json|jpeg|jpg|gif|png|svg|otf|ttf|woff|woff2)$/;
const strToJSON = (str: string): JSON | undefined => {
try {
return JSON.parse(str);
} catch (e) {
return;
}
};
const findFirstMatch = (regExp: RegExp, testString: string) => {
const found = regExp.exec(testString);
return found ? found[0] : undefined;
};
const parseQueryStatement = (statement: string): { params?: string; body?: JSON } => {
// github.com/elastic/apm-agent-nodejs/blob/5ba1b2609d18b12a64e1e559236717dd38d64a51/lib/instrumentation/elasticsearch-shared.js#L27-L29
// Some ES endpoints support both query params and a body, statement string might contain both of it
const split = statement.split('\n\n');
if (split.length === 2) {
return { params: split[0], body: strToJSON(split[1]) };
} else {
const body = strToJSON(split[0]);
return body ? { body } : { params: split[0] };
}
};
const combineHeaderFieldValues = (headers: Headers): { [key: string]: string } => {
return Object.assign(
{},
...Object.keys(headers).map((key) => ({ [key]: headers[key].join(', ') }))
);
};
export const getKibanaRequests = (
hits: Array<SearchHit<TransactionDocument>>,
withoutStaticResources: boolean
): Request[] => {
const data = hits
.map((hit) => hit!._source as TransactionDocument)
.map((hit) => {
const payload = hit.http.request?.body?.original;
return {
transactionId: hit.transaction.id,
name: hit.transaction.name,
date: hit['@timestamp'],
duration: hit.transaction.duration.us,
http: {
method: hit.http.request.method,
path: hit.url.path,
headers: combineHeaderFieldValues(hit.http.request.headers),
body: payload ? JSON.stringify(strToJSON(payload)) : undefined,
statusCode: hit.http.response.status_code,
},
spanCount: hit.transaction.span_count.started,
};
});
return withoutStaticResources
? data.filter((item) => !staticResourcesRegExp.test(item.http.path))
: data;
};
export const getESRequests = async (esClient: ESClient, requests: Request[]) => {
const esRequests = new Array<Request>();
const transactionIds = requests
.filter((r) => r.spanCount && r?.spanCount > 0)
.map((r) => r.transactionId);
const hits = await esClient.getSpans(transactionIds);
for (const hit of hits.map((i) => i!._source as SpanDocument)) {
const query = hit?.span?.db?.statement ? parseQueryStatement(hit?.span?.db?.statement) : {};
const method = findFirstMatch(httpMethodRegExp, hit.span.name);
const path = findFirstMatch(httpPathRegExp, hit.span.name.replace(/\s+/g, ''));
// filter out requests without method, path and POST/PUT/DELETE without body
if (method && path && (method === 'GET' || query?.body)) {
esRequests.push({
transactionId: hit.transaction.id,
spanId: hit.span.id,
name: hit.span.name,
date: hit['@timestamp'],
duration: hit.span?.duration?.us,
http: {
method,
path,
params: query?.params,
body: query?.body,
},
});
}
}
return esRequests;
};

View file

@ -1,85 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { ToolingLog } from '@kbn/tooling-log';
import { TransactionDocument, Headers } from './es_client';
const staticResourcesRegExp = /\.(css|ico|js|json|jpeg|jpg|gif|png|otf|ttf|woff|woff2)$/;
export interface KibanaRequest {
traceId: string;
parentId?: string;
processor: string;
environment: string;
request: {
timestamp: string;
method: string;
path: string;
headers: { [key: string]: string };
body?: string;
statusCode: number;
};
transaction: {
id: string;
name: string;
type: string;
};
}
const parsePayload = (payload: string, traceId: string, log: ToolingLog): string | undefined => {
let body;
try {
body = JSON.parse(payload);
} catch (error) {
log.error(`Failed to parse payload - trace_id: '${traceId}'`);
}
return body;
};
const combineHeaderFieldValues = (headers: Headers): { [key: string]: string } => {
return Object.assign(
{},
...Object.keys(headers).map((key) => ({ [key]: headers[key].join(', ') }))
);
};
export const getRequests = (
hits: Array<SearchHit<TransactionDocument>>,
withoutStaticResources: boolean,
log: ToolingLog
): KibanaRequest[] => {
const data = hits
.map((hit) => hit!._source as TransactionDocument)
.map((hit) => {
const payload = hit.http.request?.body?.original;
return {
traceId: hit.trace.id,
parentId: hit?.parent?.id,
processor: hit.processor,
environment: hit.service.environment,
request: {
timestamp: hit['@timestamp'],
method: hit.http.request.method,
path: hit.url.path,
headers: combineHeaderFieldValues(hit.http.request.headers),
body: payload ? JSON.stringify(parsePayload(payload, hit.trace.id, log)) : undefined,
statusCode: hit.http.response.status_code,
},
transaction: {
id: hit.transaction.id,
name: hit.transaction.name,
type: hit.transaction.type,
},
};
});
return withoutStaticResources
? data.filter((item) => !staticResourcesRegExp.test(item.request.path))
: data;
};

View file

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { requestsToStreams, getTime } from './stream';
test('requests to stream', () => {
const requests = new Array(
{
transactionId: 'a8ba23df51ebb853',
spanId: '85c8de731132bea2',
name: 'Elasticsearch: GET /.kibana_8.5.0/_doc/config%3A8.5.0',
date: '2022-08-08T17:43:58.647Z',
duration: 398,
http: {
method: 'GET',
path: '/.kibana_8.5.0/_doc/config%3A8.5.0',
},
},
{
transactionId: '15ba53df5t9bb165',
spanId: '15v8de5341v2neat',
name: 'Elasticsearch: GET /.kibana_8.5.0/_doc/config%3A8.5.0',
date: '2022-08-08T17:43:58.641Z',
duration: 1988,
http: {
method: 'GET',
path: '/.kibana_8.5.0/_doc/config%3A8.5.0',
},
},
{
transactionId: '90d8037a799382ac',
spanId: 'b819755f297188d9',
name: 'Elasticsearch: GET /_security/_authenticate',
date: '2022-08-08T17:43:58.649Z',
duration: 1002,
http: {
method: 'GET',
path: '/_security/_authenticate',
},
},
{
transactionId: '1381d9ed5af967f9',
spanId: '690d11ebfefd06ad',
name: 'Elasticsearch: GET /.kibana_8.5.0/_doc/config%3A8.5.0',
date: '2022-08-08T17:43:58.648Z',
duration: 2400,
http: {
method: 'GET',
path: '/.kibana_8.5.0/_doc/config%3A8.5.0',
},
},
{
transactionId: '96174ca1fbd14763',
spanId: '4c81025cb74c9cd6',
name: 'Elasticsearch: GET /_security/_authenticate',
date: '2022-08-08T17:43:58.640Z',
duration: 4166,
http: {
method: 'GET',
path: '/_security/_authenticate',
},
}
);
const streams = requestsToStreams(requests);
const sorted = requests.sort((a, b) => getTime(a.date) - getTime(b.date));
expect(streams.length).toBe(3);
expect(streams[0].requests.length).toBe(2);
expect(streams[0].startTime).toBe(streams[0].requests[0].date);
expect(streams[0].startTime).toBe(sorted[0].date);
expect(streams[1].requests.length).toBe(1);
expect(getTime(streams[1].startTime)).toBeGreaterThan(getTime(streams[0].endTime));
expect(streams[2].requests.length).toBe(2);
});

View file

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Request, Stream } from './types';
export const getTime = (date: string) => new Date(date).getTime();
/**
* Combines concurrent requests into the streams and returns it as Array
* @param requests requests array
*/
export const requestsToStreams = <T extends Request>(requests: T[]) => {
const sorted = requests.sort((a, b) => getTime(a.date) - getTime(b.date));
const streams = new Map<string, Stream<T>>();
for (const request of sorted) {
const startTime = getTime(request.date) * 1000;
const endTime = getTime(request.date) * 1000 + request.duration;
// Checking if request starts before any existing stream ended
const match = Array.from(streams.keys()).filter((key) => {
const streamEndTimestamp = streams.get(key)?.endTime;
return streamEndTimestamp ? startTime < streamEndTimestamp : false;
});
const stream = streams.get(match[0]);
if (stream) {
// Adding request to the existing stream
stream.requests.push(request);
// Updating the stream end time if needed
if (endTime > stream.endTime) {
stream.endTime = endTime;
}
// Saving the updated stream
streams.set(match[0], stream);
} else {
// Otherwise adding a new stream
streams.set(request.date, {
startTime,
endTime,
requests: [request],
});
}
}
const values = Array.from(streams.values());
return values.map((stream) => {
return {
startTime: new Date(stream.startTime / 1000).toISOString(),
endTime: new Date(stream.endTime / 1000).toISOString(),
requests: stream.requests,
};
});
};

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ToolingLog } from '@kbn/tooling-log';
export interface Request {
transactionId: string;
spanId?: string;
name: string;
date: string;
duration: number;
http: {
method: string;
path: string;
headers?: { [key: string]: string };
params?: string;
body?: JSON | string;
};
spanCount?: number;
}
export interface Stream<T extends Request> {
startTime: number;
endTime: number;
requests: T[];
}
export interface InjectionStep {
action: string;
minUsersCount?: number;
maxUsersCount: number;
duration: string;
}
export interface ScalabilitySetup {
warmup: InjectionStep[];
test: InjectionStep[];
maxDuration: string;
}
export interface CLIParams {
param: {
journeyName: string;
scalabilitySetup: ScalabilitySetup;
buildId: string;
withoutStaticResources: boolean;
};
client: {
baseURL: string;
username: string;
password: string;
};
log: ToolingLog;
}