[8.8] [APM] Circuit breaker and perf improvements for service map (#159883) (#160060)

# Backport

This will backport the following commits from `main` to `8.8`:
- [[APM] Circuit breaker and perf improvements for service map
(#159883)](https://github.com/elastic/kibana/pull/159883)

<!--- Backport version: 8.9.7 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Dario
Gieselaar","email":"dario.gieselaar@elastic.co"},"sourceCommit":{"committedDate":"2023-06-20T16:38:23Z","message":"[APM]
Circuit breaker and perf improvements for service map
(#159883)\n\nCloses #101920\r\n\r\nThis PR does three things:\r\n\r\n-
add a `terminate_after` parameter to the search request for
the\r\nscripted metric agg. This is a configurable
setting\r\n(`xpack.apm.serviceMapTerminateAfter`) and defaults to 100k.
This is a\r\nshard-level parameter, so there's still the possibility of
lots of\r\nshards individually returning 100k documents and the
coordinating node\r\nrunning out of memory because it is collecting all
these docs from\r\nindividual shards. However, I suspect that there is
already some\r\nprotection in the reduce phase that will terminate the
request with a\r\nstack_overflow_error without OOMing, I've reached out
to the ES team to\r\nconfirm whether this is the case.\r\n- add
`xpack.apm.serviceMapMaxTraces`: this tells the max traces to\r\ninspect
in total, not just per search request. IE,
if\r\n`xpack.apm.serviceMapMaxTracesPerRequest` is 1, we simply chunk
the\r\ntraces in n chunks, so it doesn't really help with memory
management.\r\n`serviceMapMaxTraces` refers to the total amount of
traces to inspect.\r\n- rewrite `getConnections` to use local mutation
instead of\r\nimmutability. I saw huge CPU usage (with admittedly a
pathological\r\nscenario where there are 100s of services) in the
`getConnections`\r\nfunction, because it uses a deduplication mechanism
that is O(n²), so I\r\nrewrote it to O(n). Here's a before
:\r\n\r\n\r\n![image](6c24a7a2-3b48-4c95-9db2-563160a57aef)\r\n\r\nand
after:\r\n\r\n![image](c00b8428-3026-4610-aa8b-c0046e8f0e08)\r\n\r\nTo
reproduce an OOM, start ES with a much smaller amount of memory:\r\n`$
ES_JAVA_OPTS='-Xms236m -Xmx236m' yarn es snapshot`\r\n\r\nThen run the
synthtrace Service Map OOM scenario:\r\n`$ node scripts/synthtrace.js
service_map_oom --from=now-15m --to=now\r\n--clean`\r\n\r\nFinally,
navigate to `service-100` in the UI, and click on Service Map.\r\nThis
should trigger an
OOM.","sha":"1a9b2412299e98a210b4d902c4df92a710b32b97","branchLabelMapping":{"^v8.9.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:fix","Team:APM","v8.9.0","v8.8.2"],"number":159883,"url":"https://github.com/elastic/kibana/pull/159883","mergeCommit":{"message":"[APM]
Circuit breaker and perf improvements for service map
(#159883)\n\nCloses #101920\r\n\r\nThis PR does three things:\r\n\r\n-
add a `terminate_after` parameter to the search request for
the\r\nscripted metric agg. This is a configurable
setting\r\n(`xpack.apm.serviceMapTerminateAfter`) and defaults to 100k.
This is a\r\nshard-level parameter, so there's still the possibility of
lots of\r\nshards individually returning 100k documents and the
coordinating node\r\nrunning out of memory because it is collecting all
these docs from\r\nindividual shards. However, I suspect that there is
already some\r\nprotection in the reduce phase that will terminate the
request with a\r\nstack_overflow_error without OOMing, I've reached out
to the ES team to\r\nconfirm whether this is the case.\r\n- add
`xpack.apm.serviceMapMaxTraces`: this tells the max traces to\r\ninspect
in total, not just per search request. IE,
if\r\n`xpack.apm.serviceMapMaxTracesPerRequest` is 1, we simply chunk
the\r\ntraces in n chunks, so it doesn't really help with memory
management.\r\n`serviceMapMaxTraces` refers to the total amount of
traces to inspect.\r\n- rewrite `getConnections` to use local mutation
instead of\r\nimmutability. I saw huge CPU usage (with admittedly a
pathological\r\nscenario where there are 100s of services) in the
`getConnections`\r\nfunction, because it uses a deduplication mechanism
that is O(n²), so I\r\nrewrote it to O(n). Here's a before
:\r\n\r\n\r\n![image](6c24a7a2-3b48-4c95-9db2-563160a57aef)\r\n\r\nand
after:\r\n\r\n![image](c00b8428-3026-4610-aa8b-c0046e8f0e08)\r\n\r\nTo
reproduce an OOM, start ES with a much smaller amount of memory:\r\n`$
ES_JAVA_OPTS='-Xms236m -Xmx236m' yarn es snapshot`\r\n\r\nThen run the
synthtrace Service Map OOM scenario:\r\n`$ node scripts/synthtrace.js
service_map_oom --from=now-15m --to=now\r\n--clean`\r\n\r\nFinally,
navigate to `service-100` in the UI, and click on Service Map.\r\nThis
should trigger an
OOM.","sha":"1a9b2412299e98a210b4d902c4df92a710b32b97"}},"sourceBranch":"main","suggestedTargetBranches":["8.8"],"targetPullRequestStates":[{"branch":"main","label":"v8.9.0","labelRegex":"^v8.9.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/159883","number":159883,"mergeCommit":{"message":"[APM]
Circuit breaker and perf improvements for service map
(#159883)\n\nCloses #101920\r\n\r\nThis PR does three things:\r\n\r\n-
add a `terminate_after` parameter to the search request for
the\r\nscripted metric agg. This is a configurable
setting\r\n(`xpack.apm.serviceMapTerminateAfter`) and defaults to 100k.
This is a\r\nshard-level parameter, so there's still the possibility of
lots of\r\nshards individually returning 100k documents and the
coordinating node\r\nrunning out of memory because it is collecting all
these docs from\r\nindividual shards. However, I suspect that there is
already some\r\nprotection in the reduce phase that will terminate the
request with a\r\nstack_overflow_error without OOMing, I've reached out
to the ES team to\r\nconfirm whether this is the case.\r\n- add
`xpack.apm.serviceMapMaxTraces`: this tells the max traces to\r\ninspect
in total, not just per search request. IE,
if\r\n`xpack.apm.serviceMapMaxTracesPerRequest` is 1, we simply chunk
the\r\ntraces in n chunks, so it doesn't really help with memory
management.\r\n`serviceMapMaxTraces` refers to the total amount of
traces to inspect.\r\n- rewrite `getConnections` to use local mutation
instead of\r\nimmutability. I saw huge CPU usage (with admittedly a
pathological\r\nscenario where there are 100s of services) in the
`getConnections`\r\nfunction, because it uses a deduplication mechanism
that is O(n²), so I\r\nrewrote it to O(n). Here's a before
:\r\n\r\n\r\n![image](6c24a7a2-3b48-4c95-9db2-563160a57aef)\r\n\r\nand
after:\r\n\r\n![image](c00b8428-3026-4610-aa8b-c0046e8f0e08)\r\n\r\nTo
reproduce an OOM, start ES with a much smaller amount of memory:\r\n`$
ES_JAVA_OPTS='-Xms236m -Xmx236m' yarn es snapshot`\r\n\r\nThen run the
synthtrace Service Map OOM scenario:\r\n`$ node scripts/synthtrace.js
service_map_oom --from=now-15m --to=now\r\n--clean`\r\n\r\nFinally,
navigate to `service-100` in the UI, and click on Service Map.\r\nThis
should trigger an
OOM.","sha":"1a9b2412299e98a210b4d902c4df92a710b32b97"}},{"branch":"8.8","label":"v8.8.2","labelRegex":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Dario Gieselaar <dario.gieselaar@elastic.co>
Kibana Machine 2023-06-20 14:12:15 -04:00 committed by GitHub
parent bc8ecea099
commit b2688fe4ed
9 changed files with 204 additions and 162 deletions

View file

@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields, httpExitSpan } from '@kbn/apm-synthtrace-client';
import { service } from '@kbn/apm-synthtrace-client/src/lib/apm/service';
import { Transaction } from '@kbn/apm-synthtrace-client/src/lib/apm/transaction';
import { Scenario } from '../cli/scenario';
import { RunOptions } from '../cli/utils/parse_run_cli_flags';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';
const environment = getSynthtraceEnvironment(__filename);
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const numServices = 500;
const tracesPerMinute = 10;
return {
generate: ({ range }) => {
const services = new Array(numServices)
.fill(undefined)
.map((_, idx) => {
return service(`service-${idx}`, 'prod', environment).instance('service-instance');
})
.reverse();
return range.ratePerMinute(tracesPerMinute).generator((timestamp) => {
const rootTransaction = services.reduce((prev, currentService) => {
const tx = currentService
.transaction(`GET /my/function`, 'request')
.timestamp(timestamp)
.duration(1000)
.children(
...(prev
? [
currentService
.span(
httpExitSpan({
spanName: `exit-span-${currentService.fields['service.name']}`,
destinationUrl: `http://address-to-exit-span-${currentService.fields['service.name']}`,
})
)
.timestamp(timestamp)
.duration(1000)
.children(prev),
]
: [])
);
return tx;
}, undefined as Transaction | undefined);
return rootTransaction!;
});
},
};
};
export default scenario;

View file

@@ -26,6 +26,8 @@ const configSchema = schema.object({
serviceMapTraceIdBucketSize: schema.number({ defaultValue: 65 }),
serviceMapTraceIdGlobalBucketSize: schema.number({ defaultValue: 6 }),
serviceMapMaxTracesPerRequest: schema.number({ defaultValue: 50 }),
serviceMapTerminateAfter: schema.number({ defaultValue: 100_000 }),
serviceMapMaxTraces: schema.number({ defaultValue: 1000 }),
ui: schema.object({
enabled: schema.boolean({ defaultValue: true }),
maxTraceItems: schema.number({ defaultValue: 5000 }),

View file

@@ -15,12 +15,19 @@ import {
} from '../../../common/service_map';
import { APMEventClient } from '../../lib/helpers/create_es_client/create_apm_event_client';
export async function fetchServicePathsFromTraceIds(
apmEventClient: APMEventClient,
traceIds: string[],
start: number,
end: number
) {
export async function fetchServicePathsFromTraceIds({
apmEventClient,
traceIds,
start,
end,
terminateAfter,
}: {
apmEventClient: APMEventClient;
traceIds: string[];
start: number;
end: number;
terminateAfter: number;
}) {
// make sure there's a range so ES can skip shards
const dayInMs = 24 * 60 * 60 * 1000;
const startRange = start - dayInMs;
@@ -30,6 +37,7 @@ export async function fetchServicePathsFromTraceIds(
apm: {
events: [ProcessorEvent.span, ProcessorEvent.transaction],
},
terminate_after: terminateAfter,
body: {
track_total_hits: false,
size: 0,
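
The `terminate_after` value added in this hunk applies per shard, as the commit message notes, so the number of documents that can still reach the coordinating node grows with the shard count. A minimal sketch of that arithmetic follows; the helper name and the shard counts are illustrative assumptions and not part of the PR.

```typescript
// Hypothetical helper, not part of the PR: upper bound on the documents
// collected across all shards when `terminate_after` is applied per shard.
function worstCaseCollectedDocs(shardCount: number, terminateAfter: number): number {
  if (shardCount < 0 || terminateAfter < 0) {
    throw new Error('shardCount and terminateAfter must be non-negative');
  }
  return shardCount * terminateAfter;
}

// 100_000 is the default of xpack.apm.serviceMapTerminateAfter in this PR;
// the shard counts below are made up for illustration.
const terminateAfterDefault = 100_000;
const worstCaseBounds = [1, 10, 50].map((shards) =>
  worstCaseCollectedDocs(shards, terminateAfterDefault)
);
console.log(worstCaseBounds);
```

The bound is per search request, which is why the PR pairs this setting with a cap on the total number of traces inspected.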

View file

@@ -46,8 +46,10 @@ async function getConnectionData({
end,
serviceGroupKuery,
kuery,
logger,
}: IEnvOptions) {
return withApmSpan('get_service_map_connections', async () => {
logger.debug('Getting trace sample IDs');
const { traceIds } = await getTraceSampleIds({
config,
apmEventClient,
@@ -59,6 +61,8 @@ async function getConnectionData({
kuery,
});
logger.debug(`Found ${traceIds.length} traces to inspect`);
const chunks = chunk(traceIds, config.serviceMapMaxTracesPerRequest);
const init = {
@@ -70,6 +74,8 @@ async function getConnectionData({
return init;
}
logger.debug(`Executing scripted metric agg (${chunks.length} chunks)`);
const chunkedResponses = await withApmSpan(
'get_service_paths_from_all_trace_ids',
() =>
@@ -80,12 +86,16 @@ async function getConnectionData({
traceIds: traceIdsChunk,
start,
end,
terminateAfter: config.serviceMapTerminateAfter,
logger,
})
)
)
);
return chunkedResponses.reduce((prev, current) => {
logger.debug('Received chunk responses');
const mergedResponses = chunkedResponses.reduce((prev, current) => {
return {
connections: prev.connections.concat(current.connections),
discoveredServices: prev.discoveredServices.concat(
@@ -93,6 +103,10 @@ async function getConnectionData({
),
};
});
logger.debug('Merged responses');
return mergedResponses;
});
}
@@ -119,12 +133,19 @@ export function getServiceMap(
getServiceStats(options),
anomaliesPromise,
]);
return transformServiceMapResponses({
logger.debug('Received and parsed all responses');
const transformedResponse = transformServiceMapResponses({
response: {
...connectionData,
services: servicesData,
anomalies,
},
});
logger.debug('Transformed service map response');
return transformedResponse;
});
}

View file

@@ -11,9 +11,9 @@ import { Connection, ConnectionNode } from '../../../common/service_map';
function getConnectionsPairs(connections: Connection[]) {
return connections
.map((conn) => {
const source = `${conn.source['service.name']}:${conn.source['service.environment']}`;
const source = conn.source['service.name'];
const destination = conn.destination['service.name']
? `${conn.destination['service.name']}:${conn.destination['service.environment']}`
? conn.destination['service.name']
: conn.destination['span.type'];
return `${source} -> ${destination}`;
})
@@ -21,139 +21,71 @@ function getConnectionsPairs(connections: Connection[]) {
}
describe('getConnections', () => {
describe('with environments defined', () => {
const paths = [
[
{
'service.environment': 'testing',
'service.name': 'opbeans-ruby',
'agent.name': 'ruby',
},
{
'service.environment': null,
'service.name': 'opbeans-node',
'agent.name': 'nodejs',
},
{
'service.environment': 'production',
'service.name': 'opbeans-go',
'agent.name': 'go',
},
{
'service.environment': 'production',
'service.name': 'opbeans-java',
'agent.name': 'java',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
[
{
'service.environment': 'testing',
'service.name': 'opbeans-ruby',
'agent.name': 'ruby',
},
{
'service.environment': 'testing',
'service.name': 'opbeans-python',
'agent.name': 'python',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
] as ConnectionNode[][];
const paths = [
[
{
'service.name': 'opbeans-ruby',
'agent.name': 'ruby',
},
{
'service.name': 'opbeans-node',
'agent.name': 'nodejs',
},
{
'service.name': 'opbeans-go',
'agent.name': 'go',
},
{
'service.name': 'opbeans-java',
'agent.name': 'java',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
[
{
'service.name': 'opbeans-ruby',
'agent.name': 'ruby',
},
{
'service.name': 'opbeans-python',
'agent.name': 'python',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
[
{
'service.name': 'opbeans-go',
'agent.name': 'go',
},
{
'service.name': 'opbeans-node',
'agent.name': 'nodejs',
},
],
] as ConnectionNode[][];
it('includes all connections', () => {
const connections = getConnections({
paths,
});
const connectionsPairs = getConnectionsPairs(connections);
expect(connectionsPairs).toEqual([
'opbeans-ruby:testing -> opbeans-node:null',
'opbeans-node:null -> opbeans-go:production',
'opbeans-go:production -> opbeans-java:production',
'opbeans-java:production -> external',
'opbeans-ruby:testing -> opbeans-python:testing',
'opbeans-python:testing -> external',
]);
it('includes all connections', () => {
const connections = getConnections({
paths,
});
});
describe('environment is "not defined"', () => {
it('includes all connections', () => {
const environmentNotDefinedPaths = [
[
{
'service.environment': 'production',
'service.name': 'opbeans-go',
'agent.name': 'go',
},
{
'service.environment': 'production',
'service.name': 'opbeans-java',
'agent.name': 'java',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
[
{
'service.environment': null,
'service.name': 'opbeans-go',
'agent.name': 'go',
},
{
'service.environment': null,
'service.name': 'opbeans-java',
'agent.name': 'java',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
[
{
'service.environment': null,
'service.name': 'opbeans-python',
'agent.name': 'python',
},
{
'service.environment': null,
'service.name': 'opbeans-node',
'agent.name': 'nodejs',
},
{
'span.subtype': 'http',
'span.destination.service.resource': '172.18.0.6:3000',
'span.type': 'external',
},
],
] as ConnectionNode[][];
const connections = getConnections({
paths: environmentNotDefinedPaths,
});
const connectionsPairs = getConnectionsPairs(connections);
expect(connectionsPairs).toEqual([
'opbeans-go:production -> opbeans-java:production',
'opbeans-java:production -> external',
'opbeans-go:null -> opbeans-java:null',
'opbeans-java:null -> external',
'opbeans-python:null -> opbeans-node:null',
'opbeans-node:null -> external',
]);
});
const connectionsPairs = getConnectionsPairs(connections);
expect(connectionsPairs).toEqual([
'opbeans-ruby -> opbeans-node',
'opbeans-node -> opbeans-go',
'opbeans-go -> opbeans-java',
'opbeans-java -> external',
'opbeans-ruby -> opbeans-python',
'opbeans-python -> external',
'opbeans-go -> opbeans-node',
]);
});
});

View file

@@ -5,38 +5,43 @@
* 2.0.
*/
import { find, uniqBy } from 'lodash';
import { Logger } from '@kbn/logging';
import { Connection, ConnectionNode } from '../../../common/service_map';
import { APMEventClient } from '../../lib/helpers/create_es_client/create_apm_event_client';
import { fetchServicePathsFromTraceIds } from './fetch_service_paths_from_trace_ids';
import { getConnectionId } from './transform_service_map_responses';
export function getConnections({
paths,
}: {
paths: ConnectionNode[][] | undefined;
}) {
}): Connection[] {
if (!paths) {
return [];
}
const connectionsArr = paths.flatMap((path) => {
return path.reduce((conns, location, index) => {
const prev = path[index - 1];
const connectionsById: Map<string, Connection> = new Map();
paths.forEach((path) => {
path.forEach((location, i) => {
const prev = path[i - 1];
if (prev) {
return conns.concat({
const connection = {
source: prev,
destination: location,
});
};
const id = getConnectionId(connection);
if (!connectionsById.has(id)) {
connectionsById.set(id, connection);
}
}
return conns;
}, [] as Connection[]);
}, [] as Connection[]);
});
});
const connections = uniqBy(connectionsArr, (value) =>
find(connectionsArr, value)
);
return connections;
return Array.from(connectionsById.values());
}
export async function getServiceMapFromTraceIds({
@@ -44,14 +49,26 @@ export async function getServiceMapFromTraceIds({
traceIds,
start,
end,
terminateAfter,
logger,
}: {
apmEventClient: APMEventClient;
traceIds: string[];
start: number;
end: number;
terminateAfter: number;
logger: Logger;
}) {
const serviceMapFromTraceIdsScriptResponse =
await fetchServicePathsFromTraceIds(apmEventClient, traceIds, start, end);
await fetchServicePathsFromTraceIds({
apmEventClient,
traceIds,
start,
end,
terminateAfter,
});
logger.debug('Received scripted metric agg response');
const serviceMapScriptedAggValue =
serviceMapFromTraceIdsScriptResponse.aggregations?.service_map.value;
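
The `getConnections` rewrite in this file replaces the `uniqBy`/`find` deduplication with a `Map` keyed by connection ID. A self-contained sketch of that idea follows, assuming simplified stand-in types (`PathNode`, `ServiceConnection`) and an ID format modelled on `getConnectionId`; it illustrates the technique rather than copying the PR's code.

```typescript
// Simplified stand-ins for the real ConnectionNode / Connection types.
type PathNode = { name: string };
type ServiceConnection = { source: PathNode; destination: PathNode };

// Assumed ID format, modelled on getConnectionId in the diff.
function sketchConnectionId(connection: ServiceConnection): string {
  return `${connection.source.name}~${connection.destination.name}`;
}

function dedupeConnections(servicePaths: PathNode[][]): ServiceConnection[] {
  const byId = new Map<string, ServiceConnection>();
  for (const path of servicePaths) {
    // Start at 1 so that every node is paired with its predecessor.
    for (let i = 1; i < path.length; i++) {
      const connection = { source: path[i - 1], destination: path[i] };
      const id = sketchConnectionId(connection);
      // Map lookups take constant time on average, so each edge is
      // checked once instead of being compared against every other edge.
      if (!byId.has(id)) {
        byId.set(id, connection);
      }
    }
  }
  // A Map iterates in insertion order, so first-seen order is preserved.
  return Array.from(byId.values());
}

const samplePaths: PathNode[][] = [
  [{ name: 'a' }, { name: 'b' }, { name: 'c' }],
  [{ name: 'a' }, { name: 'b' }, { name: 'd' }],
];
const dedupedIds = dedupeConnections(samplePaths).map(sketchConnectionId);
console.log(dedupedIds); // [ 'a~b', 'b~c', 'b~d' ]
```

The work is linear in the total number of path segments. The previous approach searched the whole connections array for every element, which is where the quadratic cost came from.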

View file

@@ -26,8 +26,6 @@ import { environmentQuery } from '../../../common/utils/environment_query';
import { APMEventClient } from '../../lib/helpers/create_es_client/create_apm_event_client';
import { APMConfig } from '../..';
const MAX_TRACES_TO_INSPECT = 1000;
export async function getTraceSampleIds({
serviceName,
environment,
@@ -163,7 +161,7 @@ export async function getTraceSampleIds({
uniq(
sortBy(traceIdsWithPriority, 'priority').map(({ traceId }) => traceId)
),
MAX_TRACES_TO_INSPECT
config.serviceMapMaxTraces
);
return { traceIds };
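
This hunk replaces the hard-coded cap with `config.serviceMapMaxTraces`, which limits the total number of traces inspected, while `serviceMapMaxTracesPerRequest` only controls how those traces are split across requests. A rough sketch of how the two limits interact, using a hypothetical `planTraceRequests` helper and made-up trace IDs:

```typescript
// Hypothetical helper, not the PR's code: cap the trace IDs at maxTraces,
// then split what remains into requests of at most maxTracesPerRequest IDs.
function planTraceRequests(
  traceIds: string[],
  maxTraces: number,
  maxTracesPerRequest: number
): string[][] {
  if (!Number.isInteger(maxTracesPerRequest) || maxTracesPerRequest < 1) {
    throw new Error('maxTracesPerRequest must be a positive integer');
  }
  const capped = traceIds.slice(0, Math.max(0, maxTraces));
  const requests: string[][] = [];
  for (let i = 0; i < capped.length; i += maxTracesPerRequest) {
    requests.push(capped.slice(i, i + maxTracesPerRequest));
  }
  return requests;
}

// 1000 and 50 are the defaults of serviceMapMaxTraces and
// serviceMapMaxTracesPerRequest in the config schema of this PR.
const sampleTraceIds = Array.from({ length: 2500 }, (_, i) => `trace-${i}`);
const requestPlan = planTraceRequests(sampleTraceIds, 1000, 50);
console.log(requestPlan.length, requestPlan[0].length); // 20 50
```

Lowering only the per-request size produces more, smaller requests over the same traces. Only the total cap reduces how much data is inspected overall.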

View file

@@ -101,7 +101,7 @@ const serviceMapRoute = createApmServerRoute({
serviceName,
environment,
searchAggregatedTransactions,
logger,
logger: logger.get('serviceMap'),
start,
end,
maxNumberOfServices,

View file

@@ -35,7 +35,7 @@ function getConnectionNodeId(node: ConnectionNode): string {
return node[SERVICE_NAME];
}
function getConnectionId(connection: Connection) {
export function getConnectionId(connection: Connection) {
return `${getConnectionNodeId(connection.source)}~${getConnectionNodeId(
connection.destination
)}`;