[Profiling] Replace mget calls in the Stacktraces view (#157808)

## Summary

This PR removes all references to `mget` calls as started in
https://github.com/elastic/prodfiler/issues/2775.

I replaced the metadata lookup in the stacktraces view with the
Elasticsearch plugin endpoint (i.e. `_profiling/stacktraces`).

The main consequence of this change is that we no longer need to cache
the results when decoding the key-value pairs. However, the endpoints
that clear the internal caches will need to be removed separately so that
we can remove the calls to these endpoints from other clients.

### For maintainers

- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
This commit is contained in:
Joseph Crail 2023-05-16 05:37:12 -07:00 committed by GitHub
parent 9b9d8cb347
commit fa912516e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 33 additions and 488 deletions

View file

@ -14,8 +14,8 @@ import { handleRouteHandlerError } from '../utils/handle_route_error_handler';
import { createBaseFlameGraph } from '../../common/flamegraph';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { getClient } from './compat';
import { getStackTraces } from './get_stacktraces';
import { createCommonFilter } from './query';
import { searchStackTraces } from './search_stacktraces';
export function registerFlameChartSearchRoute({
router,
@ -50,9 +50,7 @@ export function registerFlameChartSearchRoute({
const t0 = Date.now();
const { stackTraceEvents, stackTraces, executables, stackFrames, totalFrames } =
await getStackTraces({
context,
logger,
await searchStackTraces({
client: profilingElasticsearchClient,
filter,
sampleSize: targetSampleSize,

View file

@ -12,8 +12,8 @@ import { createTopNFunctions } from '../../common/functions';
import { handleRouteHandlerError } from '../utils/handle_route_error_handler';
import { withProfilingSpan } from '../utils/with_profiling_span';
import { getClient } from './compat';
import { getStackTraces } from './get_stacktraces';
import { createCommonFilter } from './query';
import { searchStackTraces } from './search_stacktraces';
const querySchema = schema.object({
timeFrom: schema.number(),
@ -52,13 +52,13 @@ export function registerTopNFunctionsSearchRoute({
});
const t0 = Date.now();
const { stackTraceEvents, stackTraces, executables, stackFrames } = await getStackTraces({
context,
logger,
client: profilingElasticsearchClient,
filter,
sampleSize: targetSampleSize,
});
const { stackTraceEvents, stackTraces, executables, stackFrames } = await searchStackTraces(
{
client: profilingElasticsearchClient,
filter,
sampleSize: targetSampleSize,
}
);
logger.info(`querying stacktraces took ${Date.now() - t0} ms`);
const t1 = Date.now();

View file

@ -1,32 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { RequestHandlerContext } from '@kbn/core/server';
import { Logger } from '@kbn/logging';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { ProjectTimeQuery } from './query';
import { searchStackTraces } from './search_stacktraces';
/**
 * Thin wrapper that delegates to `searchStackTraces`.
 *
 * The options object also carries `context` and `logger`, but neither is
 * forwarded: only `client`, `filter` and `sampleSize` reach the search call.
 *
 * @returns whatever `searchStackTraces` resolves to for the given options
 */
export async function getStackTraces(options: {
  context: RequestHandlerContext;
  logger: Logger;
  client: ProfilingESClient;
  filter: ProjectTimeQuery;
  sampleSize: number;
}) {
  const { client, filter, sampleSize } = options;
  const searchResult = await searchStackTraces({ client, filter, sampleSize });
  return searchResult;
}

View file

@ -5,10 +5,9 @@
* 2.0.
*/
import LRUCache from 'lru-cache';
import { createStackFrameID, StackFrame, StackFrameID, StackTrace } from '../../common/profiling';
import { createStackFrameID, StackTrace } from '../../common/profiling';
import { runLengthEncode } from '../../common/run_length_encoding';
import { decodeStackTrace, EncodedStackTrace, updateStackFrameMap } from './stacktrace';
import { decodeStackTrace, EncodedStackTrace } from './stacktrace';
enum fileID {
A = 'aQpJmTLWydNvOapSFZOwKg',
@ -87,109 +86,3 @@ describe('Stack trace operations', () => {
}
});
});
// Exercises updateStackFrameMap, which copies mget-style stack frame documents
// into both a result map and an LRU cache and returns the number of documents
// that were flagged as found.
describe('Stack frame operations', () => {
// An empty document list must leave both containers untouched.
test('updateStackFrameMap with no frames', () => {
const stackFrameMap = new Map<StackFrameID, StackFrame>();
const stackFrameCache = new LRUCache<StackFrameID, StackFrame>();
const hits = updateStackFrameMap([], stackFrameMap, stackFrameCache);
expect(hits).toEqual(0);
expect(stackFrameMap.size).toEqual(0);
expect(stackFrameCache.length).toEqual(0);
});
// A document with `found: false` does not count as a hit, yet an entry is
// still recorded for its ID in the map and in the cache (sizes become 1).
test('updateStackFrameMap with missing frames', () => {
const stackFrameMap = new Map<StackFrameID, StackFrame>();
const stackFrameCache = new LRUCache<StackFrameID, StackFrame>();
const stackFrames = [
{
_index: 'profiling-stackframes',
_id: 'stackframe-001',
found: false,
},
];
const hits = updateStackFrameMap(stackFrames, stackFrameMap, stackFrameCache);
expect(hits).toEqual(0);
expect(stackFrameMap.size).toEqual(1);
expect(stackFrameCache.length).toEqual(1);
});
// A found document whose source only carries a scalar function name: the
// remaining frame fields are expected to be undefined.
test('updateStackFrameMap with one partial non-inlined frame', () => {
const stackFrameMap = new Map<StackFrameID, StackFrame>();
const stackFrameCache = new LRUCache<StackFrameID, StackFrame>();
const id = 'stackframe-001';
const source = {
'ecs.version': '1.0.0',
'Stackframe.function.name': 'calloc',
};
// NOTE(review): `SourceType` is listed here although the function under test
// never assigns it; presumably this passes because `toEqual` ignores
// properties whose value is undefined — confirm.
const expected = {
FileName: undefined,
FunctionName: 'calloc',
FunctionOffset: undefined,
LineNumber: undefined,
SourceType: undefined,
};
const stackFrames = [
{
_index: 'profiling-stackframes',
_id: id,
_version: 1,
_seq_no: 1,
_primary_term: 1,
found: true,
_source: source,
},
];
const hits = updateStackFrameMap(stackFrames, stackFrameMap, stackFrameCache);
expect(hits).toEqual(1);
expect(stackFrameMap.size).toEqual(1);
expect(stackFrameCache.length).toEqual(1);
expect(stackFrameMap.get(id)).toEqual(expected);
});
// A found document whose function name is an array (inlined frames): only the
// first element ('calloc') is expected in the resulting stack frame.
test('updateStackFrameMap with one partial inlined frame', () => {
const stackFrameMap = new Map<StackFrameID, StackFrame>();
const stackFrameCache = new LRUCache<StackFrameID, StackFrame>();
const id = 'stackframe-001';
const source = {
'ecs.version': '1.0.0',
'Stackframe.function.name': ['calloc', 'memset'],
};
const expected = {
FileName: undefined,
FunctionName: 'calloc',
FunctionOffset: undefined,
LineNumber: undefined,
SourceType: undefined,
};
const stackFrames = [
{
_index: 'profiling-stackframes',
_id: id,
_version: 1,
_seq_no: 1,
_primary_term: 1,
found: true,
_source: source,
},
];
const hits = updateStackFrameMap(stackFrames, stackFrameMap, stackFrameCache);
expect(hits).toEqual(1);
expect(stackFrameMap.size).toEqual(1);
expect(stackFrameCache.length).toEqual(1);
expect(stackFrameMap.get(id)).toEqual(expected);
});
});

View file

@ -5,20 +5,9 @@
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import LRUCache from 'lru-cache';
import { INDEX_EXECUTABLES, INDEX_FRAMES, INDEX_TRACES } from '../../common';
import { DedotObject, ProfilingESField } from '../../common/elasticsearch';
import {
DedotObject,
PickFlattened,
ProfilingESField,
ProfilingExecutable,
ProfilingStackFrame,
ProfilingStackTrace,
} from '../../common/elasticsearch';
import {
emptyExecutable,
emptyStackFrame,
Executable,
FileID,
getAddressFromStackFrameID,
@ -26,11 +15,8 @@ import {
StackFrame,
StackFrameID,
StackTrace,
StackTraceID,
} from '../../common/profiling';
import { runLengthDecodeBase64Url } from '../../common/run_length_encoding';
import { ProfilingESClient } from '../utils/create_profiling_es_client';
import { withProfilingSpan } from '../utils/with_profiling_span';
const BASE64_FRAME_ID_LENGTH = 32;
@ -93,125 +79,6 @@ export function decodeStackTrace(input: EncodedStackTrace): StackTrace {
} as StackTrace;
}
/**
 * Writes a short lookup summary to the logger.
 *
 * The cache line is always emitted. The query line follows only when at least
 * one lookup missed the cache, and a final line reports the shortfall when the
 * query returned fewer hits than were requested.
 *
 * @param logger logger that receives the summary lines
 * @param name label for the kind of item being looked up
 * @param cacheHits number of lookups answered by the cache
 * @param cacheTotal number of lookups attempted against the cache
 * @param queryHits number of items the query found
 * @param queryTotal number of items the query asked for
 */
function summarizeCacheAndQuery(
  logger: Logger,
  name: string,
  cacheHits: number,
  cacheTotal: number,
  queryHits: number,
  queryTotal: number
) {
  logger.info(`found ${cacheHits} out of ${cacheTotal} ${name} in the cache`);

  const servedEntirelyFromCache = cacheHits === cacheTotal;
  if (!servedEntirelyFromCache) {
    logger.info(`found ${queryHits} out of ${queryTotal} ${name}`);

    if (queryHits < queryTotal) {
      const missing = queryTotal - queryHits;
      logger.info(`failed to find ${missing} ${name}`);
    }
  }
}
const traceLRU = new LRUCache<StackTraceID, StackTrace>({ max: 20000 });
/**
 * Resolves the stacktraces for the IDs that key `events`, consulting the
 * module-level `traceLRU` cache first and issuing one `mget` against
 * INDEX_TRACES for the IDs that were not cached.
 *
 * Returned documents that are flagged as found are decoded and written back
 * into the cache; documents carrying an `error` field are skipped.
 *
 * @returns the resolved stacktraces, the total number of frame IDs across
 * them, and the sets of frame IDs and file IDs they reference
 */
export async function mgetStackTraces({
logger,
client,
events,
}: {
logger: Logger;
client: ProfilingESClient;
events: Map<StackTraceID, number>;
}) {
// Local copy of the IDs: cached entries are removed from it below, so that
// only the remaining IDs are sent to Elasticsearch.
const stackTraceIDs = new Set([...events.keys()]);
const stackTraces = new Map<StackTraceID, StackTrace>();
let cacheHits = 0;
let totalFrames = 0;
const stackFrameDocIDs = new Set<string>();
const executableDocIDs = new Set<string>();
for (const stackTraceID of stackTraceIDs) {
const stackTrace = traceLRU.get(stackTraceID);
if (stackTrace) {
cacheHits++;
stackTraceIDs.delete(stackTraceID);
stackTraces.set(stackTraceID, stackTrace);
totalFrames += stackTrace.FrameIDs.length;
for (const frameID of stackTrace.FrameIDs) {
stackFrameDocIDs.add(frameID);
}
for (const fileID of stackTrace.FileIDs) {
executableDocIDs.add(fileID);
}
}
}
// Everything was served from the cache: skip the round trip entirely.
if (stackTraceIDs.size === 0) {
summarizeCacheAndQuery(logger, 'stacktraces', cacheHits, events.size, 0, 0);
return { stackTraces, totalFrames, stackFrameDocIDs, executableDocIDs };
}
// Only the frame ID and frame type fields are requested from the source.
const stackResponses = await client.mget<
PickFlattened<
ProfilingStackTrace,
ProfilingESField.StacktraceFrameIDs | ProfilingESField.StacktraceFrameTypes
>
>('mget_stacktraces', {
index: INDEX_TRACES,
ids: [...stackTraceIDs],
realtime: true,
_source_includes: [ProfilingESField.StacktraceFrameIDs, ProfilingESField.StacktraceFrameTypes],
});
let queryHits = 0;
const t0 = Date.now();
await withProfilingSpan('decode_stacktraces', async () => {
for (const trace of stackResponses.docs) {
if ('error' in trace) {
continue;
}
// Sometimes we don't find the trace.
// This is due to ES delays writing (data is not immediately seen after write).
// Also, ES doesn't know about transactions.
if (trace.found) {
queryHits++;
const traceid = trace._id as StackTraceID;
const stackTrace = decodeStackTrace(trace._source as EncodedStackTrace);
stackTraces.set(traceid, stackTrace);
traceLRU.set(traceid, stackTrace);
totalFrames += stackTrace.FrameIDs.length;
for (const frameID of stackTrace.FrameIDs) {
stackFrameDocIDs.add(frameID);
}
for (const fileID of stackTrace.FileIDs) {
executableDocIDs.add(fileID);
}
}
}
});
logger.info(`processing data took ${Date.now() - t0} ms`);
// Guard against dividing by zero when nothing was resolved.
if (stackTraces.size !== 0) {
logger.info('Average size of stacktrace: ' + totalFrames / stackTraces.size);
}
summarizeCacheAndQuery(
logger,
'stacktraces',
cacheHits,
events.size,
queryHits,
stackTraceIDs.size
);
return { stackTraces, totalFrames, stackFrameDocIDs, executableDocIDs };
}
const frameLRU = new LRUCache<StackFrameID, StackFrame>({
max: CACHE_MAX_ITEMS,
maxAge: CACHE_TTL_MILLISECONDS,
@ -224,111 +91,6 @@ export function clearStackFrameCache(): number {
return numDeleted;
}
/**
 * Copies `mget` response documents for stack frames into `stackFrameMap` and
 * `stackFrameCache`, keyed by document ID.
 *
 * - Documents carrying an `error` field are skipped entirely.
 * - Documents that are not flagged as found are stored as `emptyStackFrame`.
 * - Found documents are converted from one of three source layouts: array
 *   fields (inlined frames, of which only the first is kept), flat scalar
 *   fields, or the nested `Stackframe` object.
 *
 * @param stackFrames iterable of response documents
 * @param stackFrameMap map that receives one entry per processed document
 * @param stackFrameCache cache that receives the same entries as the map
 * @returns the number of documents that were flagged as found
 */
export function updateStackFrameMap(
  stackFrames: any,
  stackFrameMap: Map<StackFrameID, StackFrame>,
  stackFrameCache: LRUCache<StackFrameID, StackFrame>
): number {
  let hitCount = 0;

  for (const doc of stackFrames) {
    if ('error' in doc) {
      continue;
    }

    if (!doc.found) {
      stackFrameMap.set(doc._id, emptyStackFrame);
      stackFrameCache.set(doc._id, emptyStackFrame);
      continue;
    }

    hitCount++;

    const src = doc._source;
    const fileName = src[ProfilingESField.StackframeFileName];
    const functionName = src[ProfilingESField.StackframeFunctionName];
    const functionOffset = src[ProfilingESField.StackframeFunctionOffset];
    const lineNumber = src[ProfilingESField.StackframeLineNumber];

    let decoded;
    if (Array.isArray(functionName)) {
      // Each field in a stackframe is represented by an array. This is
      // necessary to support inline frames.
      //
      // We only take the first available inline stackframe until the UI
      // can support all of them.
      decoded = {
        FileName: fileName && fileName[0],
        FunctionName: functionName[0],
        FunctionOffset: functionOffset && functionOffset[0],
        LineNumber: lineNumber && lineNumber[0],
      };
    } else if (fileName || functionName) {
      decoded = {
        FileName: fileName,
        FunctionName: functionName,
        FunctionOffset: functionOffset,
        LineNumber: lineNumber,
      };
    } else {
      // pre 8.7 format with synthetic source
      const legacy = src.Stackframe;
      decoded = {
        FileName: legacy?.file?.name,
        FunctionName: legacy?.function?.name,
        FunctionOffset: legacy?.function?.offset,
        LineNumber: legacy?.line?.number,
      };
    }

    stackFrameMap.set(doc._id, decoded);
    stackFrameCache.set(doc._id, decoded);
  }

  return hitCount;
}
/**
 * Resolves stack frames for the given IDs, consulting the module-level
 * `frameLRU` cache before issuing one `mget` against INDEX_FRAMES for the
 * IDs that were not cached.
 *
 * Note that `stackFrameIDs` is modified in place: every ID answered by the
 * cache is removed from the caller's set.
 *
 * @returns a map from stack frame ID to stack frame
 */
export async function mgetStackFrames({
  logger,
  client,
  stackFrameIDs,
}: {
  logger: Logger;
  client: ProfilingESClient;
  stackFrameIDs: Set<string>;
}): Promise<Map<StackFrameID, StackFrame>> {
  const cacheTotal = stackFrameIDs.size;
  const stackFrames = new Map<StackFrameID, StackFrame>();
  let cacheHits = 0;

  // Walk a snapshot so that removing cached IDs from the set is unambiguous.
  for (const id of Array.from(stackFrameIDs)) {
    const cached = frameLRU.get(id);
    if (!cached) {
      continue;
    }
    cacheHits++;
    stackFrames.set(id, cached);
    stackFrameIDs.delete(id);
  }

  const allCached = stackFrameIDs.size === 0;
  if (allCached) {
    summarizeCacheAndQuery(logger, 'frames', cacheHits, cacheTotal, 0, 0);
    return stackFrames;
  }

  const response = await client.mget<ProfilingStackFrame>('mget_stackframes', {
    index: INDEX_FRAMES,
    ids: Array.from(stackFrameIDs),
    realtime: true,
  });

  const startedAt = Date.now();
  const queryHits = updateStackFrameMap(response.docs, stackFrames, frameLRU);
  logger.info(`processing data took ${Date.now() - startedAt} ms`);

  summarizeCacheAndQuery(logger, 'frames', cacheHits, cacheTotal, queryHits, stackFrameIDs.size);

  return stackFrames;
}
const executableLRU = new LRUCache<FileID, Executable>({
max: CACHE_MAX_ITEMS,
maxAge: CACHE_TTL_MILLISECONDS,
@ -340,73 +102,3 @@ export function clearExecutableCache(): number {
executableLRU.reset();
return numDeleted;
}
/**
 * Resolves executables for the given file IDs, consulting the module-level
 * `executableLRU` cache before issuing one `mget` against INDEX_EXECUTABLES
 * for the IDs that were not cached.
 *
 * Documents carrying an `error` field are skipped. Documents that are not
 * flagged as found are stored (and cached) as `emptyExecutable`.
 *
 * Note that `executableIDs` is modified in place: every ID answered by the
 * cache is removed from the caller's set.
 *
 * @returns a map from file ID to executable
 */
export async function mgetExecutables({
  logger,
  client,
  executableIDs,
}: {
  logger: Logger;
  client: ProfilingESClient;
  executableIDs: Set<string>;
}): Promise<Map<FileID, Executable>> {
  const executables = new Map<FileID, Executable>();

  let cacheHits = 0;
  const cacheTotal = executableIDs.size;

  for (const fileID of executableIDs) {
    const executable = executableLRU.get(fileID);
    if (executable) {
      cacheHits++;
      executables.set(fileID, executable);
      executableIDs.delete(fileID);
    }
  }

  if (executableIDs.size === 0) {
    // Report under the same label as the non-cached path below; this summary
    // was previously logged as 'frames', which misattributed the cache hits.
    summarizeCacheAndQuery(logger, 'executables', cacheHits, cacheTotal, 0, 0);
    return executables;
  }

  // Only the executable file name is requested from the source.
  const resExecutables = await client.mget<ProfilingExecutable>('mget_executables', {
    index: INDEX_EXECUTABLES,
    ids: [...executableIDs],
    _source_includes: [ProfilingESField.ExecutableFileName],
  });

  // Create a lookup map FileID -> Executable.
  let queryHits = 0;
  const t0 = Date.now();
  const docs = resExecutables.docs;
  for (const exe of docs) {
    if ('error' in exe) {
      continue;
    }
    if (exe.found) {
      queryHits++;
      const executable = {
        FileName: exe._source!.Executable.file.name,
      };
      executables.set(exe._id, executable);
      executableLRU.set(exe._id, executable);
      continue;
    }

    // Remember misses as well, so the same ID is not queried again while the
    // cache entry lives.
    executables.set(exe._id, emptyExecutable);
    executableLRU.set(exe._id, emptyExecutable);
  }
  logger.info(`processing data took ${Date.now() - t0} ms`);

  summarizeCacheAndQuery(
    logger,
    'executables',
    cacheHits,
    cacheTotal,
    queryHits,
    executableIDs.size
  );

  return executables;
}

View file

@ -11,7 +11,7 @@ import { RouteRegisterParameters } from '.';
import { getRoutePaths, INDEX_EVENTS } from '../../common';
import { ProfilingESField } from '../../common/elasticsearch';
import { computeBucketWidthFromTimeRangeAndBucketCount } from '../../common/histogram';
import { groupStackFrameMetadataByStackTrace, StackTraceID } from '../../common/profiling';
import { groupStackFrameMetadataByStackTrace } from '../../common/profiling';
import { getFieldNameForTopNType, TopNType } from '../../common/stack_traces';
import { createTopNSamples, getTopNAggregationRequest, TopNResponse } from '../../common/topn';
import { handleRouteHandlerError } from '../utils/handle_route_error_handler';
@ -20,7 +20,7 @@ import { withProfilingSpan } from '../utils/with_profiling_span';
import { getClient } from './compat';
import { findDownsampledIndex } from './downsampling';
import { createCommonFilter } from './query';
import { mgetExecutables, mgetStackFrames, mgetStackTraces } from './stacktrace';
import { searchStackTraces } from './search_stacktraces';
export async function topNElasticSearchQuery({
client,
@ -112,32 +112,26 @@ export async function topNElasticSearchQuery({
};
}
const stackTraceEvents = new Map<StackTraceID, number>();
let totalAggregatedStackTraces = 0;
const { stackTraces, executables, stackFrames } = await withProfilingSpan(
'search_stacktraces',
async () => {
const stackTraceIDs: Set<string> = new Set<string>();
for (let i = 0; i < groupByBuckets.length; i++) {
stackTraceIDs.add(String(groupByBuckets[i].key));
}
for (let i = 0; i < groupByBuckets.length; i++) {
const stackTraceID = String(groupByBuckets[i].key);
const count = Math.floor((groupByBuckets[i].count.value ?? 0) / eventsIndex.sampleRate);
totalAggregatedStackTraces += count;
stackTraceEvents.set(stackTraceID, count);
}
const stackTraceKuery = [...stackTraceIDs].join(' or ');
const stackTraceFilter = createCommonFilter({
timeFrom,
timeTo,
kuery: stackTraceKuery,
});
logger.info('total aggregated stacktraces: ' + totalAggregatedStackTraces);
logger.info('unique aggregated stacktraces: ' + stackTraceEvents.size);
const { stackTraces, stackFrameDocIDs, executableDocIDs } = await mgetStackTraces({
logger,
client,
events: stackTraceEvents,
});
const [stackFrames, executables] = await withProfilingSpan(
'get_stackframes_and_executables',
() => {
return Promise.all([
mgetStackFrames({ logger, client, stackFrameIDs: stackFrameDocIDs }),
mgetExecutables({ logger, client, executableIDs: executableDocIDs }),
]);
return searchStackTraces({
client,
filter: stackTraceFilter,
sampleSize: targetSampleSize,
});
}
);