mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Synthtrace] Synthtrace to generate unique ids (#214637)
closes [214636](https://github.com/elastic/kibana/issues/214636) ## Summary This PR updates the id generation logic in Synthtrace to ensure the uniqueness of generated ids. The change addresses the issue of ID collisions when Synthtrace is executed on multiple pods in parallel, as observed in the edge clusters. This change will affect APM traces, but the idea could be extended to other scenarios requiring unique/random ids in the test environments. ### Performance | Length | Sequential id generator | Random id generator | | --| -----------------------| ----------------------| | 16 chars | ~0.005ms | ~ 0.007ms | | 32 chars | ~0.005ms | ~ 0.007ms | ### How to test Execute this query after running synthtrace. ```bash curl -X GET "http://elastic:changeme@localhost:9200/apm*,traces-apm*/_search" -H "Content-Type: application/json" -d '{ "size": 1, "query": { "bool": { "filter": [ { "terms": { "processor.event": [ "transaction", "span" ] } } ] } }, "_source": [ "span.id", "transaction.id", "trace.id", "error.id" ] }' | jq '.hits.hits[]._source' ``` **Historical data** *Sequential ids* ```bash node scripts/synthtrace service_map_oom.ts --from=now-5m to=now --clean --workers=1 ``` ```json { "trace": { "id": "56956000000000000000000000281715" }, "transaction": { "id": "5695600000281714" } } ``` *Random ids* ```bash node scripts/synthtrace service_map_oom.ts --from=now-5m to=now --clean --workers=1 --uniqueIds ``` ```json { "trace": { "id": "9dd787e4c55948000000000000081916" }, "transaction": { "id": "9dd709a2f7979800" } } ``` **Live data** *Sequential ids* ```bash node scripts/synthtrace service_map_oom.ts --live --clean ``` ```json { "trace": { "id": "58384000000000000000000000001995" }, "transaction": { "id": "5838400000001994" } } ``` *Random ids* ```bash node scripts/synthtrace service_map_oom.ts --live --clean --uniqueIds ``` ```json { "trace": { "id": "3d8b3401711004000000000000001996" }, "transaction": { "id": "3d834f49b9bb0c00" } } ```
This commit is contained in:
parent
cd491c34e7
commit
2f453ac2b7
17 changed files with 129 additions and 20 deletions
|
@ -31,7 +31,11 @@ export { Serializable } from './src/lib/serializable';
|
|||
export { timerange } from './src/lib/timerange';
|
||||
export type { Timerange } from './src/lib/timerange';
|
||||
export { dedot } from './src/lib/utils/dedot';
|
||||
export { generateLongId, generateShortId } from './src/lib/utils/generate_id';
|
||||
export {
|
||||
generateLongId,
|
||||
generateShortId,
|
||||
setIdGeneratorStrategy,
|
||||
} from './src/lib/utils/generate_id';
|
||||
export { appendHash, hashKeysOf } from './src/lib/utils/hash';
|
||||
export type { ESDocumentWithOperation, SynthtraceESAction, SynthtraceGenerator } from './src/types';
|
||||
export { log, type LogDocument, LONG_FIELD_NAME } from './src/lib/logs';
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import {
|
||||
setIdGeneratorStrategy,
|
||||
generateLongId,
|
||||
generateShortId,
|
||||
generateLongIdWithSeed,
|
||||
} from './generate_id';
|
||||
|
||||
describe('generate_id', () => {
  // NOTE(review): these tests share the module-level counter (`seq`) inside
  // generate_id, so they are order-dependent — the "long id" expectation below
  // (…001) presumably reflects the increment done by the preceding test.
  // TODO confirm there is no test-runner randomization that reorders `it`s.
  describe('generate deterministic ids', () => {
    it('should generate a short id of the correct length and format', () => {
      setIdGeneratorStrategy('sequential');
      const shortId = generateShortId();
      expect(shortId.length).toBe(16);
      // Sequential ids are a zero-padded counter, hence the long run of zeros.
      expect(shortId).toContain('00000000000');
    });

    it('should generate a long id of the correct length and format', () => {
      setIdGeneratorStrategy('sequential');
      const longId = generateLongId();
      expect(longId.length).toBe(32);
      // Trailing "…001": counter value after the previous test's increment.
      expect(longId).toContain('000000000000000000000000001');
    });

    it('should generate a long id with a seed and correct padding', () => {
      const seed = 'order/123';
      const longIdWithSeed = generateLongIdWithSeed(seed);
      expect(longIdWithSeed.length).toBe(32);
      // The seed is sanitized ('/' -> '_') and left-padded with zeros to 32.
      expect(longIdWithSeed).toEqual('00000000000000000000000order_123');
    });
  });

  describe('generate random ids', () => {
    it('should generate a unique id of correct length and valid format', () => {
      setIdGeneratorStrategy('random');
      const shortId = generateShortId();

      expect(shortId.length).toBe(16);

      // 14 hex chars followed by 2 decimal (counter) chars. The (?!\d{14})
      // lookahead asserts the hex prefix is not purely decimal.
      // NOTE(review): a random hex prefix can legitimately be all decimal
      // digits, so this assertion can flake with small probability — confirm
      // that is acceptable for CI.
      expect(shortId).toMatch(/^(?!\d{14})[0-9a-fA-F]{14}\d{2}$/);
    });

    it('should generate a long id of the correct length and format', () => {
      setIdGeneratorStrategy('random');
      const longId = generateLongId();

      expect(longId.length).toBe(32);

      // 14 hex chars + 18 decimal (zero-padded counter) chars; same flake
      // caveat on the lookahead as above.
      expect(longId).toMatch(/^(?!\d{14})[0-9a-fA-F]{14}\d{18}$/);
    });
  });
});
|
|
@ -7,13 +7,18 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
type IdGeneratorStrategyType = 'random' | 'sequential';
|
||||
|
||||
let seq = 0;
|
||||
let idGeneratorStategy: IdGeneratorStrategyType = 'sequential';
|
||||
|
||||
const pid = String(process.pid);
|
||||
|
||||
const LONG_ID_LENGTH = 32;
|
||||
const SHORT_ID_LENGTH = 16;
|
||||
const UNIQUE_ID_MAX_SEQ = 10_000_000;
|
||||
|
||||
function generateId(length: number = LONG_ID_LENGTH) {
|
||||
function generateSequentialId(length: number = LONG_ID_LENGTH) {
|
||||
const id = String(seq++);
|
||||
const generatedId = pid + id.padStart(length - pid.length, '0');
|
||||
if (generatedId.length > length) {
|
||||
|
@ -23,6 +28,24 @@ function generateId(length: number = LONG_ID_LENGTH) {
|
|||
return generatedId;
|
||||
}
|
||||
|
||||
// Builds a pseudo-random id: a hex prefix derived from Math.random/Date.now
// followed by the zero-padded process-local counter, so parallel generators
// are unlikely to collide while uniqueness within a process is still
// guaranteed by the counter suffix.
function generateRandomId(length: number = LONG_ID_LENGTH) {
  // Advance the shared counter; wrapping keeps the decimal suffix bounded.
  seq = (seq + 1) % UNIQUE_ID_MAX_SEQ;

  const randomFactor = Math.floor(Math.random() * UNIQUE_ID_MAX_SEQ);
  // NOTE(review): this product can reach ~1e26, far beyond
  // Number.MAX_SAFE_INTEGER (~9e15), so the low-order bits are lost to float
  // rounding — the trailing hex digits come out as zeros and the effective
  // entropy is much lower than the digit count suggests. Also randomFactor
  // can be 0, collapsing the product (and randomHex) to "0". TODO confirm
  // the collision probability is acceptable before relying on this in CI.
  const randomHex = (randomFactor * Date.now() * seq).toString(16);

  // Short enough hex prefix: append the counter, left-padded with zeros to
  // fill exactly `length` characters. Otherwise truncate the hex to `length`.
  const generatedId =
    randomHex.length < length
      ? `${randomHex}${String(seq).padStart(length - randomHex.length, '0')}`
      : randomHex.substring(0, length);

  // NOTE(review): padStart never truncates — if String(seq) is longer than
  // the remaining room (length - randomHex.length), the id overflows and we
  // throw below. That makes this function fail intermittently depending on
  // the random prefix width; verify callers tolerate that.
  if (generatedId.length > length) {
    throw new Error(`generated id is longer than ${length} characters: ${generatedId.length}`);
  }

  return generatedId;
}
|
||||
|
||||
function generateIdWithSeed(seed: string, length: number = LONG_ID_LENGTH) {
|
||||
// this is needed to sanitize errors like "No handler for /order/{id}",
|
||||
// as encodeURIComponent is not enough and can cause errors in the client
|
||||
|
@ -31,13 +54,21 @@ function generateIdWithSeed(seed: string, length: number = LONG_ID_LENGTH) {
|
|||
}
|
||||
|
||||
export function generateShortId() {
|
||||
return generateId(SHORT_ID_LENGTH);
|
||||
return idGeneratorStategy === 'random'
|
||||
? generateRandomId(SHORT_ID_LENGTH)
|
||||
: generateSequentialId(SHORT_ID_LENGTH);
|
||||
}
|
||||
|
||||
export function generateLongId() {
|
||||
return generateId(LONG_ID_LENGTH);
|
||||
return idGeneratorStategy === 'random'
|
||||
? generateRandomId(LONG_ID_LENGTH)
|
||||
: generateSequentialId(LONG_ID_LENGTH);
|
||||
}
|
||||
|
||||
export function generateLongIdWithSeed(seed: string) {
|
||||
return generateIdWithSeed(seed, LONG_ID_LENGTH);
|
||||
}
|
||||
|
||||
/**
 * Selects how subsequent generateShortId()/generateLongId() calls build ids:
 * 'sequential' (deterministic zero-padded counter, the default) or 'random'
 * (hex prefix plus counter suffix, for collision-free parallel runs).
 */
// NOTE(review): `idGeneratorStategy` is missing an "r" (strategy); left
// unrenamed here because every reader of the module-level variable uses the
// same spelling — fix all occurrences together in a follow-up.
export const setIdGeneratorStrategy = (strategy: IdGeneratorStrategyType) => {
  idGeneratorStategy = strategy;
};
|
||||
|
|
|
@ -48,6 +48,10 @@ function options(y: Argv) {
|
|||
description: 'Generate and index data continuously',
|
||||
boolean: true,
|
||||
})
|
||||
.option('uniqueIds', {
|
||||
description: 'Generate unique ids to avoid id collisions',
|
||||
boolean: true,
|
||||
})
|
||||
.option('liveBucketSize', {
|
||||
description: 'Bucket size in ms for live streaming',
|
||||
default: 1000,
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
*/
|
||||
|
||||
import { Client, HttpConnection } from '@elastic/elasticsearch';
|
||||
import { setIdGeneratorStrategy } from '@kbn/apm-synthtrace-client';
|
||||
import { createLogger } from '../../lib/utils/create_logger';
|
||||
import { getClients } from './get_clients';
|
||||
import { getKibanaClient } from './get_kibana_client';
|
||||
|
@ -20,6 +21,7 @@ export async function bootstrap({
|
|||
...runOptions
|
||||
}: RunOptions & { skipClientBootstrap?: boolean }) {
|
||||
const logger = createLogger(runOptions.logLevel);
|
||||
setIdGeneratorStrategy(runOptions.uniqueIds ? 'random' : 'sequential');
|
||||
|
||||
const { kibanaUrl, esUrl } = await getServiceUrls({ ...runOptions, logger });
|
||||
|
||||
|
|
|
@ -73,7 +73,8 @@ export async function indexHistoricalData({
|
|||
generatorsAndClients.map(async ({ client, generator }) => {
|
||||
await streamManager.index(client, generator);
|
||||
})
|
||||
).finally(() => {
|
||||
).finally(async () => {
|
||||
await streamManager.teardown();
|
||||
clearInterval(intervalId);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -76,7 +76,8 @@ export function parseRunCliFlags(flags: RunCliFlags) {
|
|||
'versionOverride',
|
||||
'clean',
|
||||
'assume-package-version',
|
||||
'liveBucketSize'
|
||||
'liveBucketSize',
|
||||
'uniqueIds'
|
||||
),
|
||||
scenarioOpts: flags.scenarioOpts as unknown as Record<string, any>,
|
||||
logLevel: parsedLogLevel,
|
||||
|
|
|
@ -57,10 +57,14 @@ export class StreamManager {
|
|||
private readonly logger: ToolingLog,
|
||||
private readonly teardownCallback: () => Promise<void> = asyncNoop
|
||||
) {
|
||||
attach(this.logger, () => this.teardown());
|
||||
attach(this.logger, () => {
|
||||
this.logger.info('Tearing down after kill signal');
|
||||
return this.teardown();
|
||||
});
|
||||
|
||||
parentPort?.on('message', (message) => {
|
||||
if (message === 'shutdown') {
|
||||
this.logger.info('Tearing down worker after shutdown message');
|
||||
this.teardown()
|
||||
.then(() => {
|
||||
process.exit(0);
|
||||
|
@ -136,8 +140,6 @@ export class StreamManager {
|
|||
process.exit(1);
|
||||
});
|
||||
|
||||
this.logger.info('Tearing down after kill signal');
|
||||
|
||||
// end all streams and listen until they've
|
||||
// completed
|
||||
function endStream(stream: Writable) {
|
||||
|
|
|
@ -50,7 +50,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
|
|||
const cloudRegion = getCloudRegion(index);
|
||||
|
||||
const commonLongEntryFields: LogDocument = {
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': 'synth-agent',
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
|
@ -91,7 +91,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
|
|||
const cloudRegion = getCloudRegion(index);
|
||||
|
||||
const commonLongEntryFields: LogDocument = {
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': 'synth-agent',
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
|
@ -65,7 +65,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
|
|||
.logLevel(MESSAGE_LOG_LEVELS[index].level)
|
||||
.service(SERVICE_NAMES[index])
|
||||
.defaults({
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': 'synth-agent',
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
|
@ -7,7 +7,12 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { generateShortId, OtelEdotDocument, otelEdot } from '@kbn/apm-synthtrace-client';
|
||||
import {
|
||||
generateShortId,
|
||||
OtelEdotDocument,
|
||||
otelEdot,
|
||||
generateLongId,
|
||||
} from '@kbn/apm-synthtrace-client';
|
||||
import { times } from 'lodash';
|
||||
import { Scenario } from '../cli/scenario';
|
||||
import { withClient } from '../lib/utils/with_client';
|
||||
|
@ -17,7 +22,7 @@ const scenario: Scenario<OtelEdotDocument> = async (runOptions) => {
|
|||
generate: ({ range, clients: { otelEsClient } }) => {
|
||||
const { numOtelTraces = 5 } = runOptions.scenarioOpts || {};
|
||||
const { logger } = runOptions;
|
||||
const traceId = generateShortId();
|
||||
const traceId = generateLongId();
|
||||
const spanId = generateShortId();
|
||||
|
||||
const otelEdotDocs = times(numOtelTraces / 2).map((index) => otelEdot.create(traceId));
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { otel, generateShortId, OtelDocument } from '@kbn/apm-synthtrace-client';
|
||||
import { otel, generateShortId, generateLongId, OtelDocument } from '@kbn/apm-synthtrace-client';
|
||||
import { times } from 'lodash';
|
||||
import { Scenario } from '../cli/scenario';
|
||||
import { withClient } from '../lib/utils/with_client';
|
||||
|
@ -17,7 +17,7 @@ const scenario: Scenario<OtelDocument> = async (runOptions) => {
|
|||
generate: ({ range, clients: { otelEsClient } }) => {
|
||||
const { numOtelTraces = 5 } = runOptions.scenarioOpts || {};
|
||||
const { logger } = runOptions;
|
||||
const traceId = generateShortId();
|
||||
const traceId = generateLongId();
|
||||
const spanId = generateShortId();
|
||||
|
||||
const otelDocs = times(numOtelTraces / 2).map((index) => otel.create(traceId));
|
||||
|
|
|
@ -42,7 +42,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
|
|||
const cloudRegion = getCloudRegion(index);
|
||||
|
||||
const commonLongEntryFields: LogDocument = {
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': getAgentName(),
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
|
@ -45,7 +45,7 @@ const scenario: Scenario<LogDocument> = async (runOptions) => {
|
|||
const cloudRegion = getCloudRegion(index);
|
||||
|
||||
const commonLongEntryFields: LogDocument = {
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': 'nodejs',
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
|
@ -20,7 +20,7 @@ function generateExternalSpanLinks() {
|
|||
// randomly creates external span links 0 - 10
|
||||
return Array(Math.floor(Math.random() * 11))
|
||||
.fill(0)
|
||||
.map(() => ({ span: { id: generateLongId() }, trace: { id: generateShortId() } }));
|
||||
.map(() => ({ span: { id: generateShortId() }, trace: { id: generateLongId() } }));
|
||||
}
|
||||
|
||||
function getSpanLinksFromEvents(events: ApmFields[]) {
|
||||
|
|
|
@ -79,7 +79,7 @@ const scenario: Scenario<LogDocument | ApmFields> = async ({ logger, ...runOptio
|
|||
.containerId(containerId)
|
||||
.hostName(hostName)
|
||||
.defaults({
|
||||
'trace.id': generateShortId(),
|
||||
'trace.id': generateLongId(),
|
||||
'agent.name': 'synth-agent',
|
||||
'orchestrator.cluster.name': clusterName,
|
||||
'orchestrator.cluster.id': clusterId,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue