[scalability testing] get ES queries for each single user performance journey (#137895)

* [kbn-performance-testing-dataset-extractor] fetch & save ES queries

* run apm extractor for all journeys

* fix array name

* cleanup ES query, update namings according review

* update naming
This commit is contained in:
Dzmitry Lemechko 2022-08-03 14:46:39 +02:00 committed by GitHub
parent 62ce378b25
commit 007e6c5d98
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 370 additions and 156 deletions

View file

@ -13,7 +13,7 @@ GCS_BUCKET="gs://kibana-performance/scalability-tests"
.buildkite/scripts/bootstrap.sh
echo "--- Extract APM metrics"
scalabilityJourneys=("login" "promotion_tracking_dashboard")
scalabilityJourneys=("login" "ecommerce_dashboard" "flight_dashboard" "web_logs_dashboard" "promotion_tracking_dashboard" "many_fields_discover")
for i in "${scalabilityJourneys[@]}"; do
JOURNEY_NAME="${i}"

View file

@ -58,8 +58,9 @@ export async function runExtractor() {
const scalabilitySetup: ScalabilitySetup = config.get('scalabilitySetup');
if (!scalabilitySetup) {
log.error(`'scalabilitySetup' must be defined in config file!`);
return;
log.warning(
`'scalabilitySetup' is not defined in config file, output file for Kibana scalability run won't be generated`
);
}
const env = config.get(`kbnTestServer.env`);

View file

@ -17,11 +17,6 @@ interface ClientOptions {
password: string;
}
interface Labels {
journeyName: string;
maxUsersCount: string;
}
export interface Headers {
readonly [key: string]: string[];
}
@ -44,21 +39,33 @@ interface Transaction {
}
export interface Document {
labels: Labels;
character: string;
quote: string;
service: { version: string };
parent?: { id: string };
processor: string;
trace: { id: string };
'@timestamp': string;
environment: string;
labels?: { journeyName: string; maxUsersCount: string };
parent?: { id: string };
service: { name: string; environment: string };
trace: { id: string };
transaction: Transaction;
}
export interface SpanDocument extends Omit<Document, 'transaction'> {
transaction: { id: string };
span: {
id: string;
name: string;
action: string;
duration: { us: number };
db?: { statement?: string };
};
}
export interface TransactionDocument extends Omit<Document, 'service'> {
service: { name: string; environment: string; version: string };
processor: string;
url: { path: string };
http: {
request: Request;
response: Response;
};
transaction: Transaction;
}
const addBooleanFilter = (filter: { field: string; value: string }): QueryDslQueryContainer => {
@ -88,81 +95,86 @@ const addRangeFilter = (range: { startTime: string; endTime: string }): QueryDsl
};
};
export function initClient(options: ClientOptions, log: ToolingLog) {
const client = new Client({
node: options.node,
auth: {
username: options.username,
password: options.password,
},
});
export class ESClient {
client: Client;
log: ToolingLog;
return {
async getKibanaServerTransactions(
buildId: string,
journeyName: string,
range?: { startTime: string; endTime: string }
) {
const filters = [
{ field: 'transaction.type', value: 'request' },
{ field: 'processor.event', value: 'transaction' },
{ field: 'labels.testBuildId', value: buildId },
{ field: 'labels.journeyName', value: journeyName },
];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
if (range) {
queryFilters.push(addRangeFilter(range));
}
return await this.getTransactions(queryFilters);
},
async getFtrTransactions(buildId: string, journeyName: string) {
const filters = [
{ field: 'service.name', value: 'functional test runner' },
{ field: 'processor.event', value: 'transaction' },
{ field: 'labels.testBuildId', value: buildId },
{ field: 'labels.journeyName', value: journeyName },
{ field: 'labels.performancePhase', value: 'TEST' },
];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
return await this.getTransactions(queryFilters);
},
constructor(options: ClientOptions, log: ToolingLog) {
this.client = new Client({
node: options.node,
auth: {
username: options.username,
password: options.password,
},
});
this.log = log;
}
async getTransactions(queryFilters: QueryDslQueryContainer[]) {
const searchRequest: SearchRequest = {
body: {
track_total_hits: true,
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'boolean',
},
},
],
size: 10000,
stored_fields: ['*'],
_source: true,
query: {
bool: {
must: [],
filter: [
{
bool: {
filter: queryFilters,
},
},
],
should: [],
must_not: [],
async getTransactions<T>(queryFilters: QueryDslQueryContainer[]) {
const searchRequest: SearchRequest = {
body: {
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'boolean',
},
},
],
size: 10000,
query: {
bool: {
filter: [
{
bool: {
filter: queryFilters,
},
},
],
},
},
};
},
};
log.debug(`Search request: ${JSON.stringify(searchRequest)}`);
const result = await client.search<Document>(searchRequest);
log.debug(`Search result: ${JSON.stringify(result)}`);
return result?.hits?.hits;
},
};
this.log.debug(`Search request: ${JSON.stringify(searchRequest)}`);
const result = await this.client.search<T>(searchRequest);
this.log.debug(`Search result: ${JSON.stringify(result)}`);
return result?.hits?.hits;
}
async getFtrServiceTransactions(buildId: string, journeyName: string) {
const filters = [
{ field: 'service.name', value: 'functional test runner' },
{ field: 'processor.event', value: 'transaction' },
{ field: 'labels.testBuildId', value: buildId },
{ field: 'labels.journeyName', value: journeyName },
{ field: 'labels.performancePhase', value: 'TEST' },
];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
return await this.getTransactions<Document>(queryFilters);
}
async getKibanaServerTransactions(
buildId: string,
journeyName: string,
range?: { startTime: string; endTime: string }
) {
const filters = [
{ field: 'transaction.type', value: 'request' },
{ field: 'processor.event', value: 'transaction' },
{ field: 'labels.testBuildId', value: buildId },
{ field: 'labels.journeyName', value: journeyName },
];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
if (range) {
queryFilters.push(addRangeFilter(range));
}
return await this.getTransactions<TransactionDocument>(queryFilters);
}
async getSpans(transactionId: string) {
const filters = [{ field: 'parent.id', value: transactionId }];
const queryFilters = filters.map((filter) => addBooleanFilter(filter));
return await this.getTransactions<SpanDocument>(queryFilters);
}
}

View file

@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ESClient, SpanDocument } from './es_client';
import { KibanaRequest } from './server_request';
const httpMethodRegExp = /(GET|POST|DELETE|HEAD|PUT|OPTIONS)/;
const httpPathRegExp = /(?<=GET|POST|DELETE|HEAD|PUT|OPTIONS).*/;
interface Request {
id: string;
transactionId: string;
name: string;
action: string;
request: {
method?: string;
path?: string;
params?: string;
body?: JSON;
};
date: string;
duration: number;
}
interface Stream {
startTime: number;
endTime: number;
requests: Request[];
}
const strToJSON = (str: string): JSON | undefined => {
try {
return JSON.parse(str);
} catch (e) {
return;
}
};
const findFirstMatch = (regExp: RegExp, testString: string) => {
const found = regExp.exec(testString);
return found ? found[0] : undefined;
};
const parseQueryStatement = (statement: string): { params?: string; body?: JSON } => {
// github.com/elastic/apm-agent-nodejs/blob/5ba1b2609d18b12a64e1e559236717dd38d64a51/lib/instrumentation/elasticsearch-shared.js#L27-L29
// Some ES endpoints support both query params and a body, statement string might contain both of it
const split = statement.split('\n\n');
if (split.length === 2) {
return { params: split[0], body: strToJSON(split[1]) };
} else {
const body = strToJSON(split[0]);
return body ? { body } : { params: split[0] };
}
};
export const fetchRequests = async (esClient: ESClient, requests: KibanaRequest[]) => {
const esRequests = new Array<Request>();
for (const request of requests) {
const transactionId = request.transaction.id;
const hits = await esClient.getSpans(transactionId);
const spans = hits
.map((hit) => hit!._source as SpanDocument)
.map((hit) => {
const query = hit?.span.db?.statement ? parseQueryStatement(hit?.span.db?.statement) : {};
return {
id: hit.span.id,
transactionId: hit.transaction.id,
name: hit.span.name,
action: hit.span?.action,
request: {
method: findFirstMatch(httpMethodRegExp, hit.span.name),
path: findFirstMatch(httpPathRegExp, hit.span.name.replace(/\s+/g, '')),
params: query?.params,
body: query?.body,
},
date: hit['@timestamp'],
duration: hit.span?.duration?.us,
};
})
// filter out requests without method, path and POST/PUT/DELETE without body
.filter(
(hit) =>
hit &&
hit.request?.method &&
hit.request?.path &&
(hit.request?.method === 'GET' || hit.request?.body)
);
esRequests.push(...spans);
}
return esRequests;
};
export const requestsToStreams = (requests: Request[]) => {
const sorted = requests.sort((a, b) => new Date(a.date).getTime() - new Date(b.date).getTime());
const streams = new Map<string, Stream>();
for (const request of sorted) {
const startTime = new Date(request.date).getTime();
const endTime = new Date(request.date).getTime() + request.duration / 1000;
// searching if query starts before any existing stream ended
const match = Array.from(streams.keys()).filter((key) => {
const streamEndTime = streams.get(key)?.endTime;
return streamEndTime ? startTime < streamEndTime : false;
});
const stream = streams.get(match[0]);
if (stream) {
// adding query to the existing stream
stream.requests.push(request);
// updating the stream endTime if needed
if (endTime > stream.endTime) {
stream.endTime = endTime;
}
// saving updated stream
streams.set(match[0], stream);
} else {
// add a new stream
streams.set(request.date, {
startTime,
endTime,
requests: [request],
});
}
}
const values = Array.from(streams.values());
return values.map((stream) => {
return {
startTime: new Date(stream.startTime).toISOString(),
endTime: new Date(stream.endTime).toISOString(),
requests: stream.requests,
};
});
};

View file

@ -12,10 +12,11 @@ import { existsSync } from 'fs';
import path from 'path';
import { ToolingLog } from '@kbn/tooling-log';
import { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { initClient, Document, Headers } from './es_client';
import { ESClient, Document, TransactionDocument } from './es_client';
import { getRequests } from './server_request';
import { fetchRequests, requestsToStreams } from './es_request';
const DATE_FORMAT = `YYYY-MM-DD'T'HH:mm:ss.SSS'Z'`;
const STATIC_RESOURCES_PATTERN = /\.(css|ico|js|json|jpeg|jpg|gif|png|otf|ttf|woff|woff2)$/;
interface CLIParams {
param: {
@ -45,23 +46,6 @@ export interface ScalabilitySetup {
maxDuration: string;
}
const parsePayload = (payload: string, traceId: string, log: ToolingLog): string | undefined => {
let body;
try {
body = JSON.parse(payload);
} catch (error) {
log.error(`Failed to parse payload - trace_id: '${traceId}'`);
}
return body;
};
const combineHeaderFieldValues = (headers: Headers) => {
return Object.assign(
{},
...Object.keys(headers).map((key) => ({ [key]: headers[key].join(', ') }))
);
};
const calculateTransactionTimeRage = (hit: SearchHit<Document>) => {
const trSource = hit._source as Document;
const startTime = trSource['@timestamp'];
@ -70,39 +54,14 @@ const calculateTransactionTimeRage = (hit: SearchHit<Document>) => {
return { startTime, endTime };
};
const getTraceItems = (
hits: Array<SearchHit<Document>>,
withoutStaticResources: boolean,
log: ToolingLog
) => {
const data = hits
.map((hit) => hit!._source as Document)
.map((hit) => {
const payload = hit.http.request?.body?.original;
return {
traceId: hit.trace.id,
parentId: hit?.parent?.id,
processor: hit.processor,
environment: hit.environment,
request: {
timestamp: hit['@timestamp'],
method: hit.http.request.method,
path: hit.url.path,
headers: combineHeaderFieldValues(hit.http.request.headers),
body: payload ? JSON.stringify(parsePayload(payload, hit.trace.id, log)) : undefined,
statusCode: hit.http.response.status_code,
},
transaction: {
id: hit.transaction.id,
name: hit.transaction.name,
type: hit.transaction.type,
},
};
});
const saveFile = async (output: any, outputDir: string, fileName: string, log: ToolingLog) => {
const filePath = path.resolve(outputDir, fileName);
return withoutStaticResources
? data.filter((item) => !STATIC_RESOURCES_PATTERN.test(item.request.path))
: data;
if (!existsSync(outputDir)) {
await fs.mkdir(outputDir, { recursive: true });
}
await fs.writeFile(filePath, JSON.stringify(output, null, 2), 'utf8');
log.info(`Output file saved: ${filePath}`);
};
export const extractor = async ({ param, client, log }: CLIParams) => {
@ -115,11 +74,11 @@ export const extractor = async ({ param, client, log }: CLIParams) => {
log.info(
`Searching transactions with 'labels.testBuildId=${buildId}' and 'labels.journeyName=${journeyName}'`
);
const esClient = initClient(authOptions, log);
const ftrTransactionHits = await esClient.getFtrTransactions(buildId, journeyName);
const esClient = new ESClient(authOptions, log);
const ftrTransactionHits = await esClient.getFtrServiceTransactions(buildId, journeyName);
if (!ftrTransactionHits || ftrTransactionHits.length === 0) {
log.warning(
`No transactions found. Can't calculate journey time range, output file won't be generated.`
`No 'functional test runner' transactions found. Can't calculate journey time range, output file won't be generated.`
);
return;
}
@ -134,27 +93,45 @@ export const extractor = async ({ param, client, log }: CLIParams) => {
// Filtering out setup/teardown related transactions by time range from 'functional test runner' transaction
const hits = await esClient.getKibanaServerTransactions(buildId, journeyName, timeRange);
if (!hits || hits.length === 0) {
log.warning(`No transactions found. Output file won't be generated.`);
log.warning(`No Kibana server transactions found. Output file won't be generated.`);
return;
}
const source = hits[0]!._source as Document;
const source = hits[0]!._source as TransactionDocument;
const kibanaVersion = source.service.version;
const output = {
journeyName,
kibanaVersion,
scalabilitySetup,
requests: getTraceItems(hits, withoutStaticResources, log),
};
const kibanaRequests = getRequests(hits, withoutStaticResources, log);
const esRequests = await fetchRequests(esClient, kibanaRequests);
log.info(
`Found ${kibanaRequests.length} Kibana server and ${esRequests.length} Elasticsearch requests`
);
const streams = requestsToStreams(esRequests);
const outputDir = path.resolve('target/scalability_traces');
const fileName = `${output.journeyName.replace(/ /g, '')}-${buildId}.json`;
const filePath = path.resolve(outputDir, fileName);
const fileName = `${journeyName.replace(/ /g, '')}-${buildId}.json`;
log.info(`Found ${output.requests.length} transactions, output file: ${filePath}`);
if (!existsSync(outputDir)) {
await fs.mkdir(outputDir, { recursive: true });
if (scalabilitySetup) {
await saveFile(
{
journeyName,
kibanaVersion,
scalabilitySetup,
requests: kibanaRequests,
},
path.resolve(outputDir, 'server'),
fileName,
log
);
}
await fs.writeFile(filePath, JSON.stringify(output, null, 2), 'utf8');
await saveFile(
{
journeyName,
kibanaVersion,
streams: Array.from(streams.values()),
},
path.resolve(outputDir, 'es'),
fileName,
log
);
};

View file

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { ToolingLog } from '@kbn/tooling-log';
import { TransactionDocument, Headers } from './es_client';
const staticResourcesRegExp = /\.(css|ico|js|json|jpeg|jpg|gif|png|otf|ttf|woff|woff2)$/;
export interface KibanaRequest {
traceId: string;
parentId?: string;
processor: string;
environment: string;
request: {
timestamp: string;
method: string;
path: string;
headers: { [key: string]: string };
body?: string;
statusCode: number;
};
transaction: {
id: string;
name: string;
type: string;
};
}
const parsePayload = (payload: string, traceId: string, log: ToolingLog): string | undefined => {
let body;
try {
body = JSON.parse(payload);
} catch (error) {
log.error(`Failed to parse payload - trace_id: '${traceId}'`);
}
return body;
};
const combineHeaderFieldValues = (headers: Headers): { [key: string]: string } => {
return Object.assign(
{},
...Object.keys(headers).map((key) => ({ [key]: headers[key].join(', ') }))
);
};
export const getRequests = (
hits: Array<SearchHit<TransactionDocument>>,
withoutStaticResources: boolean,
log: ToolingLog
): KibanaRequest[] => {
const data = hits
.map((hit) => hit!._source as TransactionDocument)
.map((hit) => {
const payload = hit.http.request?.body?.original;
return {
traceId: hit.trace.id,
parentId: hit?.parent?.id,
processor: hit.processor,
environment: hit.service.environment,
request: {
timestamp: hit['@timestamp'],
method: hit.http.request.method,
path: hit.url.path,
headers: combineHeaderFieldValues(hit.http.request.headers),
body: payload ? JSON.stringify(parsePayload(payload, hit.trace.id, log)) : undefined,
statusCode: hit.http.response.status_code,
},
transaction: {
id: hit.transaction.id,
name: hit.transaction.name,
type: hit.transaction.type,
},
};
});
return withoutStaticResources
? data.filter((item) => !staticResourcesRegExp.test(item.request.path))
: data;
};