[8.x] [APM][Otel] Add Otel client based on PoC data (#192293) (#211541)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[APM][Otel] Add Otel client based on PoC data
(#192293)](https://github.com/elastic/kibana/pull/192293)

<!--- Backport version: 9.6.4 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT
[{"author":{"name":"jennypavlova","email":"dzheni.pavlova@elastic.co"},"sourceCommit":{"committedDate":"2024-10-14T12:33:22Z","message":"[APM][Otel]
Add Otel client based on PoC data (#192293)\n\nCloses
[#192115](https://github.com/elastic/kibana/issues/192115)\r\nCloses
[#192465](https://github.com/elastic/kibana/issues/192465)\r\n\r\n\r\n##
Summary\r\n\r\nThis PR adds synthrace client for Otel native data and a
simple\r\nscenario. This is the first step of adding it and in the
future it will\r\ninclude more metrics and use
cases.\r\n\r\n>[!NOTE]\r\n> To run ES the command needs
\"xpack.otel_data.registry.enabled=true\"\r\nflag\r\n> `yarn es snapshot
--license trial
--E\r\n\"xpack.otel_data.registry.enabled=true\"`\r\n\r\n## Next
steps\r\n- We currently have only `service_destination` in the metrics
indices we\r\ncan include the other types in the future\r\n- After we
have all the UI changes we can add more scenarios (also using\r\nthe
opentelemetry demo data and not only the e2e PoC example)\r\n\r\n##
Testing\r\n- Run ES: \r\n```bash \r\nyarn es snapshot --license trial
--E \"xpack.otel_data.registry.enabled=true\"\r\n```\r\n- Run
Kibana:\r\n```bash \r\nyarn start\r\n```\r\n\r\n>[!WARNING]\r\nIf the
e2e PoC is used the first 2 steps should be skipped\r\n\r\n- Run
syntrace: \r\n```bash\r\nnode scripts/synthtrace otel_simple_trace.ts
--clean\r\n```\r\n- Check indices in DevTools for the generated data:
\r\n```bash \r\nGET *metrics-generic.otel*/_search\r\n\r\nGET
*traces-generic.otel*/_search\r\n\r\nGET
*logs-generic.otel*/_search\r\n```\r\n- Check in the APM UI (all the
tabs) \r\n>[!WARNING]\r\nCurrently the UI changes done in APM are not
merged so some errors
are\r\nexpected)\r\n\r\n\r\nhttps://github.com/user-attachments/assets/92f63610-82da-40f3-89bb-00be83c55377\r\n\r\n---------\r\n\r\nCo-authored-by:
miriam.aparicio
<miriam.aparicio@gmail.com>","sha":"5067f1554cb5fc7f23442d5f9ab5d255e26a3b37","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","backport:skip","v9.0.0","OpenTelemetry","ci:project-deploy-observability","Team:obs-ux-infra_services"],"title":"[APM][Otel]
Add Otel client based on PoC
data","number":192293,"url":"https://github.com/elastic/kibana/pull/192293","mergeCommit":{"message":"[APM][Otel]
Add Otel client based on PoC data (#192293)\n\nCloses
[#192115](https://github.com/elastic/kibana/issues/192115)\r\nCloses
[#192465](https://github.com/elastic/kibana/issues/192465)\r\n\r\n\r\n##
Summary\r\n\r\nThis PR adds synthrace client for Otel native data and a
simple\r\nscenario. This is the first step of adding it and in the
future it will\r\ninclude more metrics and use
cases.\r\n\r\n>[!NOTE]\r\n> To run ES the command needs
\"xpack.otel_data.registry.enabled=true\"\r\nflag\r\n> `yarn es snapshot
--license trial
--E\r\n\"xpack.otel_data.registry.enabled=true\"`\r\n\r\n## Next
steps\r\n- We currently have only `service_destination` in the metrics
indices we\r\ncan include the other types in the future\r\n- After we
have all the UI changes we can add more scenarios (also using\r\nthe
opentelemetry demo data and not only the e2e PoC example)\r\n\r\n##
Testing\r\n- Run ES: \r\n```bash \r\nyarn es snapshot --license trial
--E \"xpack.otel_data.registry.enabled=true\"\r\n```\r\n- Run
Kibana:\r\n```bash \r\nyarn start\r\n```\r\n\r\n>[!WARNING]\r\nIf the
e2e PoC is used the first 2 steps should be skipped\r\n\r\n- Run
syntrace: \r\n```bash\r\nnode scripts/synthtrace otel_simple_trace.ts
--clean\r\n```\r\n- Check indices in DevTools for the generated data:
\r\n```bash \r\nGET *metrics-generic.otel*/_search\r\n\r\nGET
*traces-generic.otel*/_search\r\n\r\nGET
*logs-generic.otel*/_search\r\n```\r\n- Check in the APM UI (all the
tabs) \r\n>[!WARNING]\r\nCurrently the UI changes done in APM are not
merged so some errors
are\r\nexpected)\r\n\r\n\r\nhttps://github.com/user-attachments/assets/92f63610-82da-40f3-89bb-00be83c55377\r\n\r\n---------\r\n\r\nCo-authored-by:
miriam.aparicio
<miriam.aparicio@gmail.com>","sha":"5067f1554cb5fc7f23442d5f9ab5d255e26a3b37"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/192293","number":192293,"mergeCommit":{"message":"[APM][Otel]
Add Otel client based on PoC data (#192293)\n\nCloses
[#192115](https://github.com/elastic/kibana/issues/192115)\r\nCloses
[#192465](https://github.com/elastic/kibana/issues/192465)\r\n\r\n\r\n##
Summary\r\n\r\nThis PR adds synthrace client for Otel native data and a
simple\r\nscenario. This is the first step of adding it and in the
future it will\r\ninclude more metrics and use
cases.\r\n\r\n>[!NOTE]\r\n> To run ES the command needs
\"xpack.otel_data.registry.enabled=true\"\r\nflag\r\n> `yarn es snapshot
--license trial
--E\r\n\"xpack.otel_data.registry.enabled=true\"`\r\n\r\n## Next
steps\r\n- We currently have only `service_destination` in the metrics
indices we\r\ncan include the other types in the future\r\n- After we
have all the UI changes we can add more scenarios (also using\r\nthe
opentelemetry demo data and not only the e2e PoC example)\r\n\r\n##
Testing\r\n- Run ES: \r\n```bash \r\nyarn es snapshot --license trial
--E \"xpack.otel_data.registry.enabled=true\"\r\n```\r\n- Run
Kibana:\r\n```bash \r\nyarn start\r\n```\r\n\r\n>[!WARNING]\r\nIf the
e2e PoC is used the first 2 steps should be skipped\r\n\r\n- Run
syntrace: \r\n```bash\r\nnode scripts/synthtrace otel_simple_trace.ts
--clean\r\n```\r\n- Check indices in DevTools for the generated data:
\r\n```bash \r\nGET *metrics-generic.otel*/_search\r\n\r\nGET
*traces-generic.otel*/_search\r\n\r\nGET
*logs-generic.otel*/_search\r\n```\r\n- Check in the APM UI (all the
tabs) \r\n>[!WARNING]\r\nCurrently the UI changes done in APM are not
merged so some errors
are\r\nexpected)\r\n\r\n\r\nhttps://github.com/user-attachments/assets/92f63610-82da-40f3-89bb-00be83c55377\r\n\r\n---------\r\n\r\nCo-authored-by:
miriam.aparicio
<miriam.aparicio@gmail.com>","sha":"5067f1554cb5fc7f23442d5f9ab5d255e26a3b37"}}]}]
BACKPORT-->

Co-authored-by: jennypavlova <dzheni.pavlova@elastic.co>
This commit is contained in:
Sergi Romeu 2025-02-18 12:49:55 +01:00 committed by GitHub
parent 5d3fb74647
commit 398c1bf795
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 527 additions and 0 deletions

View file

@ -37,3 +37,4 @@ export type { ESDocumentWithOperation, SynthtraceESAction, SynthtraceGenerator }
export { log, type LogDocument, LONG_FIELD_NAME } from './src/lib/logs';
export { syntheticsMonitor, type SyntheticsMonitorDocument } from './src/lib/synthetics';
export { type EntityFields, entities } from './src/lib/entities';
export { otel, type OtelDocument } from './src/lib/otel';

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type { OtelDocument } from '../../..';
import { Serializable } from '../serializable';
export interface OtelErrorDocument extends OtelDocument {
'event.name'?: string;
attributes?: {
'exception.message'?: string;
'error.stack_trace'?: string;
'exception.handled'?: boolean;
'exception.type'?: string;
'processor.event'?: string;
'timestamp.us'?: number;
'event.name'?: string;
'error.id'?: string;
};
}
export class OtelError extends Serializable<OtelErrorDocument> {
constructor(fields: OtelErrorDocument) {
super({
...fields,
});
}
}

View file

@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Fields } from '../entity';
import { Serializable } from '../serializable';
import { OtelError } from './error';
import { OtelMetric } from './metric';
import { OtelTransaction } from './transaction';
interface OtelSharedResourceAttributes {
'service.name'?: string;
'agent.name'?: string;
'agent.version'?: string;
'metricset.interval'?: string;
'service.instance.id'?: string;
'telemetry.sdk.language'?: string;
'telemetry.sdk.name'?: string;
'telemetry.sdk.version'?: string;
'some.resource.attribute'?: string;
}
export interface OtelDocument extends Fields {
data_stream?: {
dataset: string;
namespace: string;
type: string;
};
attributes?: {
'timestamp.us'?: number;
'metricset.name'?: string;
[key: string]: any;
};
resource?: {
attributes?: OtelSharedResourceAttributes;
dropped_attributes_count?: number;
schema_url?: string;
};
scope?: {
attributes?: {
'service.framework.name'?: string;
'service.framework.version'?: string;
};
dropped_attributes_count?: number;
name?: string;
};
name?: string;
trace_id?: string;
trace?: { id: string };
span_id?: string;
span?: { id: string };
dropped_attributes_count?: number;
dropped_events_count?: number;
dropped_links_count?: number;
timestamp_us?: number;
}
class Otel extends Serializable<OtelDocument> {
constructor(fields: OtelDocument) {
super({
...fields,
});
}
error(spanId: string) {
return new OtelError({
...this.fields,
attributes: {
'exception.message': 'boom',
'exception.handled': false,
'exception.type': '*errors.errorString',
'error.stack_trace': 'Error: INTERNAL: Boom',
'processor.event': 'error',
'timestamp.us': 1726580752010657,
'event.name': 'exception',
'error.id': `error-${spanId}`,
},
data_stream: {
dataset: 'generic.otel',
namespace: 'default',
type: 'logs',
},
'event.name': 'exception',
dropped_attributes_count: 0,
resource: {
attributes: {
'agent.name': 'opentelemetry/go',
'agent.version': '1.28.0',
'service.name': 'sendotlp-synth',
'service.instance.id': '89117ac1-0dbf-4488-9e17-4c2c3b76943a',
},
dropped_attributes_count: 0,
schema_url: 'https://opentelemetry.io/schemas/1.26.0',
},
scope: {
attributes: {
'service.framework.name': 'sendotlp-synth',
'service.framework.version': '',
},
dropped_attributes_count: 0,
name: 'sendotlp-synth',
},
span_id: spanId,
});
}
metric() {
return new OtelMetric({
...this.fields,
attributes: {
'metricset.name': 'service_destination',
'processor.event': 'metric',
'event.outcome': 'success',
'service.target.name': 'foo_service',
'service.target.type': 'http',
'span.name': 'child1',
'span.destination.service.resource': 'foo_service:8080',
},
data_stream: {
dataset: 'service_destination.10m.otel',
namespace: 'default',
type: 'metrics',
},
metrics: {
service_summary: 2,
},
resource: {
attributes: {
'agent.name': 'otlp',
'agent.version': '1.28.0',
'service.instance.id': '89117ac1-0dbf-4488-9e17-4c2c3b76943a',
'service.name': 'sendotlp-synth',
'metricset.interval': '10m',
},
dropped_attributes_count: 0,
},
scope: {
dropped_attributes_count: 0,
name: 'github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2',
},
});
}
// In Otel we have only spans (https://opentelemetry.io/docs/concepts/signals/traces/#spans)
// we call the root span a transaction to match our data model
transaction(id: string) {
return new OtelTransaction({
...this.fields,
attributes: {
'event.outcome': 'success',
'event.success_count': 1,
'processor.event': 'transaction',
'timestamp.us': 1726580752010657,
'transaction.duration.us': 15202,
'transaction.id': id,
'transaction.name': 'parent-synth',
'transaction.representative_count': 1,
'transaction.result': 'HTTP 2xx',
'transaction.root': true,
'transaction.sampled': true,
'transaction.type': 'unknown',
},
data_stream: {
dataset: 'generic.otel',
namespace: 'default',
type: 'traces',
},
duration: 11742370,
kind: 'Internal',
name: 'parent-synth',
resource: {
attributes: {
'agent.name': 'otlp',
'agent.version': '1.28.0',
'service.instance.id': '89117ac1-0dbf-4488-9e17-4c2c3b76943a',
'service.name': 'sendotlp-synth',
},
dropped_attributes_count: 0,
schema_url: 'https://opentelemetry.io/schemas/1.26.0',
},
scope: {
attributes: {
'service.framework.name': 'sendotlp-synth',
'service.framework.version': '',
},
dropped_attributes_count: 0,
name: 'sendotlp-synth',
},
span_id: id,
status: {
code: 'Unset',
},
});
}
}
export function create(id: string): Otel {
return new Otel({
trace_id: id,
dropped_attributes_count: 0,
dropped_events_count: 0,
dropped_links_count: 0,
});
}
export const otel = {
create,
};

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { OtelDocument } from '.';
import { Serializable } from '../serializable';
export interface OtelMetricDocument extends OtelDocument {
attributes?: {
'metricset.name'?: string;
'processor.event'?: string;
'event.outcome'?: string;
'service.target.name'?: string;
'service.target.type'?: string;
'span.name'?: string;
'span.destination.service.resource'?: string;
};
metrics?: {
service_summary?: number;
};
}
export class OtelMetric extends Serializable<OtelMetricDocument> {
constructor(fields: OtelMetricDocument) {
super({
...fields,
});
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { OtelDocument } from '.';
import { Serializable } from '../serializable';
export interface OtelTransactionDocument extends OtelDocument {
attributes?: {
'event.outcome'?: string;
'event.success_count'?: number;
'processor.event'?: string;
'timestamp.us'?: number;
'transaction.duration.us'?: number;
'transaction.id'?: string;
'transaction.name'?: string;
'transaction.representative_count'?: number;
'transaction.result'?: string;
'transaction.root'?: boolean;
'transaction.sampled'?: boolean;
'transaction.type'?: string;
};
status?: {
code?: string;
};
dropped_events_count?: number;
dropped_links_count?: number;
duration?: number;
kind?: string;
name?: string;
}
export class OtelTransaction extends Serializable<OtelTransactionDocument> {
constructor(fields: OtelTransactionDocument) {
super({
...fields,
});
}
}

View file

@ -18,6 +18,7 @@ export { LogsSynthtraceEsClient } from './src/lib/logs/logs_synthtrace_es_client
export { EntitiesSynthtraceEsClient } from './src/lib/entities/entities_synthtrace_es_client';
export { EntitiesSynthtraceKibanaClient } from './src/lib/entities/entities_synthtrace_kibana_client';
export { SyntheticsSynthtraceEsClient } from './src/lib/synthetics/synthetics_synthtrace_es_client';
export { OtelSynthtraceEsClient } from './src/lib/otel/otel_synthtrace_es_client';
export {
addObserverVersionTransform,
deleteSummaryFieldTransform,

View file

@ -13,6 +13,7 @@ import {
InfraSynthtraceEsClient,
LogsSynthtraceEsClient,
SyntheticsSynthtraceEsClient,
OtelSynthtraceEsClient,
EntitiesSynthtraceEsClient,
} from '../..';
import { Logger } from '../lib/utils/create_logger';
@ -25,6 +26,7 @@ interface EsClients {
logsEsClient: LogsSynthtraceEsClient;
infraEsClient: InfraSynthtraceEsClient;
syntheticsEsClient: SyntheticsSynthtraceEsClient;
otelEsClient: OtelSynthtraceEsClient;
entitiesEsClient: EntitiesSynthtraceEsClient;
}

View file

@ -15,6 +15,7 @@ import { getKibanaClient } from './get_kibana_client';
import { getServiceUrls } from './get_service_urls';
import { RunOptions } from './parse_run_cli_flags';
import { getSyntheticsEsClient } from './get_synthetics_es_client';
import { getOtelSynthtraceEsClient } from './get_otel_es_client';
import { getEntitiesEsClient } from './get_entities_es_client';
import { getEntitiesKibanaClient } from './get_entites_kibana_client';
@ -74,6 +75,11 @@ export async function bootstrap(runOptions: RunOptions) {
logger,
concurrency: runOptions.concurrency,
});
const otelEsClient = getOtelSynthtraceEsClient({
target: esUrl,
logger,
concurrency: runOptions.concurrency,
});
if (runOptions.clean) {
await apmEsClient.clean();
@ -81,6 +87,7 @@ export async function bootstrap(runOptions: RunOptions) {
await infraEsClient.clean();
await entitiesEsClient.clean();
await syntheticsEsClient.clean();
await otelEsClient.clean();
}
return {
@ -90,6 +97,7 @@ export async function bootstrap(runOptions: RunOptions) {
infraEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
version,
kibanaUrl,
esUrl,

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Client } from '@elastic/elasticsearch';
import { Logger } from '../../lib/utils/create_logger';
import { RunOptions } from './parse_run_cli_flags';
import { getEsClientTlsSettings } from './ssl';
import { OtelSynthtraceEsClient } from '../../lib/otel/otel_synthtrace_es_client';
export function getOtelSynthtraceEsClient({
target,
logger,
concurrency,
}: Pick<RunOptions, 'concurrency'> & {
target: string;
logger: Logger;
}) {
const client = new Client({
node: target,
tls: getEsClientTlsSettings(target),
});
return new OtelSynthtraceEsClient({
client,
logger,
concurrency,
});
}

View file

@ -32,6 +32,7 @@ export async function startLiveDataUpload({
logsEsClient,
infraEsClient,
syntheticsEsClient,
otelEsClient,
entitiesEsClient,
entitiesKibanaClient,
} = await bootstrap(runOptions);
@ -48,6 +49,7 @@ export async function startLiveDataUpload({
apmEsClient,
logsEsClient,
infraEsClient,
otelEsClient,
syntheticsEsClient,
entitiesEsClient,
entitiesKibanaClient,
@ -72,6 +74,7 @@ export async function startLiveDataUpload({
apmEsClient,
logsEsClient,
infraEsClient,
otelEsClient,
syntheticsEsClient,
entitiesEsClient,
entitiesKibanaClient,
@ -108,6 +111,7 @@ export async function startLiveDataUpload({
infraEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
},
});

View file

@ -16,6 +16,7 @@ import { getEntitiesKibanaClient } from './get_entites_kibana_client';
import { getEntitiesEsClient } from './get_entities_es_client';
import { getInfraEsClient } from './get_infra_es_client';
import { getLogsEsClient } from './get_logs_es_client';
import { getOtelSynthtraceEsClient } from './get_otel_es_client';
import { getScenario } from './get_scenario';
import { getSyntheticsEsClient } from './get_synthetics_es_client';
import { loggerProxy } from './logger_proxy';
@ -71,6 +72,12 @@ async function start() {
logger,
});
const otelEsClient = getOtelSynthtraceEsClient({
concurrency: runOptions.concurrency,
target: esUrl,
logger,
});
const file = runOptions.file;
const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
@ -85,6 +92,7 @@ async function start() {
logsEsClient,
infraEsClient,
syntheticsEsClient,
otelEsClient,
entitiesEsClient,
entitiesKibanaClient,
});
@ -101,6 +109,7 @@ async function start() {
infraEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
},
})
);
@ -140,6 +149,7 @@ async function start() {
logsEsClient,
infraEsClient,
syntheticsEsClient,
otelEsClient,
entitiesEsClient,
entitiesKibanaClient,
});

View file

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Client } from '@elastic/elasticsearch';
import { ESDocumentWithOperation } from '@kbn/apm-synthtrace-client';
import { OtelDocument } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
export type OtelSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class OtelSynthtraceEsClient extends SynthtraceEsClient<OtelDocument> {
constructor(options: { client: Client; logger: Logger } & OtelSynthtraceEsClientOptions) {
super({
...options,
pipeline: otelPipeline(),
});
this.dataStreams = ['metrics-generic.otel*', 'traces-generic.otel*', 'logs-generic.otel*'];
}
}
function otelPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
export function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<OtelDocument>, encoding, callback) {
const namespace = 'default';
let index: string | undefined;
switch (document?.attributes?.['processor.event']) {
case 'transaction':
case 'span':
index = `traces-generic.otel-${namespace}-synth`;
break;
case 'error':
index = `logs-generic.otel-${namespace}-synth`;
break;
case 'metric':
const metricsetName = document?.attributes?.['metricset.name'];
if (
metricsetName === 'transaction' ||
metricsetName === 'service_transaction' ||
metricsetName === 'service_destination' ||
metricsetName === 'service_summary'
) {
index = `metrics-generic.otel.${metricsetName}.${document.attributes[
'metricset.interval'
]!}-${namespace}-synth`;
} else {
index = `metrics-generic.otel.internal-${namespace}-synth`;
}
break;
default:
if (document?.attributes?.['event.action'] != null) {
index = `logs-generic.otel-${namespace}-synth`;
}
break;
}
if (!index) {
const error = new Error('Cannot determine index for event');
Object.assign(error, { document });
}
document._index = index;
callback(null, document);
},
});
}

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { otel, generateShortId, OtelDocument } from '@kbn/apm-synthtrace-client';
import { times } from 'lodash';
import { Scenario } from '../cli/scenario';
import { withClient } from '../lib/utils/with_client';
const scenario: Scenario<OtelDocument> = async (runOptions) => {
return {
generate: ({ range, clients: { otelEsClient } }) => {
const { numOtelTraces = 5 } = runOptions.scenarioOpts || {};
const { logger } = runOptions;
const traceId = generateShortId();
const spanId = generateShortId();
const otelDocs = times(numOtelTraces / 2).map((index) => otel.create(traceId));
const otelWithMetricsAndErrors = range
.interval('30s')
.rate(1)
.generator((timestamp) =>
otelDocs.flatMap((oteld) => {
return [
oteld.metric().timestamp(timestamp),
oteld.transaction(spanId).timestamp(timestamp),
oteld.error(spanId).timestamp(timestamp),
];
})
);
return [
withClient(
otelEsClient,
logger.perf('generating_otel_trace', () => otelWithMetricsAndErrors)
),
];
},
};
};
export default scenario;