Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 17:59:23 -04:00)
[Profiling] Add Profiling application (#140722)
Co-authored-by: Joseph Crail <jbcrail@gmail.com>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Dario Gieselaar <dario.gieselaar@elastic.co>
Co-authored-by: Joseph Crail <joseph.crail@elastic.co>
Co-authored-by: Stephanie Boomsma <stephanieboomsma@optimyze.cloud>
Co-authored-by: inge4pres <fgualazzi@gmail.com>
Co-authored-by: inge4pres <francesco.gualazzi@elastic.co>
Co-authored-by: Francesco Gualazzi <inge4pres@users.noreply.github.com>
Co-authored-by: Tim Rühsen <tim.ruhsen@elastic.co>
Co-authored-by: Tim Rühsen <tim.ruehsen@gmx.de>
parent 33ff3538b8
commit b66cf585ca
119 changed files with 8754 additions and 6 deletions
x-pack/plugins/profiling/server/feature.ts (new file, 43 lines)
@@ -0,0 +1,43 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { i18n } from '@kbn/i18n';
import { DEFAULT_APP_CATEGORIES } from '@kbn/core/server';

export const PROFILING_SERVER_FEATURE_ID = 'profiling';

export const PROFILING_FEATURE = {
  id: PROFILING_SERVER_FEATURE_ID,
  name: i18n.translate('xpack.profiling.featureRegistry.profilingFeatureName', {
    defaultMessage: 'Profiling',
  }),
  order: 1200,
  category: DEFAULT_APP_CATEGORIES.observability,
  app: ['kibana'],
  catalogue: [],
  // see x-pack/plugins/features/common/feature_kibana_privileges.ts
  privileges: {
    all: {
      app: ['kibana'],
      catalogue: [],
      savedObject: {
        all: [],
        read: [],
      },
      ui: ['show'],
    },
    read: {
      app: ['kibana'],
      catalogue: [],
      savedObject: {
        all: [],
        read: [],
      },
      ui: [],
    },
  },
};
x-pack/plugins/profiling/server/index.ts (new file, 30 lines)
@@ -0,0 +1,30 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { schema, TypeOf } from '@kbn/config-schema';
import type { PluginConfigDescriptor, PluginInitializerContext } from '@kbn/core/server';
import { ProfilingPlugin } from './plugin';

const configSchema = schema.object({
  enabled: schema.boolean({ defaultValue: false }),
});

type ProfilingConfig = TypeOf<typeof configSchema>;

// plugin config
export const config: PluginConfigDescriptor<ProfilingConfig> = {
  schema: configSchema,
};

// This exports static code and TypeScript types,
// as well as the Kibana Platform `plugin()` initializer.
export function plugin(initializerContext: PluginInitializerContext) {
  return new ProfilingPlugin(initializerContext);
}

export type { ProfilingPluginSetup, ProfilingPluginStart } from './types';
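As a minimal sketch of how this config behaves (using @kbn/config-schema's standard validate() method, nothing profiling-specific): validating an empty object applies the declared default, so the plugin stays disabled unless a deployment explicitly opts in.

import { schema } from '@kbn/config-schema';

// Same shape as the plugin's configSchema above.
const configSchema = schema.object({
  enabled: schema.boolean({ defaultValue: false }),
});

// validate() fills in declared defaults for missing keys.
console.log(configSchema.validate({}));                // { enabled: false }
console.log(configSchema.validate({ enabled: true })); // { enabled: true }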
x-pack/plugins/profiling/server/plugin.ts (new file, 61 lines)
@@ -0,0 +1,61 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server';

import { PROFILING_FEATURE } from './feature';
import { registerRoutes } from './routes';
import {
  ProfilingPluginSetup,
  ProfilingPluginSetupDeps,
  ProfilingPluginStart,
  ProfilingPluginStartDeps,
  ProfilingRequestHandlerContext,
} from './types';

export class ProfilingPlugin
  implements
    Plugin<
      ProfilingPluginSetup,
      ProfilingPluginStart,
      ProfilingPluginSetupDeps,
      ProfilingPluginStartDeps
    >
{
  private readonly logger: Logger;

  constructor(initializerContext: PluginInitializerContext) {
    this.logger = initializerContext.logger.get();
  }

  public setup(core: CoreSetup<ProfilingPluginStartDeps>, deps: ProfilingPluginSetupDeps) {
    this.logger.debug('profiling: Setup');
    const router = core.http.createRouter<ProfilingRequestHandlerContext>();

    deps.features.registerKibanaFeature(PROFILING_FEATURE);

    core.getStartServices().then(([_, depsStart]) => {
      registerRoutes({
        router,
        logger: this.logger!,
        dependencies: {
          start: depsStart,
          setup: deps,
        },
      });
    });

    return {};
  }

  public start(core: CoreStart) {
    this.logger.debug('profiling: Started');
    return {};
  }

  public stop() {}
}
x-pack/plugins/profiling/server/routes/compat.ts (new file, 20 lines)
@@ -0,0 +1,20 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

// Code that works around incompatibilities between different
// versions of Kibana / ES.
// Currently, we work with 8.1 and 8.3 and thus this code only needs
// to address the incompatibilities between those two versions.

import type { ElasticsearchClient } from '@kbn/core/server';
import { ProfilingRequestHandlerContext } from '../types';

export async function getClient(
  context: ProfilingRequestHandlerContext
): Promise<ElasticsearchClient> {
  return (await context.core).elasticsearch.client.asCurrentUser;
}
x-pack/plugins/profiling/server/routes/downsampling.ts (new file, 125 lines)
@@ -0,0 +1,125 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { Logger } from '@kbn/core/server';
import seedrandom from 'seedrandom';
import { StackTraceID } from '../../common/profiling';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { ProjectTimeQuery } from './query';

export interface DownsampledEventsIndex {
  name: string;
  sampleRate: number;
}

function getFullDownsampledIndex(index: string, pow: number, factor: number): string {
  const downsampledIndexPrefix = index.replaceAll('-all', '') + '-' + factor + 'pow';
  return downsampledIndexPrefix + pow.toString().padStart(2, '0');
}

// Returns the index that has between targetSampleSize and targetSampleSize * samplingFactor entries.
// The starting point is the number of entries in the profiling-events-5pow<initialExp> index.
//
// More details on how the down-sampling works can be found at the write path
// https://github.com/elastic/prodfiler/blob/bdcc2711c6cd7e89d63b58a17329fb9fdbabe008/pf-elastic-collector/elastic.go
export function getSampledTraceEventsIndex(
  index: string,
  targetSampleSize: number,
  sampleCountFromInitialExp: number,
  initialExp: number
): DownsampledEventsIndex {
  const maxExp = 11;
  const samplingFactor = 5;
  const fullEventsIndex: DownsampledEventsIndex = { name: index, sampleRate: 1 };

  if (sampleCountFromInitialExp === 0) {
    // Take the shortcut to the full events index.
    return fullEventsIndex;
  }

  let pow = Math.floor(
    initialExp -
      Math.log((targetSampleSize * samplingFactor) / sampleCountFromInitialExp) / Math.log(5) +
      1
  );

  if (pow < 1) {
    return fullEventsIndex;
  }

  if (pow > maxExp) {
    pow = maxExp;
  }

  return {
    name: getFullDownsampledIndex(index, pow, samplingFactor),
    sampleRate: 1 / samplingFactor ** pow,
  };
}

export async function findDownsampledIndex({
  logger,
  client,
  index,
  filter,
  sampleSize,
}: {
  logger: Logger;
  client: ProfilingESClient;
  index: string;
  filter: ProjectTimeQuery;
  sampleSize: number;
}): Promise<DownsampledEventsIndex> {
  // Start with counting the results in the index down-sampled by 5^6.
  // That is in the middle of our down-sampled indexes.
  const initialExp = 6;
  let sampleCountFromInitialExp = 0;
  try {
    const resp = await client.search('find_downsampled_index', {
      index: getFullDownsampledIndex(index, initialExp, 5),
      body: {
        query: filter,
        size: 0,
        track_total_hits: true,
      },
    });
    sampleCountFromInitialExp = resp.hits.total.value;
  } catch (e) {
    logger.info(e.message);
  }

  logger.info('sampleCountFromPow6 ' + sampleCountFromInitialExp);
  return getSampledTraceEventsIndex(index, sampleSize, sampleCountFromInitialExp, initialExp);
}

export function downsampleEventsRandomly(
  stackTraceEvents: Map<StackTraceID, number>,
  p: number,
  seed: string
): number {
  let totalCount = 0;

  // Make the RNG predictable to get reproducible results.
  const random = seedrandom(seed);

  for (const [id, count] of stackTraceEvents) {
    let newCount = 0;
    for (let i = 0; i < count; i++) {
      if (random() < p) {
        newCount++;
      }
    }
    if (newCount) {
      stackTraceEvents.set(id, newCount);
      totalCount += newCount;
    } else {
      stackTraceEvents.delete(id);
    }
  }

  return totalCount;
}
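A worked example of the exponent arithmetic above (the hit count is hypothetical): with the default targetSampleSize of 20000 and 2,500,000 events counted in the 5pow06 index, log5((20000 * 5) / 2500000) = log5(0.04) = -2, so pow = floor(6 + 2 + 1) = 9 and the query is routed to the 5pow09 index, where roughly 2500000 / 5^3 = 20000 events are expected.

import { getSampledTraceEventsIndex } from './downsampling';

// Hypothetical sample count returned by the 5pow06 probe query.
const index = getSampledTraceEventsIndex('profiling-events-all', 20000, 2500000, 6);
console.log(index); // { name: 'profiling-events-5pow09', sampleRate: 1 / 5 ** 9 }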
x-pack/plugins/profiling/server/routes/flamechart.test.ts (new file, 61 lines)
@@ -0,0 +1,61 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { DownsampledEventsIndex, getSampledTraceEventsIndex } from './downsampling';

describe('Using down-sampled indexes', () => {
  test('getSampledTraceEventsIndex', () => {
    const targetSampleSize = 20000;
    const initialExp = 6;
    const tests: Array<{
      sampleCountFromPow6: number;
      expected: DownsampledEventsIndex;
    }> = [
      {
        // stay with the input downsampled index
        sampleCountFromPow6: targetSampleSize,
        expected: { name: 'profiling-events-5pow06', sampleRate: 1 / 5 ** 6 },
      },
      {
        // stay with the input downsampled index
        sampleCountFromPow6: targetSampleSize * 5 - 1,
        expected: { name: 'profiling-events-5pow06', sampleRate: 1 / 5 ** 6 },
      },
      {
        // go down one downsampling step
        sampleCountFromPow6: targetSampleSize * 5,
        expected: { name: 'profiling-events-5pow07', sampleRate: 1 / 5 ** 7 },
      },
      {
        // go up one downsampling step
        sampleCountFromPow6: targetSampleSize - 1,
        expected: { name: 'profiling-events-5pow05', sampleRate: 1 / 5 ** 5 },
      },
      {
        // go to the full events index
        sampleCountFromPow6: 0,
        expected: { name: 'profiling-events-all', sampleRate: 1 },
      },
      {
        // go to the most downsampled index
        sampleCountFromPow6: targetSampleSize * 5 ** 8,
        expected: { name: 'profiling-events-5pow11', sampleRate: 1 / 5 ** 11 },
      },
    ];

    for (const t of tests) {
      expect(
        getSampledTraceEventsIndex(
          'profiling-events-all',
          targetSampleSize,
          t.sampleCountFromPow6,
          initialExp
        )
      ).toEqual(t.expected);
    }
  });
});
x-pack/plugins/profiling/server/routes/flamechart.ts (new file, 79 lines)
@@ -0,0 +1,79 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { schema } from '@kbn/config-schema';
import { RouteRegisterParameters } from '.';
import { getRoutePaths } from '../../common';
import { FlameGraph } from '../../common/flamegraph';
import { createProfilingEsClient } from '../utils/create_profiling_es_client';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { getClient } from './compat';
import { getExecutablesAndStackTraces } from './get_executables_and_stacktraces';
import { createCommonFilter } from './query';

export function registerFlameChartSearchRoute({ router, logger }: RouteRegisterParameters) {
  const paths = getRoutePaths();
  router.get(
    {
      path: paths.Flamechart,
      validate: {
        query: schema.object({
          timeFrom: schema.number(),
          timeTo: schema.number(),
          kuery: schema.string(),
        }),
      },
    },
    async (context, request, response) => {
      const { timeFrom, timeTo, kuery } = request.query;
      const targetSampleSize = 20000; // minimum number of samples to get statistically sound results

      try {
        const esClient = await getClient(context);
        const filter = createCommonFilter({
          timeFrom,
          timeTo,
          kuery,
        });

        const { stackTraces, executables, stackFrames, eventsIndex, totalCount, stackTraceEvents } =
          await getExecutablesAndStackTraces({
            logger,
            client: createProfilingEsClient({ request, esClient }),
            filter,
            sampleSize: targetSampleSize,
          });

        const flamegraph = await withProfilingSpan('collect_flamegraph', async () => {
          return new FlameGraph({
            sampleRate: eventsIndex.sampleRate,
            totalCount,
            events: stackTraceEvents,
            stackTraces,
            stackFrames,
            executables,
            totalSeconds: timeTo - timeFrom,
          }).toElastic();
        });

        logger.info('returning payload response to client');

        return response.ok({
          body: flamegraph,
        });
      } catch (e) {
        logger.error(e);
        return response.customError({
          statusCode: e.statusCode ?? 500,
          body: {
            message: e.message,
          },
        });
      }
    }
  );
}
x-pack/plugins/profiling/server/routes/frames.ts (new file, 102 lines)
@@ -0,0 +1,102 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { schema } from '@kbn/config-schema';
import { Logger } from '@kbn/logging';
import { RouteRegisterParameters } from '.';
import { getRoutePaths } from '../../common';
import {
  createStackFrameMetadata,
  Executable,
  StackFrame,
  StackFrameMetadata,
} from '../../common/profiling';
import { createProfilingEsClient, ProfilingESClient } from '../utils/create_profiling_es_client';
import { mgetStackFrames, mgetExecutables } from './stacktrace';

async function getFrameInformation({
  frameID,
  executableID,
  logger,
  client,
}: {
  frameID: string;
  executableID: string;
  logger: Logger;
  client: ProfilingESClient;
}): Promise<StackFrameMetadata | undefined> {
  const [stackFrames, executables] = await Promise.all([
    mgetStackFrames({
      logger,
      client,
      stackFrameIDs: new Set([frameID]),
    }),
    mgetExecutables({
      logger,
      client,
      executableIDs: new Set([executableID]),
    }),
  ]);

  const frame = Array.from(stackFrames.values())[0] as StackFrame | undefined;
  const executable = Array.from(executables.values())[0] as Executable | undefined;

  if (frame) {
    return createStackFrameMetadata({
      FrameID: frameID,
      FileID: executableID,
      SourceFilename: frame.FileName,
      FunctionName: frame.FunctionName,
      ExeFileName: executable?.FileName,
    });
  }
}

export function registerFrameInformationRoute(params: RouteRegisterParameters) {
  const { logger, router } = params;

  const routePaths = getRoutePaths();

  router.get(
    {
      path: routePaths.FrameInformation,
      validate: {
        query: schema.object({
          frameID: schema.string(),
          executableID: schema.string(),
        }),
      },
    },
    async (context, request, response) => {
      const { frameID, executableID } = request.query;

      const client = createProfilingEsClient({
        request,
        esClient: (await context.core).elasticsearch.client.asCurrentUser,
      });

      try {
        const frame = await getFrameInformation({
          frameID,
          executableID,
          logger,
          client,
        });

        return response.ok({ body: frame });
      } catch (error: any) {
        logger.error(error);
        return response.custom({
          statusCode: error.statusCode ?? 500,
          body: {
            message: error.message ?? 'An internal server error occurred',
          },
        });
      }
    }
  );
}
x-pack/plugins/profiling/server/routes/functions.ts (new file, 84 lines)
@@ -0,0 +1,84 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { schema, TypeOf } from '@kbn/config-schema';
import { RouteRegisterParameters } from '.';
import { getRoutePaths } from '../../common';
import { createTopNFunctions } from '../../common/functions';
import { createProfilingEsClient } from '../utils/create_profiling_es_client';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { getClient } from './compat';
import { getExecutablesAndStackTraces } from './get_executables_and_stacktraces';
import { createCommonFilter } from './query';

const querySchema = schema.object({
  timeFrom: schema.number(),
  timeTo: schema.number(),
  startIndex: schema.number(),
  endIndex: schema.number(),
  kuery: schema.string(),
});

type QuerySchemaType = TypeOf<typeof querySchema>;

export function registerTopNFunctionsSearchRoute({ router, logger }: RouteRegisterParameters) {
  const paths = getRoutePaths();
  router.get(
    {
      path: paths.TopNFunctions,
      validate: {
        query: querySchema,
      },
    },
    async (context, request, response) => {
      try {
        const { timeFrom, timeTo, startIndex, endIndex, kuery }: QuerySchemaType = request.query;

        const targetSampleSize = 20000; // minimum number of samples to get statistically sound results
        const esClient = await getClient(context);
        const filter = createCommonFilter({
          timeFrom,
          timeTo,
          kuery,
        });

        const { stackFrames, stackTraceEvents, stackTraces, executables } =
          await getExecutablesAndStackTraces({
            client: createProfilingEsClient({ request, esClient }),
            filter,
            logger,
            sampleSize: targetSampleSize,
          });

        const topNFunctions = await withProfilingSpan('collect_topn_functions', async () => {
          return createTopNFunctions(
            stackTraceEvents,
            stackTraces,
            stackFrames,
            executables,
            startIndex,
            endIndex
          );
        });

        logger.info('returning payload response to client');

        return response.ok({
          body: topNFunctions,
        });
      } catch (e) {
        logger.error(e);
        return response.customError({
          statusCode: e.statusCode ?? 500,
          body: {
            message: e.message,
          },
        });
      }
    }
  );
}
x-pack/plugins/profiling/server/routes/get_executables_and_stacktraces.ts (new file, 94 lines)
@@ -0,0 +1,94 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { Logger } from '@kbn/logging';
import { INDEX_EVENTS } from '../../common';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { downsampleEventsRandomly, findDownsampledIndex } from './downsampling';
import { logExecutionLatency } from './logger';
import { ProjectTimeQuery } from './query';
import {
  mgetExecutables,
  mgetStackFrames,
  mgetStackTraces,
  searchEventsGroupByStackTrace,
} from './stacktrace';

export async function getExecutablesAndStackTraces({
  logger,
  client,
  filter,
  sampleSize,
}: {
  logger: Logger;
  client: ProfilingESClient;
  filter: ProjectTimeQuery;
  sampleSize: number;
}) {
  return withProfilingSpan('get_executables_and_stack_traces', async () => {
    const eventsIndex = await findDownsampledIndex({
      logger,
      client,
      index: INDEX_EVENTS,
      filter,
      sampleSize,
    });

    const { totalCount, stackTraceEvents } = await searchEventsGroupByStackTrace({
      logger,
      client,
      index: eventsIndex,
      filter,
    });

    // Manual downsampling if totalCount exceeds sampleSize by 10%.
    let p = 1.0;
    if (totalCount > sampleSize * 1.1) {
      p = sampleSize / totalCount;
      logger.info('downsampling events with p=' + p);
      await logExecutionLatency(logger, 'downsampling events', async () => {
        const downsampledTotalCount = downsampleEventsRandomly(
          stackTraceEvents,
          p,
          filter.toString()
        );
        logger.info('downsampled total count: ' + downsampledTotalCount);
      });
      logger.info('unique downsampled stacktraces: ' + stackTraceEvents.size);
    }

    // Adjust the sample counts from down-sampled to fully sampled.
    // Be aware that downsampling drops entries from stackTraceEvents, so that
    // the sum of the upscaled count values is less than totalCount.
    for (const [id, count] of stackTraceEvents) {
      stackTraceEvents.set(id, Math.floor(count / (eventsIndex.sampleRate * p)));
    }

    const { stackTraces, stackFrameDocIDs, executableDocIDs } = await mgetStackTraces({
      logger,
      client,
      events: stackTraceEvents,
    });

    return withProfilingSpan('get_stackframes_and_executables', () =>
      Promise.all([
        mgetStackFrames({ logger, client, stackFrameIDs: stackFrameDocIDs }),
        mgetExecutables({ logger, client, executableIDs: executableDocIDs }),
      ])
    ).then(([stackFrames, executables]) => {
      return {
        stackTraces,
        executables,
        stackFrames,
        stackTraceEvents,
        totalCount,
        eventsIndex,
      };
    });
  });
}
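To make the upscaling step concrete (all numbers hypothetical): a stack trace counted 40 times in the 5pow03 index (sampleRate 1/125), with no manual downsampling applied (p = 1), stands for roughly 5000 events at full sampling.

// Hypothetical values, for illustration only.
const sampleRate = 1 / 125; // the 5pow03 index
const p = 1.0;              // no manual downsampling applied
const downsampledCount = 40;

const fullCount = Math.floor(downsampledCount / (sampleRate * p));
console.log(fullCount); // 5000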
x-pack/plugins/profiling/server/routes/index.ts (new file, 45 lines)
@@ -0,0 +1,45 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { IRouter, Logger } from '@kbn/core/server';
import {
  ProfilingPluginSetupDeps,
  ProfilingPluginStartDeps,
  ProfilingRequestHandlerContext,
} from '../types';

import { registerFlameChartSearchRoute } from './flamechart';
import { registerFrameInformationRoute } from './frames';
import { registerTopNFunctionsSearchRoute } from './functions';

import {
  registerTraceEventsTopNContainersSearchRoute,
  registerTraceEventsTopNDeploymentsSearchRoute,
  registerTraceEventsTopNHostsSearchRoute,
  registerTraceEventsTopNStackTracesSearchRoute,
  registerTraceEventsTopNThreadsSearchRoute,
} from './topn';

export interface RouteRegisterParameters {
  router: IRouter<ProfilingRequestHandlerContext>;
  logger: Logger;
  dependencies: {
    start: ProfilingPluginStartDeps;
    setup: ProfilingPluginSetupDeps;
  };
}

export function registerRoutes(params: RouteRegisterParameters) {
  registerFlameChartSearchRoute(params);
  registerTopNFunctionsSearchRoute(params);
  registerTraceEventsTopNContainersSearchRoute(params);
  registerTraceEventsTopNDeploymentsSearchRoute(params);
  registerTraceEventsTopNHostsSearchRoute(params);
  registerTraceEventsTopNStackTracesSearchRoute(params);
  registerTraceEventsTopNThreadsSearchRoute(params);
  registerFrameInformationRoute(params);
}
x-pack/plugins/profiling/server/routes/logger.ts (new file, 20 lines)
@@ -0,0 +1,20 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { Logger } from '@kbn/core/server';

export async function logExecutionLatency<T>(
  logger: Logger,
  activity: string,
  func: () => Promise<T>
): Promise<T> {
  const start = new Date().getTime();
  return await func().then((res) => {
    logger.info(activity + ' took ' + (new Date().getTime() - start) + 'ms');
    return res;
  });
}
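Usage is just wrapping any async unit of work; the helper passes the wrapped promise's value through, so it composes transparently. A minimal sketch (the workload is a stand-in):

import type { Logger } from '@kbn/core/server';
import { logExecutionLatency } from './logger';

async function example(logger: Logger): Promise<number[]> {
  // Logs 'sorting numbers took <n>ms' once the promise settles,
  // and resolves to the wrapped function's own result.
  return logExecutionLatency(logger, 'sorting numbers', async () => {
    return [3, 1, 2].sort();
  });
}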
x-pack/plugins/profiling/server/routes/query.ts (new file, 42 lines)
@@ -0,0 +1,42 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { QueryDslBoolQuery } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { kqlQuery } from '@kbn/observability-plugin/server';
import { ProfilingESField } from '../../common/elasticsearch';

export interface ProjectTimeQuery {
  bool: QueryDslBoolQuery;
}

export function createCommonFilter({
  kuery,
  timeFrom,
  timeTo,
}: {
  kuery: string;
  timeFrom: number;
  timeTo: number;
}): ProjectTimeQuery {
  return {
    bool: {
      filter: [
        ...kqlQuery(kuery),
        {
          range: {
            [ProfilingESField.Timestamp]: {
              gte: String(timeFrom),
              lt: String(timeTo),
              format: 'epoch_second',
              boost: 1.0,
            },
          },
        },
      ],
    },
  };
}
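For a concrete picture of the output (assuming ProfilingESField.Timestamp resolves to the conventional @timestamp field; kqlQuery('') contributes no clauses), a filter for a one-hour window looks like this:

import { createCommonFilter } from './query';

// Hypothetical epoch-second bounds.
const filter = createCommonFilter({ kuery: '', timeFrom: 1650000000, timeTo: 1650003600 });
// filter is roughly:
// {
//   bool: {
//     filter: [
//       {
//         range: {
//           '@timestamp': {
//             gte: '1650000000',
//             lt: '1650003600',
//             format: 'epoch_second',
//             boost: 1.0,
//           },
//         },
//       },
//     ],
//   },
// }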
x-pack/plugins/profiling/server/routes/stacktrace.test.ts (new file, 140 lines)
@@ -0,0 +1,140 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { createStackFrameID, StackTrace } from '../../common/profiling';
import {
  decodeStackTrace,
  EncodedStackTrace,
  runLengthDecode,
  runLengthEncode,
} from './stacktrace';

enum fileID {
  A = 'aQpJmTLWydNvOapSFZOwKg',
  B = 'hz_u-HGyrN6qeIk6UIJeCA',
  C = 'AJ8qrcXSoJbl_haPhlc4og',
  D = 'lHZiv7a58px6Gumcpo-6yA',
  E = 'fkbxUTZgljnk71ZMnqJnyA',
  F = 'gnEsgxvvEODj6iFYMQWYlA',
}

enum addressOrLine {
  A = 515512,
  B = 26278522,
  C = 6712518,
  D = 105806025,
  E = 111,
  F = 106182663,
  G = 100965370,
}

const frameID: Record<string, string> = {
  A: createStackFrameID(fileID.A, addressOrLine.A),
  B: createStackFrameID(fileID.B, addressOrLine.B),
  C: createStackFrameID(fileID.C, addressOrLine.C),
  D: createStackFrameID(fileID.D, addressOrLine.D),
  E: createStackFrameID(fileID.E, addressOrLine.E),
  F: createStackFrameID(fileID.F, addressOrLine.F),
  G: createStackFrameID(fileID.F, addressOrLine.G),
};

const frameTypeA = [0, 0, 0];
const frameTypeB = [8, 8, 8, 8];

describe('Stack trace operations', () => {
  test('decodeStackTrace', () => {
    const tests: Array<{
      original: EncodedStackTrace;
      expected: StackTrace;
    }> = [
      {
        original: {
          Stacktrace: {
            frame: {
              ids: frameID.A + frameID.B + frameID.C,
              types: runLengthEncode(frameTypeA).toString('base64url'),
            },
          },
        } as EncodedStackTrace,
        expected: {
          FrameIDs: [frameID.A, frameID.B, frameID.C],
          FileIDs: [fileID.A, fileID.B, fileID.C],
          AddressOrLines: [addressOrLine.A, addressOrLine.B, addressOrLine.C],
          Types: frameTypeA,
        } as StackTrace,
      },
      {
        original: {
          Stacktrace: {
            frame: {
              ids: frameID.D + frameID.E + frameID.F + frameID.G,
              types: runLengthEncode(frameTypeB).toString('base64url'),
            },
          },
        } as EncodedStackTrace,
        expected: {
          FrameIDs: [frameID.D, frameID.E, frameID.F, frameID.G],
          FileIDs: [fileID.D, fileID.E, fileID.F, fileID.F],
          AddressOrLines: [addressOrLine.D, addressOrLine.E, addressOrLine.F, addressOrLine.G],
          Types: frameTypeB,
        } as StackTrace,
      },
    ];

    for (const t of tests) {
      expect(decodeStackTrace(t.original)).toEqual(t.expected);
    }
  });

  test('run length is fully reversible', () => {
    const tests: number[][] = [[], [0], [0, 1, 2, 3], [0, 1, 1, 2, 2, 2, 3, 3, 3, 3]];

    for (const t of tests) {
      expect(runLengthDecode(runLengthEncode(t))).toEqual(t);
    }
  });

  test('runLengthDecode with optional parameter', () => {
    const tests: Array<{
      bytes: Buffer;
      expected: number[];
    }> = [
      {
        bytes: Buffer.from([0x5, 0x0, 0x2, 0x2]),
        expected: [0, 0, 0, 0, 0, 2, 2],
      },
      {
        bytes: Buffer.from([0x1, 0x8]),
        expected: [8],
      },
    ];

    for (const t of tests) {
      expect(runLengthDecode(t.bytes, t.expected.length)).toEqual(t.expected);
    }
  });

  test('runLengthDecode without optional parameter', () => {
    const tests: Array<{
      bytes: Buffer;
      expected: number[];
    }> = [
      {
        bytes: Buffer.from([0x5, 0x0, 0x2, 0x2]),
        expected: [0, 0, 0, 0, 0, 2, 2],
      },
      {
        bytes: Buffer.from([0x1, 0x8]),
        expected: [8],
      },
    ];

    for (const t of tests) {
      expect(runLengthDecode(t.bytes)).toEqual(t.expected);
    }
  });
});
x-pack/plugins/profiling/server/routes/stacktrace.ts (new file, 417 lines)
@@ -0,0 +1,417 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { Logger } from '@kbn/core/server';
import { chunk } from 'lodash';
import LRUCache from 'lru-cache';
import { INDEX_EXECUTABLES, INDEX_FRAMES, INDEX_TRACES } from '../../common';
import {
  DedotObject,
  PickFlattened,
  ProfilingESField,
  ProfilingExecutable,
  ProfilingStackFrame,
  ProfilingStackTrace,
} from '../../common/elasticsearch';
import {
  Executable,
  FileID,
  StackFrame,
  StackFrameID,
  StackTrace,
  StackTraceID,
} from '../../common/profiling';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { DownsampledEventsIndex } from './downsampling';
import { logExecutionLatency } from './logger';
import { ProjectTimeQuery } from './query';

const traceLRU = new LRUCache<StackTraceID, StackTrace>({ max: 20000 });

const BASE64_FRAME_ID_LENGTH = 32;

export type EncodedStackTrace = DedotObject<{
  // This field is a base64-encoded byte string. The string represents a
  // serialized list of frame IDs in which the order of frames is
  // reversed to allow for prefix compression (leaf frame last). Each
  // frame ID is composed of two concatenated values: a 16-byte file ID
  // and an 8-byte address or line number (depending on the context of
  // the downstream reader).
  //
  //         Frame ID #1             Frame ID #2
  // +----------------+--------+----------------+--------+----
  // |     File ID    |  Addr  |     File ID    |  Addr  |
  // +----------------+--------+----------------+--------+----
  [ProfilingESField.StacktraceFrameIDs]: string;

  // This field is a run-length encoding of a list of uint8s. The order is
  // reversed from the original input.
  [ProfilingESField.StacktraceFrameTypes]: string;
}>;

// runLengthEncode run-length encodes the input array.
//
// The input is a list of uint8s. The output is a binary stream of
// 2-byte pairs (first byte is the length and the second byte is the
// binary representation of the object).
//
// E.g. the uint8 array [0, 0, 0, 0, 0, 2, 2, 2] is converted into the byte
// array [5, 0, 3, 2].
export function runLengthEncode(input: number[]): Buffer {
  const output: number[] = [];

  if (input.length === 0) {
    return Buffer.from(output);
  }

  let count = 0;
  let current = input[0];

  for (let i = 1; i < input.length; i++) {
    const next = input[i];

    if (next === current && count < 255) {
      count++;
      continue;
    }

    output.push(count + 1, current);

    count = 0;
    current = next;
  }

  output.push(count + 1, current);

  return Buffer.from(output);
}

// runLengthDecode decodes a run-length encoding for the input array.
//
// The input is a binary stream of 2-byte pairs (first byte is the length and the
// second byte is the binary representation of the object). The output is a list of
// uint8s.
//
// E.g. the byte array [5, 0, 3, 2] is converted into the uint8 array
// [0, 0, 0, 0, 0, 2, 2, 2].
export function runLengthDecode(input: Buffer, outputSize?: number): number[] {
  let size;

  if (typeof outputSize === 'undefined') {
    size = 0;
    for (let i = 0; i < input.length; i += 2) {
      size += input[i];
    }
  } else {
    size = outputSize;
  }

  const output: number[] = new Array(size);

  let idx = 0;
  for (let i = 0; i < input.length; i += 2) {
    for (let j = 0; j < input[i]; j++) {
      output[idx] = input[i + 1];
      idx++;
    }
  }

  return output;
}

// decodeStackTrace unpacks an encoded stack trace from Elasticsearch.
export function decodeStackTrace(input: EncodedStackTrace): StackTrace {
  const inputFrameIDs = input.Stacktrace.frame.ids;
  const inputFrameTypes = input.Stacktrace.frame.types;
  const countsFrameIDs = inputFrameIDs.length / BASE64_FRAME_ID_LENGTH;

  const fileIDs: string[] = new Array(countsFrameIDs);
  const frameIDs: string[] = new Array(countsFrameIDs);
  const addressOrLines: number[] = new Array(countsFrameIDs);

  // Step 1: Convert the base64-encoded frameID list into two separate
  // lists (frame IDs and file IDs), both of which are also base64-encoded.
  //
  // To get the frame ID, we grab the next 32 bytes.
  //
  // To get the file ID, we grab the first 22 bytes of the frame ID.
  // However, since the file ID is base64-encoded using 21.33 bytes
  // (16 * 4 / 3), then the 22 bytes have an extra 4 bits from the
  // address (see diagram in definition of EncodedStackTrace).
  for (let i = 0; i < countsFrameIDs; i++) {
    const pos = i * BASE64_FRAME_ID_LENGTH;
    const frameID = inputFrameIDs.slice(pos, pos + BASE64_FRAME_ID_LENGTH);
    const buf = Buffer.from(frameID, 'base64url');

    fileIDs[i] = buf.toString('base64url', 0, 16);
    addressOrLines[i] = Number(buf.readBigUInt64BE(16));
    frameIDs[i] = frameID;
  }

  // Step 2: Convert the run-length byte encoding into a list of uint8s.
  const types = Buffer.from(inputFrameTypes, 'base64url');
  const typeIDs = runLengthDecode(types, countsFrameIDs);

  return {
    AddressOrLines: addressOrLines,
    FileIDs: fileIDs,
    FrameIDs: frameIDs,
    Types: typeIDs,
  } as StackTrace;
}

export async function searchEventsGroupByStackTrace({
  logger,
  client,
  index,
  filter,
}: {
  logger: Logger;
  client: ProfilingESClient;
  index: DownsampledEventsIndex;
  filter: ProjectTimeQuery;
}) {
  const resEvents = await client.search('get_events_group_by_stack_trace', {
    index: index.name,
    track_total_hits: false,
    query: filter,
    aggs: {
      group_by: {
        terms: {
          // 'size' should be max 100k, but might be slightly more. Better be on the safe side.
          size: 150000,
          field: ProfilingESField.StacktraceID,
          // 'execution_hint: map' skips the slow building of ordinals that we don't need.
          // Especially with high cardinality fields, this makes aggregations really slow.
          // E.g. it reduces the latency from 70s to 0.7s on our 8.1 MVP cluster (as of 28.04.2022).
          execution_hint: 'map',
        },
        aggs: {
          count: {
            sum: {
              field: ProfilingESField.StacktraceCount,
            },
          },
        },
      },
      total_count: {
        sum: {
          field: ProfilingESField.StacktraceCount,
        },
      },
    },
    pre_filter_shard_size: 1,
    filter_path:
      'aggregations.group_by.buckets.key,aggregations.group_by.buckets.count,aggregations.total_count,_shards.failures',
  });

  const totalCount = resEvents.aggregations?.total_count.value ?? 0;
  const stackTraceEvents = new Map<StackTraceID, number>();

  resEvents.aggregations?.group_by?.buckets.forEach((item) => {
    const traceid: StackTraceID = String(item.key);
    stackTraceEvents.set(traceid, item.count.value ?? 0);
  });

  logger.info('events total count: ' + totalCount);
  logger.info('unique stacktraces: ' + stackTraceEvents.size);

  return { totalCount, stackTraceEvents };
}

export async function mgetStackTraces({
  logger,
  client,
  events,
  concurrency = 1,
}: {
  logger: Logger;
  client: ProfilingESClient;
  events: Map<StackTraceID, number>;
  concurrency?: number;
}) {
  const stackTraceIDs = [...events.keys()];
  const chunkSize = Math.floor(events.size / concurrency);
  let chunks = chunk(stackTraceIDs, chunkSize);

  if (chunks.length !== concurrency) {
    // The last array element contains the remainder, just drop it as irrelevant.
    chunks = chunks.slice(0, concurrency);
  }

  const stackResponses = await withProfilingSpan('mget_stacktraces', () =>
    Promise.all(
      chunks.map((ids) => {
        return client.mget<
          PickFlattened<
            ProfilingStackTrace,
            ProfilingESField.StacktraceFrameIDs | ProfilingESField.StacktraceFrameTypes
          >
        >('mget_stacktraces_chunk', {
          index: INDEX_TRACES,
          ids,
          realtime: true,
          _source_includes: [
            ProfilingESField.StacktraceFrameIDs,
            ProfilingESField.StacktraceFrameTypes,
          ],
        });
      })
    )
  );

  let totalFrames = 0;
  const stackTraces = new Map<StackTraceID, StackTrace>();
  const stackFrameDocIDs = new Set<string>();
  const executableDocIDs = new Set<string>();

  await logExecutionLatency(logger, 'processing data', async () => {
    // flatMap() is significantly slower than an explicit for loop
    for (const res of stackResponses) {
      for (const trace of res.docs) {
        if ('error' in trace) {
          continue;
        }
        // Sometimes we don't find the trace.
        // This is due to ES delays writing (data is not immediately seen after write).
        // Also, ES doesn't know about transactions.
        if (trace.found) {
          const traceid = trace._id as StackTraceID;
          let stackTrace = traceLRU.get(traceid) as StackTrace;
          if (!stackTrace) {
            stackTrace = decodeStackTrace(trace._source as EncodedStackTrace);
            traceLRU.set(traceid, stackTrace);
          }

          totalFrames += stackTrace.FrameIDs.length;
          stackTraces.set(traceid, stackTrace);
          for (const frameID of stackTrace.FrameIDs) {
            stackFrameDocIDs.add(frameID);
          }
          for (const fileID of stackTrace.FileIDs) {
            executableDocIDs.add(fileID);
          }
        }
      }
    }
  });

  if (stackTraces.size !== 0) {
    logger.info('Average size of stacktrace: ' + totalFrames / stackTraces.size);
  }

  if (stackTraces.size < events.size) {
    logger.info(
      'failed to find ' + (events.size - stackTraces.size) + ' stacktraces (todo: find out why)'
    );
  }

  return { stackTraces, stackFrameDocIDs, executableDocIDs };
}

export async function mgetStackFrames({
  logger,
  client,
  stackFrameIDs,
}: {
  logger: Logger;
  client: ProfilingESClient;
  stackFrameIDs: Set<string>;
}): Promise<Map<StackFrameID, StackFrame>> {
  const stackFrames = new Map<StackFrameID, StackFrame>();

  if (stackFrameIDs.size === 0) {
    return stackFrames;
  }

  const resStackFrames = await client.mget<ProfilingStackFrame>('mget_stackframes', {
    index: INDEX_FRAMES,
    ids: [...stackFrameIDs],
    realtime: true,
  });

  // Create a lookup map StackFrameID -> StackFrame.
  let framesFound = 0;
  await logExecutionLatency(logger, 'processing data', async () => {
    const docs = resStackFrames.docs;
    for (const frame of docs) {
      if ('error' in frame) {
        continue;
      }
      if (frame.found) {
        stackFrames.set(frame._id, {
          FileName: frame._source!.Stackframe.file?.name,
          FunctionName: frame._source!.Stackframe.function?.name,
          FunctionOffset: frame._source!.Stackframe.function?.offset,
          LineNumber: frame._source!.Stackframe.line?.number,
          SourceType: frame._source!.Stackframe.source?.type,
        });
        framesFound++;
      } else {
        stackFrames.set(frame._id, {
          FileName: '',
          FunctionName: '',
          FunctionOffset: 0,
          LineNumber: 0,
          SourceType: 0,
        });
      }
    }
  });

  logger.info('found ' + framesFound + ' / ' + stackFrameIDs.size + ' frames');

  return stackFrames;
}

export async function mgetExecutables({
  logger,
  client,
  executableIDs,
}: {
  logger: Logger;
  client: ProfilingESClient;
  executableIDs: Set<string>;
}): Promise<Map<FileID, Executable>> {
  const executables = new Map<FileID, Executable>();

  if (executableIDs.size === 0) {
    return executables;
  }

  const resExecutables = await client.mget<ProfilingExecutable>('mget_executables', {
    index: INDEX_EXECUTABLES,
    ids: [...executableIDs],
    _source_includes: [ProfilingESField.ExecutableFileName],
  });

  // Create a lookup map FileID -> Executable.
  let exeFound = 0;
  await logExecutionLatency(logger, 'processing data', async () => {
    const docs = resExecutables.docs;
    for (const exe of docs) {
      if ('error' in exe) {
        continue;
      }
      if (exe.found) {
        executables.set(exe._id, {
          FileName: exe._source!.Executable.file.name,
        });
        exeFound++;
      } else {
        executables.set(exe._id, {
          FileName: '',
        });
      }
    }
  });

  logger.info('found ' + exeFound + ' / ' + executableIDs.size + ' executables');

  return executables;
}
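A quick round-trip through the run-length codec, reusing the example values from the comments above:

import { runLengthDecode, runLengthEncode } from './stacktrace';

const input = [0, 0, 0, 0, 0, 2, 2, 2];
const encoded = runLengthEncode(input);
console.log([...encoded]); // [5, 0, 3, 2]: five 0s, then three 2s

// Decoding restores the original list; the optional second argument
// pre-sizes the output when the caller already knows the length.
console.log(runLengthDecode(encoded));               // [0, 0, 0, 0, 0, 2, 2, 2]
console.log(runLengthDecode(encoded, input.length)); // same result, without the size scan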
x-pack/plugins/profiling/server/routes/topn.test.ts (new file, 69 lines)
@@ -0,0 +1,69 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { AggregationsAggregationContainer } from '@elastic/elasticsearch/lib/api/types';
import { coreMock } from '@kbn/core/server/mocks';
import { loggerMock } from '@kbn/logging-mocks';
import { ProfilingESField } from '../../common/elasticsearch';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { topNElasticSearchQuery } from './topn';

const anyQuery = 'any::query';
const smallestInterval = '1s';
const testAgg = { aggs: { test: {} } };

jest.mock('./query', () => ({
  createCommonFilter: ({}: {}) => {
    return anyQuery;
  },
  findFixedIntervalForBucketsPerTimeRange: (from: number, to: number, buckets: number): string => {
    return smallestInterval;
  },
  aggregateByFieldAndTimestamp: (
    searchField: string,
    interval: string
  ): AggregationsAggregationContainer => {
    return testAgg;
  },
}));

describe('TopN data from Elasticsearch', () => {
  const context = coreMock.createRequestHandlerContext();
  const client: ProfilingESClient = {
    search: jest.fn(
      (operationName, request) =>
        context.elasticsearch.client.asCurrentUser.search(request) as Promise<any>
    ),
    mget: jest.fn(
      (operationName, request) =>
        context.elasticsearch.client.asCurrentUser.search(request) as Promise<any>
    ),
  };
  const logger = loggerMock.create();

  beforeEach(() => {
    jest.clearAllMocks();
  });

  describe('when fetching Stack Traces', () => {
    it('should search first then skip mget', async () => {
      await topNElasticSearchQuery({
        client,
        logger,
        timeFrom: 456,
        timeTo: 789,
        searchField: ProfilingESField.StacktraceID,
        highCardinality: false,
        kuery: '',
      });

      // Calls to mget are skipped since data doesn't exist
      expect(client.search).toHaveBeenCalledTimes(2);
      expect(client.mget).toHaveBeenCalledTimes(0);
    });
  });
});
260
x-pack/plugins/profiling/server/routes/topn.ts
Normal file
260
x-pack/plugins/profiling/server/routes/topn.ts
Normal file
|
@ -0,0 +1,260 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import type { IRouter, Logger } from '@kbn/core/server';
|
||||
import { RouteRegisterParameters } from '.';
|
||||
import { fromMapToRecord, getRoutePaths, INDEX_EVENTS } from '../../common';
|
||||
import { ProfilingESField } from '../../common/elasticsearch';
|
||||
import { computeBucketWidthFromTimeRangeAndBucketCount } from '../../common/histogram';
|
||||
import { groupStackFrameMetadataByStackTrace, StackTraceID } from '../../common/profiling';
|
||||
import { getFieldNameForTopNType, TopNType } from '../../common/stack_traces';
|
||||
import { createTopNSamples, getTopNAggregationRequest, TopNResponse } from '../../common/topn';
|
||||
import { ProfilingRequestHandlerContext } from '../types';
|
||||
import { createProfilingEsClient, ProfilingESClient } from '../utils/create_profiling_es_client';
|
||||
import { withProfilingSpan } from '../utils/with_profiling_span';
|
||||
import { getClient } from './compat';
|
||||
import { findDownsampledIndex } from './downsampling';
|
||||
import { createCommonFilter } from './query';
|
||||
import { mgetExecutables, mgetStackFrames, mgetStackTraces } from './stacktrace';
|
||||
|
||||
export async function topNElasticSearchQuery({
|
||||
client,
|
||||
logger,
|
||||
timeFrom,
|
||||
timeTo,
|
||||
searchField,
|
||||
highCardinality,
|
||||
kuery,
|
||||
}: {
|
||||
client: ProfilingESClient;
|
||||
logger: Logger;
|
||||
timeFrom: number;
|
||||
timeTo: number;
|
||||
searchField: string;
|
||||
highCardinality: boolean;
|
||||
kuery: string;
|
||||
}): Promise<TopNResponse> {
|
||||
const filter = createCommonFilter({ timeFrom, timeTo, kuery });
|
||||
const targetSampleSize = 20000; // minimum number of samples to get statistically sound results
|
||||
|
||||
const bucketWidth = computeBucketWidthFromTimeRangeAndBucketCount(timeFrom, timeTo, 50);
|
||||
|
||||
const eventsIndex = await findDownsampledIndex({
|
||||
logger,
|
||||
client,
|
||||
index: INDEX_EVENTS,
|
||||
filter,
|
||||
sampleSize: targetSampleSize,
|
||||
});
|
||||
|
||||
const resEvents = await client.search('get_topn_histogram', {
|
||||
index: eventsIndex.name,
|
||||
size: 0,
|
||||
query: filter,
|
||||
aggs: getTopNAggregationRequest({
|
||||
searchField,
|
||||
highCardinality,
|
||||
fixedInterval: `${bucketWidth}s`,
|
||||
}),
|
||||
// Adrien and Dario found out this is a work-around for some bug in 8.1.
|
||||
// It reduces the query time by avoiding unneeded searches.
|
||||
pre_filter_shard_size: 1,
|
||||
});
|
||||
|
||||
const { aggregations } = resEvents;
|
||||
|
||||
if (!aggregations) {
|
||||
return {
|
||||
TotalCount: 0,
|
||||
TopN: [],
|
||||
Metadata: {},
|
||||
};
|
||||
}
|
||||
|
||||
// Creating top N samples requires the time range and bucket width to
|
||||
// be in milliseconds, not seconds
|
||||
const topN = createTopNSamples(aggregations, timeFrom * 1000, timeTo * 1000, bucketWidth * 1000);
|
||||
|
||||
for (let i = 0; i < topN.length; i++) {
|
||||
topN[i].Count = (topN[i].Count ?? 0) / eventsIndex.sampleRate;
|
||||
}
|
||||
|
||||
let totalSampledStackTraces = aggregations.total_count.value ?? 0;
|
||||
logger.info('total sampled stacktraces: ' + totalSampledStackTraces);
|
||||
totalSampledStackTraces = Math.floor(totalSampledStackTraces / eventsIndex.sampleRate);
|
||||
|
||||
if (searchField !== ProfilingESField.StacktraceID) {
|
||||
return {
|
||||
TotalCount: totalSampledStackTraces,
|
||||
TopN: topN,
|
||||
Metadata: {},
|
||||
};
|
||||
}
|
||||
|
||||
const stackTraceEvents = new Map<StackTraceID, number>();
|
||||
const groupByBuckets = aggregations.group_by.buckets ?? [];
|
||||
let totalAggregatedStackTraces = 0;
|
||||
|
||||
for (let i = 0; i < groupByBuckets.length; i++) {
|
||||
const stackTraceID = String(groupByBuckets[i].key);
|
||||
const count = Math.floor((groupByBuckets[i].count.value ?? 0) / eventsIndex.sampleRate);
|
||||
totalAggregatedStackTraces += count;
|
||||
stackTraceEvents.set(stackTraceID, count);
|
||||
}
|
||||
|
||||
logger.info('total aggregated stacktraces: ' + totalAggregatedStackTraces);
|
||||
logger.info('unique aggregated stacktraces: ' + stackTraceEvents.size);
|
||||
|
||||
const { stackTraces, stackFrameDocIDs, executableDocIDs } = await mgetStackTraces({
|
||||
logger,
|
||||
client,
|
||||
events: stackTraceEvents,
|
||||
});
|
||||
|
||||
const [stackFrames, executables] = await withProfilingSpan(
|
||||
'get_stackframes_and_executables',
|
||||
() => {
|
||||
return Promise.all([
|
||||
mgetStackFrames({ logger, client, stackFrameIDs: stackFrameDocIDs }),
|
||||
mgetExecutables({ logger, client, executableIDs: executableDocIDs }),
|
||||
]);
|
||||
}
|
||||
);
|
||||
|
||||
const metadata = await withProfilingSpan('collect_stackframe_metadata', async () => {
|
||||
return fromMapToRecord(
|
||||
groupStackFrameMetadataByStackTrace(stackTraces, stackFrames, executables)
|
||||
);
|
||||
});
|
||||
|
||||
logger.info('returning payload response to client');
|
||||
|
||||
return {
|
||||
TotalCount: totalSampledStackTraces,
|
||||
TopN: topN,
|
||||
Metadata: metadata,
|
||||
};
|
||||
}
|

export function queryTopNCommon(
  router: IRouter<ProfilingRequestHandlerContext>,
  logger: Logger,
  pathName: string,
  searchField: string,
  highCardinality: boolean
) {
  router.get(
    {
      path: pathName,
      validate: {
        query: schema.object({
          timeFrom: schema.number(),
          timeTo: schema.number(),
          kuery: schema.string(),
        }),
      },
    },
    async (context, request, response) => {
      const { timeFrom, timeTo, kuery } = request.query;
      const client = await getClient(context);

      try {
        return response.ok({
          body: await topNElasticSearchQuery({
            client: createProfilingEsClient({ request, esClient: client }),
            logger,
            timeFrom,
            timeTo,
            searchField,
            highCardinality,
            kuery,
          }),
        });
      } catch (e) {
        logger.error(e);

        return response.customError({
          statusCode: e.statusCode ?? 500,
          body: {
            message: 'Profiling TopN request failed: ' + e.message + '; full error ' + e.toString(),
          },
        });
      }
    }
  );
}

export function registerTraceEventsTopNContainersSearchRoute({
  router,
  logger,
}: RouteRegisterParameters) {
  const paths = getRoutePaths();
  return queryTopNCommon(
    router,
    logger,
    paths.TopNContainers,
    getFieldNameForTopNType(TopNType.Containers),
    false
  );
}

export function registerTraceEventsTopNDeploymentsSearchRoute({
  router,
  logger,
}: RouteRegisterParameters) {
  const paths = getRoutePaths();
  return queryTopNCommon(
    router,
    logger,
    paths.TopNDeployments,
    getFieldNameForTopNType(TopNType.Deployments),
    false
  );
}

export function registerTraceEventsTopNHostsSearchRoute({
  router,
  logger,
}: RouteRegisterParameters) {
  const paths = getRoutePaths();
  return queryTopNCommon(
    router,
    logger,
    paths.TopNHosts,
    getFieldNameForTopNType(TopNType.Hosts),
    false
  );
}

export function registerTraceEventsTopNStackTracesSearchRoute({
  router,
  logger,
}: RouteRegisterParameters) {
  const paths = getRoutePaths();
  return queryTopNCommon(
    router,
    logger,
    paths.TopNTraces,
    getFieldNameForTopNType(TopNType.Traces),
    false
  );
}

export function registerTraceEventsTopNThreadsSearchRoute({
  router,
  logger,
}: RouteRegisterParameters) {
  const paths = getRoutePaths();
  return queryTopNCommon(
    router,
    logger,
    paths.TopNThreads,
    getFieldNameForTopNType(TopNType.Threads),
    true
  );
}
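
// A minimal sketch, assuming a hypothetical registerTopNRoutes() aggregator
// that is not part of this diff, of how the per-type registration functions
// above could be wired together during route setup:
//
//   export function registerTopNRoutes(params: RouteRegisterParameters) {
//     registerTraceEventsTopNContainersSearchRoute(params);
//     registerTraceEventsTopNDeploymentsSearchRoute(params);
//     registerTraceEventsTopNHostsSearchRoute(params);
//     registerTraceEventsTopNStackTracesSearchRoute(params);
//     registerTraceEventsTopNThreadsSearchRoute(params);
//   }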
25
x-pack/plugins/profiling/server/types.ts
Normal file

@ -0,0 +1,25 @@

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { RequestHandlerContext } from '@kbn/core/server';
import { PluginSetupContract as FeaturesPluginSetup } from '@kbn/features-plugin/server';
import { ObservabilityPluginSetup } from '@kbn/observability-plugin/server';

export interface ProfilingPluginSetupDeps {
  observability: ObservabilityPluginSetup;
  features: FeaturesPluginSetup;
}

// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface ProfilingPluginStartDeps {}

// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface ProfilingPluginSetup {}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface ProfilingPluginStart {}

export type ProfilingRequestHandlerContext = RequestHandlerContext;

88
x-pack/plugins/profiling/server/utils/create_profiling_es_client.ts
Normal file

@ -0,0 +1,88 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { ElasticsearchClient } from '@kbn/core/server';
import { ESSearchRequest, InferSearchResponseOf } from '@kbn/core/types/elasticsearch';
import type { KibanaRequest } from '@kbn/core/server';
import { unwrapEsResponse } from '@kbn/observability-plugin/server';
import { MgetRequest, MgetResponse } from '@elastic/elasticsearch/lib/api/types';
import { ProfilingESEvent } from '../../common/elasticsearch';
import { withProfilingSpan } from './with_profiling_span';

export function cancelEsRequestOnAbort<T extends Promise<any>>(
  promise: T,
  request: KibanaRequest,
  controller: AbortController
): T {
  const subscription = request.events.aborted$.subscribe(() => {
    controller.abort();
  });

  return promise.finally(() => subscription.unsubscribe()) as T;
}
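
// Usage sketch: tie an in-flight Elasticsearch call to the lifetime of the
// incoming HTTP request, so that a client disconnect aborts the search. The
// names below mirror the createProfilingEsClient() implementation that follows:
//
//   const controller = new AbortController();
//   const pending = esClient.search(searchRequest, { signal: controller.signal, meta: true });
//   await cancelEsRequestOnAbort(pending, request, controller);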

export interface ProfilingESClient {
  search<TDocument = unknown, TSearchRequest extends ESSearchRequest = ESSearchRequest>(
    operationName: string,
    searchRequest: TSearchRequest
  ): Promise<InferSearchResponseOf<TDocument, TSearchRequest>>;
  mget<TDocument = ProfilingESEvent>(
    operationName: string,
    mgetRequest: MgetRequest
  ): Promise<MgetResponse<TDocument>>;
}

export function createProfilingEsClient({
  request,
  esClient,
}: {
  request: KibanaRequest;
  esClient: ElasticsearchClient;
}): ProfilingESClient {
  return {
    search<TDocument = unknown, TSearchRequest extends ESSearchRequest = ESSearchRequest>(
      operationName: string,
      searchRequest: TSearchRequest
    ): Promise<InferSearchResponseOf<TDocument, TSearchRequest>> {
      const controller = new AbortController();

      const promise = withProfilingSpan(operationName, () => {
        return cancelEsRequestOnAbort(
          esClient.search(searchRequest, {
            signal: controller.signal,
            meta: true,
          }) as unknown as Promise<{
            body: InferSearchResponseOf<TDocument, TSearchRequest>;
          }>,
          request,
          controller
        );
      });

      return unwrapEsResponse(promise);
    },
    mget<TDocument = ProfilingESEvent>(
      operationName: string,
      mgetRequest: MgetRequest
    ): Promise<MgetResponse<TDocument>> {
      const controller = new AbortController();

      const promise = withProfilingSpan(operationName, () => {
        return cancelEsRequestOnAbort(
          esClient.mget<TDocument>(mgetRequest, {
            signal: controller.signal,
            meta: true,
          }),
          request,
          controller
        );
      });

      return unwrapEsResponse(promise);
    },
  };
}
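
// Usage sketch (the operation name, index name, and request body below are
// illustrative only): every call through this client is wrapped in an APM
// span named after the operation and is cancelled if the originating HTTP
// request is aborted.
//
//   const profilingClient = createProfilingEsClient({ request, esClient });
//   const res = await profilingClient.search('get_topn_histogram', {
//     index: 'profiling-events-example', // hypothetical index name
//     size: 0,
//     track_total_hits: true,
//   });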

25
x-pack/plugins/profiling/server/utils/with_profiling_span.ts
Normal file

@ -0,0 +1,25 @@

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */
import { withSpan, SpanOptions, parseSpanOptions } from '@kbn/apm-utils';

export function withProfilingSpan<T>(
  optionsOrName: SpanOptions | string,
  cb: () => Promise<T>
): Promise<T> {
  const options = parseSpanOptions(optionsOrName);

  const optionsWithDefaults = {
    ...(options.intercept ? {} : { type: 'plugin:profiling' }),
    ...options,
    labels: {
      plugin: 'profiling',
      ...options.labels,
    },
  };

  return withSpan(optionsWithDefaults, cb);
}
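
// Usage sketch (the span name and helper are illustrative): any async unit of
// work can be wrapped so it is reported as an APM span of type
// 'plugin:profiling' carrying a plugin: 'profiling' label.
//
//   const result = await withProfilingSpan('resolve_stacktraces', async () => {
//     return doExpensiveWork(); // hypothetical helper
//   });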