APM Synthtrace: Introduce new DSL using generators (#123454)

Co-authored-by: Martijn Laarman <ml@elastic.co>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Martijn Laarman 2022-03-08 12:18:15 +01:00 committed by GitHub
parent 813ba8554f
commit dd4652b33a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
74 changed files with 1285 additions and 1059 deletions

View file

@ -32,6 +32,7 @@ RUNTIME_DEPS = [
"@npm//object-hash",
"@npm//p-limit",
"@npm//yargs",
"@npm//node-fetch",
]
TYPES_DEPS = [
@ -43,6 +44,7 @@ TYPES_DEPS = [
"@npm//@types/object-hash",
"@npm//moment",
"@npm//p-limit",
"@npm//@types/node-fetch",
]
jsts_transpiler(

View file

@ -14,3 +14,4 @@ export { createLogger, LogLevel } from './lib/utils/create_logger';
export type { Fields } from './lib/entity';
export type { ApmException, ApmSynthtraceEsClient } from './lib/apm';
export type { SpanIterable } from './lib/span_iterable';

View file

@ -31,9 +31,14 @@ export type ApmUserAgentFields = Partial<{
export interface ApmException {
message: string;
}
export interface Observer {
version: string;
version_major: number;
}
export type ApmFields = Fields &
Partial<{
'timestamp.us'?: number;
'agent.name': string;
'agent.version': string;
'container.id': string;
@ -47,8 +52,7 @@ export type ApmFields = Fields &
'host.name': string;
'kubernetes.pod.uid': string;
'metricset.name': string;
'observer.version': string;
'observer.version_major': number;
observer: Observer;
'parent.id': string;
'processor.event': string;
'processor.name': string;

View file

@ -24,7 +24,7 @@ export class BaseSpan extends Serializable<ApmFields> {
});
}
parent(span: BaseSpan) {
parent(span: BaseSpan): this {
this.fields['trace.id'] = span.fields['trace.id'];
this.fields['parent.id'] = span.isSpan()
? span.fields['span.id']
@ -40,7 +40,7 @@ export class BaseSpan extends Serializable<ApmFields> {
return this;
}
children(...children: BaseSpan[]) {
children(...children: BaseSpan[]): this {
children.forEach((child) => {
child.parent(this);
});
@ -50,17 +50,17 @@ export class BaseSpan extends Serializable<ApmFields> {
return this;
}
success() {
success(): this {
this.fields['event.outcome'] = 'success';
return this;
}
failure() {
failure(): this {
this.fields['event.outcome'] = 'failure';
return this;
}
outcome(outcome: 'success' | 'failure' | 'unknown') {
outcome(outcome: 'success' | 'failure' | 'unknown'): this {
this.fields['event.outcome'] = outcome;
return this;
}

View file

@ -7,56 +7,115 @@
*/
import { Client } from '@elastic/elasticsearch';
import { uploadEvents } from '../../../scripts/utils/upload_events';
import { Fields } from '../../entity';
import { cleanWriteTargets } from '../../utils/clean_write_targets';
import { getBreakdownMetrics } from '../utils/get_breakdown_metrics';
import { getSpanDestinationMetrics } from '../utils/get_span_destination_metrics';
import { getTransactionMetrics } from '../utils/get_transaction_metrics';
import { getApmWriteTargets } from '../utils/get_apm_write_targets';
import { Logger } from '../../utils/create_logger';
import { apmEventsToElasticsearchOutput } from '../utils/apm_events_to_elasticsearch_output';
import { ApmFields } from '../apm_fields';
import { SpanIterable } from '../../span_iterable';
import { StreamProcessor } from '../../stream_processor';
import { SpanGeneratorsUnion } from '../../span_generators_union';
export interface StreamToBulkOptions {
concurrency?: number;
maxDocs?: number;
mapToIndex?: (document: Record<string, any>) => string;
}
export class ApmSynthtraceEsClient {
constructor(private readonly client: Client, private readonly logger: Logger) {}
constructor(
private readonly client: Client,
private readonly logger: Logger,
private readonly forceDataStreams: boolean
) {}
private getWriteTargets() {
return getApmWriteTargets({ client: this.client });
return getApmWriteTargets({ client: this.client, forceDataStreams: this.forceDataStreams });
}
clean() {
return this.getWriteTargets().then((writeTargets) =>
cleanWriteTargets({
return this.getWriteTargets().then(async (writeTargets) => {
const indices = Object.values(writeTargets);
this.logger.info(`Attempting to clean: ${indices}`);
if (this.forceDataStreams) {
for (const name of indices) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
await this.client.indices.deleteDataStream({ name });
}
}
return;
}
return cleanWriteTargets({
client: this.client,
targets: Object.values(writeTargets),
targets: indices,
logger: this.logger,
})
);
});
});
}
async index(events: Fields[]) {
async updateComponentTemplates(numberOfPrimaryShards: number) {
const response = await this.client.cluster.getComponentTemplate({ name: '*apm*@custom' });
for (const componentTemplate of response.component_templates) {
if (componentTemplate.component_template._meta?.package?.name !== 'apm') continue;
componentTemplate.component_template.template.settings = {
index: {
number_of_shards: numberOfPrimaryShards,
},
};
const putTemplate = await this.client.cluster.putComponentTemplate({
name: componentTemplate.name,
...componentTemplate.component_template,
});
this.logger.info(
`- updated component template ${componentTemplate.name}, acknowledged: ${putTemplate.acknowledged}`
);
}
}
async index(events: SpanIterable | SpanIterable[], options?: StreamToBulkOptions) {
const dataStream = Array.isArray(events) ? new SpanGeneratorsUnion(events) : events;
const writeTargets = await this.getWriteTargets();
const eventsToIndex = apmEventsToElasticsearchOutput({
events: [
...events,
...getTransactionMetrics(events),
...getSpanDestinationMetrics(events),
...getBreakdownMetrics(events),
],
writeTargets,
// TODO logger.perf
await this.client.helpers.bulk<ApmFields>({
concurrency: options?.concurrency ?? 10,
refresh: false,
refreshOnCompletion: false,
datasource: new StreamProcessor({
processors: StreamProcessor.apmProcessors,
maxSourceEvents: options?.maxDocs,
logger: this.logger,
})
// TODO https://github.com/elastic/elasticsearch-js/issues/1610
// having to map here is awkward, it'd be better to map just before serialization.
.streamToDocumentAsync(StreamProcessor.toDocument, dataStream),
onDrop: (doc) => {
this.logger.info(doc);
},
// TODO bug in client not passing generic to BulkHelperOptions<>
// https://github.com/elastic/elasticsearch-js/issues/1611
onDocument: (doc: unknown) => {
const d = doc as Record<string, any>;
const index = options?.mapToIndex
? options?.mapToIndex(d)
: this.forceDataStreams
? StreamProcessor.getDataStreamForEvent(d, writeTargets)
: StreamProcessor.getIndexForEvent(d, writeTargets);
return { create: { _index: index } };
},
});
await uploadEvents({
batchSize: 1000,
client: this.client,
clientWorkers: 5,
events: eventsToIndex,
logger: this.logger,
});
const indices = Object.values(writeTargets);
this.logger.info(`Indexed all data attempting to refresh: ${indices}`);
return this.client.indices.refresh({
index: Object.values(writeTargets),
index: indices,
allow_no_indices: true,
ignore_unavailable: true,
});
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import fetch from 'node-fetch';
import { Logger } from '../../utils/create_logger';
export class ApmSynthtraceKibanaClient {
constructor(private readonly logger: Logger) {}
async migrateCloudToManagedApm(cloudId: string, username: string, password: string) {
await this.logger.perf('migrate_apm_on_cloud', async () => {
this.logger.info('attempting to migrate cloud instance over to managed APM');
const cloudUrls = Buffer.from(cloudId.split(':')[1], 'base64').toString().split('$');
const kibanaCloudUrl = `https://${cloudUrls[2]}.${cloudUrls[0]}`;
const response = await fetch(
kibanaCloudUrl + '/internal/apm/fleet/cloud_apm_package_policy',
{
method: 'POST', // *GET, POST, PUT, DELETE, etc.
headers: {
Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'),
Accept: 'application/json',
'Content-Type': 'application/json',
'kbn-xsrf': 'kibana',
},
}
);
const responseJson = await response.json();
if (responseJson.message) {
this.logger.info(`Cloud Instance already migrated to managed APM: ${responseJson.message}`);
}
if (responseJson.cloudApmPackagePolicy) {
this.logger.info(
`Cloud Instance migrated to managed APM: ${responseJson.cloudApmPackagePolicy.package.version}`
);
}
});
}
}

View file

@ -1,16 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from '../apm_fields';
export function getObserverDefaults(): ApmFields {
return {
'observer.version': '7.16.0',
'observer.version_major': 7,
};
}

View file

@ -7,12 +7,10 @@
*/
import { service } from './service';
import { browser } from './browser';
import { getTransactionMetrics } from './utils/get_transaction_metrics';
import { getSpanDestinationMetrics } from './utils/get_span_destination_metrics';
import { getObserverDefaults } from './defaults/get_observer_defaults';
import { getTransactionMetrics } from './processors/get_transaction_metrics';
import { getSpanDestinationMetrics } from './processors/get_span_destination_metrics';
import { getChromeUserAgentDefaults } from './defaults/get_chrome_user_agent_defaults';
import { apmEventsToElasticsearchOutput } from './utils/apm_events_to_elasticsearch_output';
import { getBreakdownMetrics } from './utils/get_breakdown_metrics';
import { getBreakdownMetrics } from './processors/get_breakdown_metrics';
import { getApmWriteTargets } from './utils/get_apm_write_targets';
import { ApmSynthtraceEsClient } from './client/apm_synthtrace_es_client';
@ -23,9 +21,7 @@ export const apm = {
browser,
getTransactionMetrics,
getSpanDestinationMetrics,
getObserverDefaults,
getChromeUserAgentDefaults,
apmEventsToElasticsearchOutput,
getBreakdownMetrics,
getApmWriteTargets,
ApmSynthtraceEsClient,

View file

@ -8,7 +8,7 @@
import objectHash from 'object-hash';
import { groupBy, pickBy } from 'lodash';
import { ApmFields } from '../apm_fields';
import { createPicker } from './create_picker';
import { createPicker } from '../utils/create_picker';
const instanceFields = [
'container.*',
@ -41,7 +41,10 @@ export function getBreakdownMetrics(events: ApmFields[]) {
Object.keys(txWithSpans).forEach((transactionId) => {
const txEvents = txWithSpans[transactionId];
const transaction = txEvents.find((event) => event['processor.event'] === 'transaction')!;
const transaction = txEvents.find((event) => event['processor.event'] === 'transaction');
if (transaction === undefined) {
return;
}
const eventsById: Record<string, ApmFields> = {};
const activityByParentId: Record<string, Array<{ from: number; to: number }>> = {};

View file

@ -7,7 +7,7 @@
*/
import { ApmFields } from '../apm_fields';
import { aggregate } from './aggregate';
import { aggregate } from '../utils/aggregate';
export function getSpanDestinationMetrics(events: ApmFields[]) {
const exitSpans = events.filter((event) => !!event['span.destination.service.resource']);

View file

@ -8,7 +8,7 @@
import { sortBy } from 'lodash';
import { ApmFields } from '../apm_fields';
import { aggregate } from './aggregate';
import { aggregate } from '../utils/aggregate';
function sortAndCompressHistogram(histogram?: { values: number[]; counts: number[] }) {
return sortBy(histogram?.values).reduce(
@ -34,12 +34,13 @@ export function getTransactionMetrics(events: ApmFields[]) {
.map((transaction) => {
return {
...transaction,
['trace.root']: transaction['parent.id'] === undefined,
['transaction.root']: transaction['parent.id'] === undefined,
};
});
const metricsets = aggregate(transactions, [
'trace.root',
'transaction.root',
'transaction.name',
'transaction.type',
'event.outcome',
@ -77,7 +78,6 @@ export function getTransactionMetrics(events: ApmFields[]) {
histogram.counts.push(1);
histogram.values.push(Number(transaction['transaction.duration.us']));
}
return {
...metricset.key,
'metricset.name': 'transaction',

View file

@ -29,7 +29,6 @@ export function aggregate(events: ApmFields[], fields: string[]) {
const id = objectHash(key);
let metricset = metricsets.get(id);
if (!metricset) {
metricset = {
key: { ...key, 'processor.event': 'metric', 'processor.name': 'metric' },
@ -37,7 +36,6 @@ export function aggregate(events: ApmFields[], fields: string[]) {
};
metricsets.set(id, metricset);
}
metricset.events.push(event);
}

View file

@ -1,60 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { getObserverDefaults } from '../defaults/get_observer_defaults';
import { ApmFields } from '../apm_fields';
import { dedot } from '../../utils/dedot';
import { ElasticsearchOutput } from '../../utils/to_elasticsearch_output';
export interface ApmElasticsearchOutputWriteTargets {
transaction: string;
span: string;
error: string;
metric: string;
}
const observerDefaults = getObserverDefaults();
const esDocumentDefaults = {
ecs: {
version: '1.4',
},
};
dedot(observerDefaults, esDocumentDefaults);
export function apmEventsToElasticsearchOutput({
events,
writeTargets,
}: {
events: ApmFields[];
writeTargets: ApmElasticsearchOutputWriteTargets;
}): ElasticsearchOutput[] {
return events.map((event) => {
const values = {};
Object.assign(values, event, {
'@timestamp': new Date(event['@timestamp']!).toISOString(),
'timestamp.us': event['@timestamp']! * 1000,
'service.node.name':
event['service.node.name'] || event['container.id'] || event['host.name'],
});
const document = {};
Object.assign(document, esDocumentDefaults);
dedot(values, document);
return {
_index: writeTargets[event['processor.event'] as keyof ApmElasticsearchOutputWriteTargets],
_source: document,
timestamp: event['@timestamp']!,
};
});
}

View file

@ -7,13 +7,32 @@
*/
import { Client } from '@elastic/elasticsearch';
import { ApmElasticsearchOutputWriteTargets } from './apm_events_to_elasticsearch_output';
export interface ApmElasticsearchOutputWriteTargets {
transaction: string;
span: string;
error: string;
metric: string;
app_metric: string;
}
export async function getApmWriteTargets({
client,
forceDataStreams,
}: {
client: Client;
forceDataStreams?: boolean;
}): Promise<ApmElasticsearchOutputWriteTargets> {
if (forceDataStreams) {
return {
transaction: 'traces-apm-default',
span: 'traces-apm-default',
metric: 'metrics-apm.internal-default',
app_metric: 'metrics-apm.app-default',
error: 'logs-apm.error-default',
};
}
const [indicesResponse, datastreamsResponse] = await Promise.all([
client.indices.getAlias({
index: 'apm-*',
@ -40,11 +59,12 @@ export async function getApmWriteTargets({
.find(({ key, writeIndexAlias }) => writeIndexAlias && key.includes(filter))
?.writeIndexAlias!;
}
const metricsTarget = getDataStreamName('metrics-apm') || getAlias('-metric');
const targets = {
transaction: getDataStreamName('traces-apm') || getAlias('-transaction'),
span: getDataStreamName('traces-apm') || getAlias('-span'),
metric: getDataStreamName('metrics-apm') || getAlias('-metric'),
metric: metricsTarget,
app_metric: metricsTarget,
error: getDataStreamName('logs-apm') || getAlias('-error'),
};

View file

@ -6,27 +6,69 @@
* Side Public License, v 1.
*/
import moment from 'moment';
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { SpanGenerator } from './span_generator';
export class Interval {
export function parseInterval(interval: string): [number, string] {
const args = interval.match(/(\d+)(s|m|h|d)/);
if (!args || args.length < 3) {
throw new Error('Failed to parse interval');
}
return [Number(args[1]), args[2] as any];
}
export class Interval implements Iterable<number> {
constructor(
private readonly from: number,
private readonly to: number,
private readonly interval: string
) {}
public readonly from: Date,
public readonly to: Date,
public readonly interval: string,
public readonly yieldRate: number = 1
) {
[this.intervalAmount, this.intervalUnit] = parseInterval(interval);
}
rate(rate: number) {
let now = this.from;
const args = this.interval.match(/(.*)(s|m|h|d)/);
if (!args) {
throw new Error('Failed to parse interval');
private readonly intervalAmount: number;
private readonly intervalUnit: any;
spans(map: (timestamp: number, index?: number) => ApmFields[]): SpanIterable {
return new SpanGenerator(this, [
function* (i) {
let index = 0;
for (const x of i) {
for (const a of map(x, index)) {
yield a;
index++;
}
}
},
]);
}
rate(rate: number): Interval {
return new Interval(this.from, this.to, this.interval, rate);
}
private yieldRateTimestamps(timestamp: number) {
return new Array<number>(this.yieldRate).fill(timestamp);
}
private *_generate(): Iterable<number> {
if (this.from > this.to) {
let now = this.from;
do {
yield* this.yieldRateTimestamps(now.getTime());
now = new Date(moment(now).subtract(this.intervalAmount, this.intervalUnit).valueOf());
} while (now > this.to);
} else {
let now = this.from;
do {
yield* this.yieldRateTimestamps(now.getTime());
now = new Date(moment(now).add(this.intervalAmount, this.intervalUnit).valueOf());
} while (now < this.to);
}
const timestamps: number[] = [];
while (now < this.to) {
timestamps.push(...new Array<number>(rate).fill(now));
now = moment(now)
.add(Number(args[1]), args[2] as any)
.valueOf();
}
return timestamps;
}
[Symbol.iterator]() {
return this._generate()[Symbol.iterator]();
}
}

View file

@ -15,7 +15,7 @@ export class Serializable<TFields extends Fields> extends Entity<TFields> {
});
}
timestamp(time: number) {
timestamp(time: number): this {
this.fields['@timestamp'] = time;
return this;
}

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Interval } from './interval';
import { ApmFields } from './apm/apm_fields';
import { SpanGeneratorsUnion } from './span_generators_union';
import { SpanIterable } from './span_iterable';
export class SpanGenerator implements SpanIterable {
constructor(
private readonly interval: Interval,
private readonly dataGenerator: Array<(interval: Interval) => Generator<ApmFields>>
) {
this._order = interval.from > interval.to ? 'desc' : 'asc';
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
toArray(): ApmFields[] {
return Array.from(this);
}
concat(...iterables: SpanGenerator[]) {
return new SpanGeneratorsUnion([this, ...iterables]);
}
*[Symbol.iterator]() {
for (const iterator of this.dataGenerator) {
for (const fields of iterator(this.interval)) {
yield fields;
}
}
}
async *[Symbol.asyncIterator]() {
for (const iterator of this.dataGenerator) {
for (const fields of iterator(this.interval)) {
yield fields;
}
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { merge } from './utils/merge_iterable';
export class SpanGeneratorsUnion implements SpanIterable {
constructor(private readonly dataGenerators: SpanIterable[]) {
const orders = new Set<'desc' | 'asc'>(dataGenerators.map((d) => d.order()));
if (orders.size > 1) throw Error('Can only combine intervals with the same order()');
this._order = orders.has('asc') ? 'asc' : 'desc';
}
static empty: SpanGeneratorsUnion = new SpanGeneratorsUnion([]);
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
toArray(): ApmFields[] {
return Array.from(this);
}
concat(...iterables: SpanIterable[]) {
return new SpanGeneratorsUnion([...this.dataGenerators, ...iterables]);
}
*[Symbol.iterator]() {
const iterator = merge(this.dataGenerators);
for (const fields of iterator) {
yield fields;
}
}
async *[Symbol.asyncIterator]() {
for (const iterator of this.dataGenerators) {
for (const fields of iterator) {
yield fields;
}
}
}
}

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from './apm/apm_fields';
import { SpanGeneratorsUnion } from './span_generators_union';
export interface SpanIterable extends Iterable<ApmFields>, AsyncIterable<ApmFields> {
order(): 'desc' | 'asc';
toArray(): ApmFields[];
concat(...iterables: SpanIterable[]): SpanGeneratorsUnion;
}
export class SpanArrayIterable implements SpanIterable {
constructor(private fields: ApmFields[]) {
const timestamps = fields.filter((f) => f['@timestamp']).map((f) => f['@timestamp']!);
this._order = timestamps.length > 1 ? (timestamps[0] > timestamps[1] ? 'desc' : 'asc') : 'asc';
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
async *[Symbol.asyncIterator](): AsyncIterator<ApmFields> {
return this.fields[Symbol.iterator]();
}
[Symbol.iterator](): Iterator<ApmFields> {
return this.fields[Symbol.iterator]();
}
concat(...iterables: SpanIterable[]): SpanGeneratorsUnion {
return new SpanGeneratorsUnion([this, ...iterables]);
}
toArray(): ApmFields[] {
return this.fields;
}
}

View file

@ -17,13 +17,13 @@ export class ClusterStats extends Serializable<StackMonitoringFields> {
this.fields['license.status'] = 'active';
}
timestamp(timestamp: number) {
timestamp(timestamp: number): this {
super.timestamp(timestamp);
this.fields['cluster_stats.timestamp'] = new Date(timestamp).toISOString();
return this;
}
indices(count: number) {
indices(count: number): this {
this.fields['cluster_stats.indices.count'] = count;
return this;
}

View file

@ -10,15 +10,15 @@ import { Serializable } from '../serializable';
import { StackMonitoringFields } from './stack_monitoring_fields';
export class KibanaStats extends Serializable<StackMonitoringFields> {
timestamp(timestamp: number) {
super.timestamp(timestamp);
timestamp(timestamp: number): this {
this.fields['kibana_stats.timestamp'] = new Date(timestamp).toISOString();
this.fields['kibana_stats.response_times.max'] = 250;
this.fields['kibana_stats.kibana.status'] = 'green';
this.fields.timestamp = timestamp;
return this;
}
requests(disconnects: number, total: number) {
requests(disconnects: number, total: number): this {
this.fields['kibana_stats.requests.disconnects'] = disconnects;
this.fields['kibana_stats.requests.total'] = total;
return this;

View file

@ -26,4 +26,5 @@ export type StackMonitoringFields = Fields &
'kibana_stats.requests.total': number;
'kibana_stats.timestamp': string;
'kibana_stats.response_times.max': number;
timestamp: number;
}>;

View file

@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { getTransactionMetrics } from './apm/processors/get_transaction_metrics';
import { getSpanDestinationMetrics } from './apm/processors/get_span_destination_metrics';
import { getBreakdownMetrics } from './apm/processors/get_breakdown_metrics';
import { parseInterval } from './interval';
import { dedot } from './utils/dedot';
import { ApmElasticsearchOutputWriteTargets } from './apm/utils/get_apm_write_targets';
import { Logger } from './utils/create_logger';
export interface StreamProcessorOptions {
processors: Array<(events: ApmFields[]) => ApmFields[]>;
flushInterval?: string;
maxBufferSize?: number;
// the maximum source events to process, not the maximum documents outputted by the processor
maxSourceEvents?: number;
logger?: Logger;
}
export class StreamProcessor {
public static readonly apmProcessors = [
getTransactionMetrics,
getSpanDestinationMetrics,
getBreakdownMetrics,
];
constructor(private readonly options: StreamProcessorOptions) {
[this.intervalAmount, this.intervalUnit] = this.options.flushInterval
? parseInterval(this.options.flushInterval)
: parseInterval('1m');
}
private readonly intervalAmount: number;
private readonly intervalUnit: any;
// TODO move away from chunking and feed this data one by one to processors
*stream(...eventSources: SpanIterable[]) {
const maxBufferSize = this.options.maxBufferSize ?? 10000;
const maxSourceEvents = this.options.maxSourceEvents;
let localBuffer = [];
let flushAfter: number | null = null;
let sourceEventsYielded = 0;
for (const eventSource of eventSources) {
const order = eventSource.order();
this.options.logger?.info(`order: ${order}`);
for (const event of eventSource) {
const eventDate = event['@timestamp'] as number;
localBuffer.push(event);
if (flushAfter === null && eventDate !== null) {
flushAfter = this.calculateFlushAfter(eventDate, order);
}
yield StreamProcessor.enrich(event);
sourceEventsYielded++;
if (maxSourceEvents && sourceEventsYielded % (maxSourceEvents / 10) === 0) {
this.options.logger?.info(`Yielded ${sourceEventsYielded} events`);
}
if (maxSourceEvents && sourceEventsYielded >= maxSourceEvents) {
// yielded the maximum source events, we still want the local buffer to generate derivative documents
break;
}
if (
localBuffer.length === maxBufferSize ||
(flushAfter != null &&
((order === 'asc' && eventDate > flushAfter) ||
(order === 'desc' && eventDate < flushAfter)))
) {
const e = new Date(eventDate).toISOString();
const f = new Date(flushAfter!).toISOString();
this.options.logger?.debug(
`flush ${localBuffer.length} documents ${order}: ${e} => ${f}`
);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map(StreamProcessor.enrich);
}
localBuffer = [];
flushAfter = this.calculateFlushAfter(flushAfter, order);
}
}
if (maxSourceEvents && sourceEventsYielded >= maxSourceEvents) {
this.options.logger?.info(`Yielded maximum number of documents: ${maxSourceEvents}`);
break;
}
}
if (localBuffer.length > 0) {
this.options.logger?.info(`Processing remaining buffer: ${localBuffer.length} items left`);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map(StreamProcessor.enrich);
}
}
}
private calculateFlushAfter(eventDate: number | null, order: 'asc' | 'desc') {
if (order === 'desc') {
return moment(eventDate).subtract(this.intervalAmount, this.intervalUnit).valueOf();
} else {
return moment(eventDate).add(this.intervalAmount, this.intervalUnit).valueOf();
}
}
async *streamAsync(...eventSources: SpanIterable[]): AsyncIterator<ApmFields> {
yield* this.stream(...eventSources);
}
*streamToDocument<TDocument>(
map: (d: ApmFields) => TDocument,
...eventSources: SpanIterable[]
): Generator<ApmFields> {
for (const apmFields of this.stream(...eventSources)) {
yield map(apmFields);
}
}
async *streamToDocumentAsync<TDocument>(
map: (d: ApmFields) => TDocument,
...eventSources: SpanIterable[]
): AsyncIterator<ApmFields> {
for (const apmFields of this.stream(...eventSources)) {
yield map(apmFields);
}
}
streamToArray(...eventSources: SpanIterable[]) {
return Array.from<ApmFields>(this.stream(...eventSources));
}
static enrich(document: ApmFields): ApmFields {
// see https://github.com/elastic/apm-server/issues/7088 can not be provided as flat key/values
document.observer = {
version: '8.0.0',
version_major: 8,
};
document['service.node.name'] =
document['service.node.name'] || document['container.id'] || document['host.name'];
document['ecs.version'] = '1.4';
// TODO this non standard field should not be enriched here
if (document['processor.event'] !== 'metric') {
document['timestamp.us'] = document['@timestamp']! * 1000;
}
return document;
}
static toDocument(document: ApmFields): Record<string, any> {
if (!document.observer) {
document = StreamProcessor.enrich(document);
}
const newDoc: Record<string, any> = {};
dedot(document, newDoc);
if (typeof newDoc['@timestamp'] === 'number') {
const timestamp = newDoc['@timestamp'];
newDoc['@timestamp'] = new Date(timestamp).toISOString();
}
return newDoc;
}
static getDataStreamForEvent(
d: Record<string, any>,
writeTargets: ApmElasticsearchOutputWriteTargets
) {
if (!d.processor?.event) {
throw Error("'processor.event' is not set on document, can not determine target index");
}
const eventType = d.processor.event as keyof ApmElasticsearchOutputWriteTargets;
let dataStream = writeTargets[eventType];
if (eventType === 'metric') {
if (!d.service?.name) {
dataStream = 'metrics-apm.app-default';
} else {
if (!d.transaction && !d.span) {
dataStream = 'metrics-apm.app-default';
}
}
}
return dataStream;
}
static getIndexForEvent(
d: Record<string, any>,
writeTargets: ApmElasticsearchOutputWriteTargets
) {
if (!d.processor?.event) {
throw Error("'processor.event' is not set on document, can not determine target index");
}
const eventType = d.processor.event as keyof ApmElasticsearchOutputWriteTargets;
return writeTargets[eventType];
}
}
export async function* streamProcessAsync(
processors: Array<(events: ApmFields[]) => ApmFields[]>,
...eventSources: SpanIterable[]
) {
return new StreamProcessor({ processors }).streamAsync(...eventSources);
}
export function streamProcessToArray(
processors: Array<(events: ApmFields[]) => ApmFields[]>,
...eventSources: SpanIterable[]
) {
return new StreamProcessor({ processors }).streamToArray(...eventSources);
}

View file

@ -9,13 +9,16 @@
import { Interval } from './interval';
export class Timerange {
constructor(private from: number, private to: number) {}
constructor(private from: Date, private to: Date) {}
interval(interval: string) {
return new Interval(this.from, this.to, interval);
}
}
export function timerange(from: number, to: number) {
return new Timerange(from, to);
export function timerange(from: Date | number, to: Date | number) {
return new Timerange(
from instanceof Date ? from : new Date(from),
to instanceof Date ? to : new Date(to)
);
}

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from '../apm/apm_fields';
import { SpanIterable } from '../span_iterable';
export function merge(iterables: SpanIterable[]): Iterable<ApmFields> {
if (iterables.length === 1) return iterables[0];
const iterators = iterables.map<Iterator<ApmFields>>((i) => {
return i[Symbol.iterator]();
});
let done = false;
const myIterable: Iterable<ApmFields> = {
*[Symbol.iterator]() {
do {
const items = iterators.map((i) => i.next());
done = items.every((item) => item.done);
if (!done) {
yield* items.filter((i) => !i.done).map((i) => i.value);
}
} while (!done);
// Done for the first time: close all iterators
for (const iterator of iterators) {
if (typeof iterator.return === 'function') {
iterator.return();
}
}
},
};
return myIterable;
}

View file

@ -1,44 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Fields } from '../entity';
import { dedot } from './dedot';
export interface ElasticsearchOutput {
_index: string;
_source: unknown;
timestamp: number;
}
export function eventsToElasticsearchOutput({
events,
writeTarget,
}: {
events: Fields[];
writeTarget: string;
}): ElasticsearchOutput[] {
return events.map((event) => {
const values = {};
const timestamp = event['@timestamp']!;
Object.assign(values, event, {
'@timestamp': new Date(timestamp).toISOString(),
});
const document = {};
dedot(values, document);
return {
_index: writeTarget,
_source: document,
timestamp,
};
});
}

View file

@ -7,16 +7,15 @@
*/
import { apm, timerange } from '../../index';
import { apmEventsToElasticsearchOutput } from '../../lib/apm/utils/apm_events_to_elasticsearch_output';
import { getApmWriteTargets } from '../../lib/apm/utils/get_apm_write_targets';
import { Instance } from '../../lib/apm/instance';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
const scenario: Scenario = async ({ target, logLevel, scenarioOpts }) => {
const { client, logger } = getCommonServices({ target, logLevel });
const writeTargets = await getApmWriteTargets({ client });
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
const { numServices = 3 } = scenarioOpts || {};
const { numServices = 3 } = runOptions.scenarioOpts || {};
return {
generate: ({ from, to }) => {
@ -28,79 +27,65 @@ const scenario: Scenario = async ({ target, logLevel, scenarioOpts }) => {
const failedTimestamps = range.interval('1s').rate(1);
return new Array(numServices).fill(undefined).flatMap((_, index) => {
const events = logger.perf('generating_apm_events', () => {
const instance = apm
.service(`opbeans-go-${index}`, 'production', 'go')
.instance('instance');
const successfulTraceEvents = successfulTimestamps.flatMap((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
.duration(1000)
.success()
.children(
instance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(1000)
.success()
.destination('elasticsearch')
.timestamp(timestamp),
instance
.span('custom_operation', 'custom')
.duration(100)
.success()
.timestamp(timestamp)
)
.serialize()
);
const failedTraceEvents = failedTimestamps.flatMap((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
.duration(1000)
.failure()
.errors(
instance
.error('[ResponseError] index_not_found_exception')
.timestamp(timestamp + 50)
)
.serialize()
);
const metricsets = range
.interval('30s')
.rate(1)
.flatMap((timestamp) =>
const instances = [...Array(numServices).keys()].map((index) =>
apm.service(`opbeans-go-${index}`, 'production', 'go').instance('instance')
);
const instanceSpans = (instance: Instance) => {
const successfulTraceEvents = successfulTimestamps.spans((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
.duration(1000)
.success()
.children(
instance
.appMetrics({
'system.memory.actual.free': 800,
'system.memory.total': 1000,
'system.cpu.total.norm.pct': 0.6,
'system.process.cpu.total.norm.pct': 0.7,
})
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(1000)
.success()
.destination('elasticsearch')
.timestamp(timestamp),
instance
.span('custom_operation', 'custom')
.duration(100)
.success()
.timestamp(timestamp)
.serialize()
);
return [...successfulTraceEvents, ...failedTraceEvents, ...metricsets];
});
return logger.perf('apm_events_to_es_output', () =>
apmEventsToElasticsearchOutput({
events: [
...events,
...logger.perf('get_transaction_metrics', () => apm.getTransactionMetrics(events)),
...logger.perf('get_span_destination_metrics', () =>
apm.getSpanDestinationMetrics(events)
),
...logger.perf('get_breakdown_metrics', () => apm.getBreakdownMetrics(events)),
],
writeTargets,
})
)
.serialize()
);
});
const failedTraceEvents = failedTimestamps.spans((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
.duration(1000)
.failure()
.errors(
instance.error('[ResponseError] index_not_found_exception').timestamp(timestamp + 50)
)
.serialize()
);
const metricsets = range
.interval('30s')
.rate(1)
.spans((timestamp) =>
instance
.appMetrics({
'system.memory.actual.free': 800,
'system.memory.total': 1000,
'system.cpu.total.norm.pct': 0.6,
'system.process.cpu.total.norm.pct': 0.7,
})
.timestamp(timestamp)
.serialize()
);
return successfulTraceEvents.concat(failedTraceEvents, metricsets);
};
return instances
.map((instance) => logger.perf('generating_apm_events', () => instanceSpans(instance)))
.reduce((p, c) => p.concat(c));
},
};
};

View file

@ -7,14 +7,14 @@
*/
import { stackMonitoring, timerange } from '../../index';
import { eventsToElasticsearchOutput } from '../../lib/utils/to_elasticsearch_output';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
const scenario: Scenario = async ({ target, writeTarget, logLevel }) => {
const { logger } = getCommonServices({ target, logLevel });
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
if (!writeTarget) {
if (!runOptions.writeTarget) {
throw new Error('Write target is not defined');
}
@ -26,20 +26,11 @@ const scenario: Scenario = async ({ target, writeTarget, logLevel }) => {
return range
.interval('30s')
.rate(1)
.flatMap((timestamp) => {
.spans((timestamp) => {
const events = logger.perf('generating_sm_events', () => {
return kibanaStats.timestamp(timestamp).requests(10, 20).serialize();
});
return logger.perf('sm_events_to_es_output', () => {
const smEvents = eventsToElasticsearchOutput({ events, writeTarget });
smEvents.forEach((event: any) => {
const ts = event._source['@timestamp'];
delete event._source['@timestamp'];
event._source.timestamp = ts;
});
return smEvents;
});
return events;
});
},
};

View file

@ -9,32 +9,19 @@
// Run with: node ./src/scripts/run ./src/scripts/examples/03_monitoring.ts --target=http://elastic:changeme@localhost:9200
import { stackMonitoring, timerange } from '../../index';
import {
ElasticsearchOutput,
eventsToElasticsearchOutput,
} from '../../lib/utils/to_elasticsearch_output';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { StackMonitoringFields } from '../../lib/stack_monitoring/stack_monitoring_fields';
import { RunOptions } from '../utils/parse_run_cli_flags';
// TODO (mat): move this into a function like utils/apm_events_to_elasticsearch_output.ts
function smEventsToElasticsearchOutput(
events: StackMonitoringFields[],
writeTarget: string
): ElasticsearchOutput[] {
const smEvents = eventsToElasticsearchOutput({ events, writeTarget });
smEvents.forEach((event: any) => {
const ts = event._source['@timestamp'];
delete event._source['@timestamp'];
event._source.timestamp = ts;
});
return smEvents;
}
const scenario: Scenario = async ({ target, logLevel }) => {
const { logger } = getCommonServices({ target, logLevel });
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
return {
mapToIndex: (data) => {
return data.kibana_stats?.kibana?.name
? '.monitoring-kibana-7-synthtrace'
: '.monitoring-es-7-synthtrace';
},
generate: ({ from, to }) => {
const cluster = stackMonitoring.cluster('test-cluster');
const clusterStats = cluster.stats();
@ -44,24 +31,14 @@ const scenario: Scenario = async ({ target, logLevel }) => {
return range
.interval('10s')
.rate(1)
.flatMap((timestamp) => {
.spans((timestamp) => {
const clusterEvents = logger.perf('generating_es_events', () => {
return clusterStats.timestamp(timestamp).indices(115).serialize();
});
const clusterOutputs = smEventsToElasticsearchOutput(
clusterEvents,
'.monitoring-es-7-synthtrace'
);
const kibanaEvents = logger.perf('generating_kb_events', () => {
return kibanaStats.timestamp(timestamp).requests(10, 20).serialize();
});
const kibanaOutputs = smEventsToElasticsearchOutput(
kibanaEvents,
'.monitoring-kibana-7-synthtrace'
);
return [...clusterOutputs, ...kibanaOutputs];
return [...clusterEvents, ...kibanaEvents];
});
},
};

View file

@ -13,6 +13,8 @@ import { startHistoricalDataUpload } from './utils/start_historical_data_upload'
import { startLiveDataUpload } from './utils/start_live_data_upload';
import { parseRunCliFlags } from './utils/parse_run_cli_flags';
import { getCommonServices } from './utils/get_common_services';
import { ApmSynthtraceKibanaClient } from '../lib/apm/client/apm_synthtrace_kibana_client';
import { ApmSynthtraceEsClient } from '../lib/apm/client/apm_synthtrace_es_client';
function options(y: Argv) {
return y
@ -22,10 +24,24 @@ function options(y: Argv) {
string: true,
})
.option('target', {
describe: 'Elasticsearch target, including username/password',
demandOption: true,
describe: 'Elasticsearch target',
string: true,
})
.option('cloudId', {
describe:
'Provide connection information and will force APM on the cloud to migrate to run as a Fleet integration',
string: true,
})
.option('username', {
describe: 'Basic authentication username',
string: true,
demandOption: true,
})
.option('password', {
describe: 'Basic authentication password',
string: true,
demandOption: true,
})
.option('from', {
description: 'The start of the time window',
})
@ -36,6 +52,20 @@ function options(y: Argv) {
description: 'Generate and index data continuously',
boolean: true,
})
.option('--dryRun', {
description: 'Enumerates the stream without sending events to Elasticsearch ',
boolean: true,
})
.option('maxDocs', {
description:
'The maximum number of documents we are allowed to generate, should be multiple of 10.000',
number: true,
})
.option('numShards', {
description:
'Updates the component templates to update the number of primary shards, requires cloudId to be provided',
number: true,
})
.option('clean', {
describe: 'Clean APM indices before indexing new data',
default: false,
@ -75,7 +105,9 @@ function options(y: Argv) {
return arg as Record<string, any> | undefined;
},
})
.conflicts('to', 'live');
.conflicts('to', 'live')
.conflicts('maxDocs', 'live')
.conflicts('target', 'cloudId');
}
export type RunCliFlags = ReturnType<typeof options>['argv'];
@ -84,38 +116,57 @@ yargs(process.argv.slice(2))
.command('*', 'Generate data and index into Elasticsearch', options, async (argv) => {
const runOptions = parseRunCliFlags(argv);
const { logger } = getCommonServices(runOptions);
const { logger, client } = getCommonServices(runOptions);
const to = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const from = argv.from
const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const to = new Date(toMs);
const defaultTimeRange = !runOptions.maxDocs ? '15m' : '52w';
const fromMs = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: to - intervalToMs('15m');
: toMs - intervalToMs(defaultTimeRange);
const from = new Date(fromMs);
const live = argv.live;
const forceDataStreams = !!runOptions.cloudId;
const esClient = new ApmSynthtraceEsClient(client, logger, forceDataStreams);
if (runOptions.dryRun) {
await startHistoricalDataUpload(esClient, logger, runOptions, from, to);
return;
}
if (runOptions.cloudId) {
const kibanaClient = new ApmSynthtraceKibanaClient(logger);
await kibanaClient.migrateCloudToManagedApm(
runOptions.cloudId,
runOptions.username,
runOptions.password
);
}
if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await esClient.updateComponentTemplates(runOptions.numShards);
}
if (argv.clean) {
await esClient.clean();
}
logger.info(
`Starting data generation\n: ${JSON.stringify(
{
...runOptions,
from: new Date(from).toISOString(),
to: new Date(to).toISOString(),
from: from.toISOString(),
to: to.toISOString(),
},
null,
2
)}`
);
startHistoricalDataUpload({
...runOptions,
from,
to,
});
await startHistoricalDataUpload(esClient, logger, runOptions, from, to);
if (live) {
startLiveDataUpload({
...runOptions,
start: to,
});
await startLiveDataUpload(esClient, logger, runOptions, to);
}
})
.parse();

View file

@ -6,8 +6,11 @@
* Side Public License, v 1.
*/
import { ElasticsearchOutput } from '../lib/utils/to_elasticsearch_output';
import { RunOptions } from './utils/parse_run_cli_flags';
import { SpanIterable } from '../lib/span_iterable';
type Generate = (range: { from: number; to: number }) => ElasticsearchOutput[];
export type Scenario = (options: RunOptions) => Promise<{ generate: Generate }>;
type Generate = (range: { from: Date; to: Date }) => SpanIterable;
export type Scenario = (options: RunOptions) => Promise<{
generate: Generate;
mapToIndex?: (data: Record<string, any>) => string;
}>;

View file

@ -6,13 +6,21 @@
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { createLogger, LogLevel } from '../../lib/utils/create_logger';
import { Client, ClientOptions } from '@elastic/elasticsearch';
import { createLogger } from '../../lib/utils/create_logger';
import { RunOptions } from './parse_run_cli_flags';
export function getCommonServices({ target, logLevel }: { target: string; logLevel: LogLevel }) {
const client = new Client({
node: target,
});
export function getCommonServices({ target, cloudId, username, password, logLevel }: RunOptions) {
if (!target && !cloudId) {
throw Error('target or cloudId needs to be specified');
}
const options: ClientOptions = !!target ? { node: target } : { cloud: { id: cloudId! } };
options.auth = {
username,
password,
};
const client = new Client(options);
const logger = createLogger(logLevel);

View file

@ -49,12 +49,18 @@ export function parseRunCliFlags(flags: RunCliFlags) {
return {
...pick(
flags,
'maxDocs',
'target',
'cloudId',
'username',
'password',
'workers',
'clientWorkers',
'batchSize',
'writeTarget',
'scenarioOpts'
'numShards',
'scenarioOpts',
'dryRun'
),
intervalInMs,
bucketSizeInMs,

View file

@ -5,101 +5,55 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import pLimit from 'p-limit';
import Path from 'path';
import { Worker } from 'worker_threads';
import { getCommonServices } from './get_common_services';
import { RunOptions } from './parse_run_cli_flags';
import { WorkerData } from './upload_next_batch';
import { getScenario } from './get_scenario';
import { ApmSynthtraceEsClient } from '../../lib/apm';
import { Logger } from '../../lib/utils/create_logger';
import { StreamProcessor } from '../../lib/stream_processor';
export async function startHistoricalDataUpload({
from,
to,
intervalInMs,
bucketSizeInMs,
workers,
clientWorkers,
batchSize,
logLevel,
target,
file,
writeTarget,
scenarioOpts,
}: RunOptions & { from: number; to: number }) {
let requestedUntil: number = from;
export async function startHistoricalDataUpload(
esClient: ApmSynthtraceEsClient,
logger: Logger,
runOptions: RunOptions,
from: Date,
to: Date
) {
const file = runOptions.file;
const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
const { logger } = getCommonServices({ target, logLevel });
const { generate, mapToIndex } = await scenario(runOptions);
function processNextBatch() {
const bucketFrom = requestedUntil;
const bucketTo = Math.min(to, bucketFrom + bucketSizeInMs);
// if we want to generate a maximum number of documents reverse generation to descend.
[from, to] = runOptions.maxDocs ? [to, from] : [from, to];
if (bucketFrom === bucketTo) {
return;
}
logger.info(`Generating data from ${from} to ${to}`);
requestedUntil = bucketTo;
const events = logger.perf('generate_scenario', () => generate({ from, to }));
logger.info(
`Starting worker for ${new Date(bucketFrom).toISOString()} to ${new Date(
bucketTo
).toISOString()}`
);
const workerData: WorkerData = {
bucketFrom,
bucketTo,
file,
logLevel,
batchSize,
bucketSizeInMs,
clientWorkers,
intervalInMs,
target,
workers,
writeTarget,
scenarioOpts,
};
const worker = new Worker(Path.join(__dirname, './upload_next_batch.js'), {
workerData,
});
logger.perf('created_worker', () => {
return new Promise<void>((resolve, reject) => {
worker.on('online', () => {
resolve();
});
});
});
logger.perf('completed_worker', () => {
return new Promise<void>((resolve, reject) => {
worker.on('exit', () => {
resolve();
});
});
});
return new Promise<void>((resolve, reject) => {
worker.on('error', (err) => {
reject(err);
});
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped: exit code ${code}`));
return;
}
logger.debug('Worker completed');
resolve();
});
if (runOptions.dryRun) {
const maxDocs = runOptions.maxDocs;
const stream = new StreamProcessor({
processors: StreamProcessor.apmProcessors,
maxSourceEvents: maxDocs,
logger,
}).streamToDocument(StreamProcessor.toDocument, events);
logger.perf('enumerate_scenario', () => {
// @ts-ignore
// We just want to enumerate
let yielded = 0;
for (const _ of stream) {
yielded++;
}
});
return;
}
const numBatches = Math.ceil((to - from) / bucketSizeInMs);
const limiter = pLimit(workers);
return Promise.all(new Array(numBatches).fill(undefined).map((_) => limiter(processNextBatch)));
const clientWorkers = runOptions.clientWorkers;
await logger.perf('index_scenario', () =>
esClient.index(events, {
concurrency: clientWorkers,
maxDocs: runOptions.maxDocs,
mapToIndex,
})
);
}

View file

@ -8,49 +8,36 @@
import { partition } from 'lodash';
import { getScenario } from './get_scenario';
import { uploadEvents } from './upload_events';
import { RunOptions } from './parse_run_cli_flags';
import { getCommonServices } from './get_common_services';
import { ElasticsearchOutput } from '../../lib/utils/to_elasticsearch_output';
import { ApmFields } from '../../lib/apm/apm_fields';
import { ApmSynthtraceEsClient } from '../../lib/apm';
import { Logger } from '../../lib/utils/create_logger';
import { SpanArrayIterable } from '../../lib/span_iterable';
export async function startLiveDataUpload({
file,
start,
bucketSizeInMs,
intervalInMs,
clientWorkers,
batchSize,
target,
logLevel,
workers,
writeTarget,
scenarioOpts,
}: RunOptions & { start: number }) {
let queuedEvents: ElasticsearchOutput[] = [];
let requestedUntil: number = start;
const { logger, client } = getCommonServices({ target, logLevel });
export async function startLiveDataUpload(
esClient: ApmSynthtraceEsClient,
logger: Logger,
runOptions: RunOptions,
start: Date
) {
const file = runOptions.file;
const scenario = await getScenario({ file, logger });
const { generate } = await scenario({
batchSize,
bucketSizeInMs,
clientWorkers,
file,
intervalInMs,
logLevel,
target,
workers,
writeTarget,
scenarioOpts,
});
const { generate, mapToIndex } = await scenario(runOptions);
function uploadNextBatch() {
const end = new Date().getTime();
let queuedEvents: ApmFields[] = [];
let requestedUntil: Date = start;
async function uploadNextBatch() {
const end = new Date();
if (end > requestedUntil) {
const bucketFrom = requestedUntil;
const bucketTo = requestedUntil + bucketSizeInMs;
const nextEvents = generate({ from: bucketFrom, to: bucketTo });
const bucketTo = new Date(requestedUntil.getTime() + runOptions.bucketSizeInMs);
// TODO this materializes into an array, assumption is that the live buffer will fit in memory
const nextEvents = logger.perf('execute_scenario', () =>
generate({ from: bucketFrom, to: bucketTo }).toArray()
);
logger.debug(
`Requesting ${new Date(bucketFrom).toISOString()} to ${new Date(
bucketTo
@ -62,23 +49,27 @@ export async function startLiveDataUpload({
const [eventsToUpload, eventsToRemainInQueue] = partition(
queuedEvents,
(event) => event.timestamp <= end
(event) => event['@timestamp'] !== undefined && event['@timestamp'] <= end.getTime()
);
logger.info(`Uploading until ${new Date(end).toISOString()}, events: ${eventsToUpload.length}`);
queuedEvents = eventsToRemainInQueue;
uploadEvents({
events: eventsToUpload,
clientWorkers,
batchSize,
logger,
client,
});
await logger.perf('index_live_scenario', () =>
esClient.index(new SpanArrayIterable(eventsToUpload), {
concurrency: runOptions.clientWorkers,
maxDocs: runOptions.maxDocs,
mapToIndex,
})
);
}
setInterval(uploadNextBatch, intervalInMs);
uploadNextBatch();
do {
await uploadNextBatch();
await delay(runOptions.intervalInMs);
} while (true);
}
async function delay(ms: number) {
return await new Promise((resolve) => setTimeout(resolve, ms));
}

View file

@ -1,66 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { chunk } from 'lodash';
import pLimit from 'p-limit';
import { inspect } from 'util';
import { ElasticsearchOutput } from '../../lib/utils/to_elasticsearch_output';
import { Logger } from '../../lib/utils/create_logger';
export function uploadEvents({
events,
client,
clientWorkers,
batchSize,
logger,
}: {
events: ElasticsearchOutput[];
client: Client;
clientWorkers: number;
batchSize: number;
logger: Logger;
}) {
const fn = pLimit(clientWorkers);
const batches = chunk(events, batchSize);
if (!batches.length) {
return;
}
logger.debug(`Uploading ${events.length} in ${batches.length} batches`);
const time = new Date().getTime();
return Promise.all(
batches.map((batch) =>
fn(() => {
return logger.perf('bulk_upload', () =>
client.bulk({
refresh: false,
body: batch.flatMap((doc) => {
return [{ index: { _index: doc._index } }, doc._source];
}),
})
);
})
)
).then((results) => {
const errors = results
.flatMap((result) => result.items)
.filter((item) => !!item.index?.error)
.map((item) => item.index?.error);
if (errors.length) {
logger.error(inspect(errors.slice(0, 10), { depth: null }));
throw new Error('Failed to upload some items');
}
logger.debug(`Uploaded ${events.length} in ${new Date().getTime() - time}ms`);
});
}

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable @typescript-eslint/no-var-requires*/
require('@babel/register')({
extensions: ['.ts', '.js'],
presets: [['@babel/preset-env', { targets: { node: 'current' } }], '@babel/preset-typescript'],
});
require('./upload_next_batch.ts');

View file

@ -1,95 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// add this to workerExample.js file.
import { Client } from '@elastic/elasticsearch';
import { workerData } from 'worker_threads';
import { getScenario } from './get_scenario';
import { createLogger, LogLevel } from '../../lib/utils/create_logger';
import { uploadEvents } from './upload_events';
export interface WorkerData {
bucketFrom: number;
bucketTo: number;
file: string;
scenarioOpts: Record<string, any> | undefined;
logLevel: LogLevel;
clientWorkers: number;
batchSize: number;
intervalInMs: number;
bucketSizeInMs: number;
target: string;
workers: number;
writeTarget?: string;
}
const {
bucketFrom,
bucketTo,
file,
logLevel,
clientWorkers,
batchSize,
intervalInMs,
bucketSizeInMs,
workers,
target,
writeTarget,
scenarioOpts,
} = workerData as WorkerData;
async function uploadNextBatch() {
if (bucketFrom === bucketTo) {
return;
}
const logger = createLogger(logLevel);
const client = new Client({
node: target,
});
const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
const { generate } = await scenario({
intervalInMs,
bucketSizeInMs,
logLevel,
file,
clientWorkers,
batchSize,
target,
workers,
writeTarget,
scenarioOpts,
});
const events = logger.perf('execute_scenario', () =>
generate({ from: bucketFrom, to: bucketTo })
);
return uploadEvents({
events,
client,
clientWorkers,
batchSize,
logger,
});
}
uploadNextBatch()
.then(() => {
process.exit(0);
})
.catch((error) => {
// eslint-disable-next-line
console.log(error);
// make sure error shows up in console before process is killed
setTimeout(() => {
process.exit(1);
}, 100);
});

View file

@ -6,15 +6,8 @@
* Side Public License, v 1.
*/
import { apmEventsToElasticsearchOutput } from '../lib/apm/utils/apm_events_to_elasticsearch_output';
import { ApmFields } from '../lib/apm/apm_fields';
const writeTargets = {
transaction: 'apm-8.0.0-transaction',
span: 'apm-8.0.0-span',
metric: 'apm-8.0.0-metric',
error: 'apm-8.0.0-error',
};
import { StreamProcessor } from '../lib/stream_processor';
describe('output apm events to elasticsearch', () => {
let event: ApmFields;
@ -29,32 +22,31 @@ describe('output apm events to elasticsearch', () => {
});
it('properly formats @timestamp', () => {
const doc = apmEventsToElasticsearchOutput({ events: [event], writeTargets })[0] as any;
expect(doc._source['@timestamp']).toEqual('2020-12-31T23:00:00.000Z');
const doc = StreamProcessor.toDocument(event);
expect(doc['@timestamp']).toEqual('2020-12-31T23:00:00.000Z');
});
it('formats a nested object', () => {
const doc = apmEventsToElasticsearchOutput({ events: [event], writeTargets })[0] as any;
const doc = StreamProcessor.toDocument(event);
expect(doc._source.processor).toEqual({
expect(doc.processor).toEqual({
event: 'transaction',
name: 'transaction',
});
});
it('formats all fields consistently', () => {
const doc = apmEventsToElasticsearchOutput({ events: [event], writeTargets })[0] as any;
const doc = StreamProcessor.toDocument(event);
expect(doc._source).toMatchInlineSnapshot(`
expect(doc).toMatchInlineSnapshot(`
Object {
"@timestamp": "2020-12-31T23:00:00.000Z",
"ecs": Object {
"version": "1.4",
},
"observer": Object {
"version": "7.16.0",
"version_major": 7,
"version": "8.0.0",
"version_major": 8,
},
"processor": Object {
"event": "transaction",

View file

@ -17,14 +17,14 @@ describe('simple trace', () => {
const javaInstance = javaService.instance('instance-1');
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime()
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
events = range
.interval('1m')
.rate(1)
.flatMap((timestamp) =>
.spans((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -38,7 +38,8 @@ describe('simple trace', () => {
.timestamp(timestamp + 50)
)
.serialize()
);
)
.toArray();
});
it('generates the same data every time', () => {

View file

@ -8,7 +8,8 @@
import { apm } from '../../lib/apm';
import { timerange } from '../../lib/timerange';
import { getTransactionMetrics } from '../../lib/apm/utils/get_transaction_metrics';
import { getTransactionMetrics } from '../../lib/apm/processors/get_transaction_metrics';
import { StreamProcessor } from '../../lib/stream_processor';
describe('transaction metrics', () => {
let events: Array<Record<string, any>>;
@ -18,36 +19,29 @@ describe('transaction metrics', () => {
const javaInstance = javaService.instance('instance-1');
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime()
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
events = getTransactionMetrics(
range
.interval('1m')
.rate(25)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.success()
.timestamp(timestamp)
.serialize()
)
.concat(
range
.interval('1m')
.rate(50)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.failure()
.timestamp(timestamp)
.serialize()
)
)
);
const span = (timestamp: number) =>
javaInstance.transaction('GET /api/product/list').duration(1000).timestamp(timestamp);
const processor = new StreamProcessor({
processors: [getTransactionMetrics],
flushInterval: '15m',
});
events = processor
.streamToArray(
range
.interval('1m')
.rate(25)
.spans((timestamp) => span(timestamp).success().serialize()),
range
.interval('1m')
.rate(50)
.spans((timestamp) => span(timestamp).failure().serialize())
)
.filter((fields) => fields['metricset.name'] === 'transaction');
});
it('generates the right amount of transaction metrics', () => {

View file

@ -8,7 +8,8 @@
import { apm } from '../../lib/apm';
import { timerange } from '../../lib/timerange';
import { getSpanDestinationMetrics } from '../../lib/apm/utils/get_span_destination_metrics';
import { getSpanDestinationMetrics } from '../../lib/apm/processors/get_span_destination_metrics';
import { StreamProcessor } from '../../lib/stream_processor';
describe('span destination metrics', () => {
let events: Array<Record<string, any>>;
@ -18,57 +19,57 @@ describe('span destination metrics', () => {
const javaInstance = javaService.instance('instance-1');
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime()
);
events = getSpanDestinationMetrics(
range
.interval('1m')
.rate(25)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.success()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp)
.duration(1000)
.destination('elasticsearch')
.success()
)
.serialize()
)
.concat(
range
.interval('1m')
.rate(50)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.failure()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp)
.duration(1000)
.destination('elasticsearch')
.failure(),
javaInstance
.span('custom_operation', 'app')
.timestamp(timestamp)
.duration(500)
.success()
)
.serialize()
)
)
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
const processor = new StreamProcessor({ processors: [getSpanDestinationMetrics] });
events = processor
.streamToArray(
range
.interval('1m')
.rate(25)
.spans((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.success()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp)
.duration(1000)
.destination('elasticsearch')
.success()
)
.serialize()
),
range
.interval('1m')
.rate(50)
.spans((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.failure()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp)
.duration(1000)
.destination('elasticsearch')
.failure(),
javaInstance
.span('custom_operation', 'app')
.timestamp(timestamp)
.duration(500)
.success()
)
.serialize()
)
)
.filter((fields) => fields['metricset.name'] === 'span_destination');
});
it('generates the right amount of span metrics', () => {

View file

@ -8,8 +8,9 @@
import { sumBy } from 'lodash';
import { apm } from '../../lib/apm';
import { timerange } from '../../lib/timerange';
import { getBreakdownMetrics } from '../../lib/apm/utils/get_breakdown_metrics';
import { getBreakdownMetrics } from '../../lib/apm/processors/get_breakdown_metrics';
import { ApmFields } from '../../lib/apm/apm_fields';
import { StreamProcessor } from '../../lib/stream_processor';
describe('breakdown metrics', () => {
let events: ApmFields[];
@ -24,51 +25,58 @@ describe('breakdown metrics', () => {
const javaService = apm.service('opbeans-java', 'production', 'java');
const javaInstance = javaService.instance('instance-1');
const start = new Date('2021-01-01T00:00:00.000Z').getTime();
const start = new Date('2021-01-01T00:00:00.000Z');
const range = timerange(start, start + INTERVALS * 30 * 1000);
const range = timerange(start, new Date(start.getTime() + INTERVALS * 30 * 1000));
events = getBreakdownMetrics([
...range
.interval('30s')
.rate(LIST_RATE)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.timestamp(timestamp)
.duration(1000)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp + 150)
.duration(500),
javaInstance.span('GET foo', 'db', 'redis').timestamp(timestamp).duration(100)
)
.serialize()
),
...range
.interval('30s')
.rate(ID_RATE)
.flatMap((timestamp) =>
javaInstance
.transaction('GET /api/product/:id')
.timestamp(timestamp)
.duration(1000)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(500)
.timestamp(timestamp + 100)
.children(
javaInstance
.span('bar', 'external', 'http')
.timestamp(timestamp + 200)
.duration(100)
)
)
.serialize()
),
]).filter((event) => event['processor.event'] === 'metric');
const listSpans = range
.interval('30s')
.rate(LIST_RATE)
.spans((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.timestamp(timestamp)
.duration(1000)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.timestamp(timestamp + 150)
.duration(500),
javaInstance.span('GET foo', 'db', 'redis').timestamp(timestamp).duration(100)
)
.serialize()
);
const productPageSpans = range
.interval('30s')
.rate(ID_RATE)
.spans((timestamp) =>
javaInstance
.transaction('GET /api/product/:id')
.timestamp(timestamp)
.duration(1000)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(500)
.timestamp(timestamp + 100)
.children(
javaInstance
.span('bar', 'external', 'http')
.timestamp(timestamp + 200)
.duration(100)
)
)
.serialize()
);
const processor = new StreamProcessor({
processors: [getBreakdownMetrics],
flushInterval: '15m',
});
events = processor
.streamToArray(listSpans, productPageSpans)
.filter((event) => event['processor.event'] === 'metric');
});
it('generates the right amount of breakdown metrics', () => {

View file

@ -24,47 +24,43 @@ export function opbeans({ from, to }: { from: number; to: number }) {
apm.getChromeUserAgentDefaults()
);
return [
...range
.interval('1s')
.rate(1)
.flatMap((timestamp) => [
...opbeansJava
.transaction('GET /api/product')
.timestamp(timestamp)
.duration(1000)
.success()
.errors(
opbeansJava
.error('[MockError] Foo', `Exception`)
.timestamp(timestamp)
)
.children(
opbeansJava
.span('SELECT * FROM product', 'db', 'postgresql')
.timestamp(timestamp)
.duration(50)
.success()
.destination('postgresql')
)
.serialize(),
...opbeansNode
.transaction('GET /api/product/:id')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
...opbeansNode
.transaction('Worker job', 'Worker')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansRum
.transaction('/')
.timestamp(timestamp)
.duration(1000)
.serialize(),
]),
];
return range
.interval('1s')
.rate(1)
.spans((timestamp) => [
...opbeansJava
.transaction('GET /api/product')
.timestamp(timestamp)
.duration(1000)
.success()
.errors(
opbeansJava.error('[MockError] Foo', `Exception`).timestamp(timestamp)
)
.children(
opbeansJava
.span('SELECT * FROM product', 'db', 'postgresql')
.timestamp(timestamp)
.duration(50)
.success()
.destination('postgresql')
)
.serialize(),
...opbeansNode
.transaction('GET /api/product/:id')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
...opbeansNode
.transaction('Worker job', 'Worker')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansRum
.transaction('/')
.timestamp(timestamp)
.duration(1000)
.serialize(),
]);
}

View file

@ -18,28 +18,26 @@ export function generateData({ from, to }: { from: number; to: number }) {
.service('opbeans-node', 'production', 'nodejs')
.instance('opbeans-node-prod-1');
return [
...range
.interval('2m')
.rate(1)
.flatMap((timestamp, index) => [
...opbeansJava
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.errors(
opbeansJava
.error(`Error ${index}`, `exception ${index}`)
.timestamp(timestamp)
)
.serialize(),
...opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
]),
];
return range
.interval('2m')
.rate(1)
.spans((timestamp, index) => [
...opbeansJava
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.errors(
opbeansJava
.error(`Error ${index}`, `exception ${index}`)
.timestamp(timestamp)
)
.serialize(),
...opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
]);
}

View file

@ -26,23 +26,21 @@ export function generateData({
.service('opbeans-node', 'production', 'nodejs')
.instance('opbeans-node-prod-1');
return [
...range
.interval('2m')
.rate(1)
.flatMap((timestamp, index) => [
...service1
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
]),
];
return range
.interval('2m')
.rate(1)
.spans((timestamp, index) => [
...service1
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
]);
}

View file

@ -24,8 +24,8 @@ export function generateData({ start, end }: { start: number; end: number }) {
const traceEvents = timerange(start, end)
.interval('1m')
.rate(rate)
.flatMap((timestamp) => [
...instance
.spans((timestamp) =>
instance
.transaction(transaction.name)
.defaults({
'service.runtime.name': 'AWS_Lambda_python3.8',
@ -34,8 +34,8 @@ export function generateData({ start, end }: { start: number; end: number }) {
.timestamp(timestamp)
.duration(transaction.duration)
.success()
.serialize(),
]);
.serialize()
);
return traceEvents;
}

View file

@ -4,7 +4,12 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { apm, createLogger, LogLevel } from '@elastic/apm-synthtrace';
import {
apm,
createLogger,
LogLevel,
SpanIterable,
} from '@elastic/apm-synthtrace';
import { createEsClientForTesting } from '@kbn/test';
// ***********************************************************
@ -33,13 +38,15 @@ const plugin: Cypress.PluginConfig = (on, config) => {
isCloud: !!config.env.TEST_CLOUD,
});
const forceDataStreams = false;
const synthtraceEsClient = new apm.ApmSynthtraceEsClient(
client,
createLogger(LogLevel.info)
createLogger(LogLevel.info),
forceDataStreams
);
on('task', {
'synthtrace:index': async (events) => {
'synthtrace:index': async (events: SpanIterable) => {
await synthtraceEsClient.index(events);
return null;
},

View file

@ -4,9 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SpanIterable } from '@elastic/apm-synthtrace';
export const synthtrace = {
index: (events: any[]) =>
index: (events: SpanIterable) =>
new Promise((resolve) => {
cy.task('synthtrace:index', events).then(resolve);
}),

View file

@ -11,5 +11,6 @@ import { InheritedFtrProviderContext } from './ftr_provider_context';
export async function synthtraceEsClientService(context: InheritedFtrProviderContext) {
const es = context.getService('es');
return new apm.ApmSynthtraceEsClient(es, createLogger(LogLevel.info));
const forceDataStreams = false;
return new apm.ApmSynthtraceEsClient(es, createLogger(LogLevel.info), forceDataStreams);
}

View file

@ -104,7 +104,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const events = timerange(new Date(start).getTime(), new Date(end).getTime())
.interval('1m')
.rate(1)
.flatMap((timestamp) => {
.spans((timestamp) => {
const isInSpike = timestamp >= spikeStart && timestamp < spikeEnd;
const count = isInSpike ? 4 : NORMAL_RATE;
const duration = isInSpike ? 1000 : NORMAL_DURATION;

View file

@ -29,36 +29,36 @@ export async function generateData({
const { transactionName, duration, serviceName } = dataConfig;
const instance = apm.service(serviceName, 'production', 'go').instance('instance-a');
const traceEvents = [
...timerange(start, end)
.interval('1m')
.rate(coldStartRate)
.flatMap((timestamp) => [
...instance
.transaction(transactionName)
.defaults({
'faas.coldstart': true,
})
.timestamp(timestamp)
.duration(duration)
.success()
.serialize(),
]),
...timerange(start, end)
.interval('1m')
.rate(warmStartRate)
.flatMap((timestamp) => [
...instance
.transaction(transactionName)
.defaults({
'faas.coldstart': false,
})
.timestamp(timestamp)
.duration(duration)
.success()
.serialize(),
]),
];
const traceEvents = timerange(start, end)
.interval('1m')
.rate(coldStartRate)
.spans((timestamp) =>
instance
.transaction(transactionName)
.defaults({
'faas.coldstart': true,
})
.timestamp(timestamp)
.duration(duration)
.success()
.serialize()
)
.concat(
timerange(start, end)
.interval('1m')
.rate(warmStartRate)
.spans((timestamp) =>
instance
.transaction(transactionName)
.defaults({
'faas.coldstart': false,
})
.timestamp(timestamp)
.duration(duration)
.success()
.serialize()
)
);
await synthtraceEsClient.index(traceEvents);
}

View file

@ -36,11 +36,11 @@ export async function generateData({
const instance = apm.service(serviceName, 'production', 'go').instance('instance-a');
const traceEvents = [
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(coldStartRate)
.flatMap((timestamp) => [
...instance
.spans((timestamp) =>
instance
.transaction(coldStartTransaction.name)
.defaults({
'faas.coldstart': true,
@ -48,13 +48,13 @@ export async function generateData({
.timestamp(timestamp)
.duration(coldStartTransaction.duration)
.success()
.serialize(),
]),
...timerange(start, end)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(warmStartRate)
.flatMap((timestamp) => [
...instance
.spans((timestamp) =>
instance
.transaction(warmStartTransaction.name)
.defaults({
'faas.coldstart': false,
@ -62,8 +62,8 @@ export async function generateData({
.timestamp(timestamp)
.duration(warmStartTransaction.duration)
.success()
.serialize(),
]),
.serialize()
),
];
await synthtraceEsClient.index(traceEvents);

View file

@ -139,11 +139,11 @@ function generateApmData(synthtrace: ApmSynthtraceEsClient) {
const instance = apm.service('multiple-env-service', 'production', 'go').instance('my-instance');
return synthtrace.index([
...range
range
.interval('1s')
.rate(1)
.flatMap((timestamp) => [
...instance.transaction('GET /api').timestamp(timestamp).duration(30).success().serialize(),
]),
.spans((timestamp) =>
instance.transaction('GET /api').timestamp(timestamp).duration(30).success().serialize()
),
]);
}

View file

@ -37,7 +37,7 @@ export async function generateData({
timerange(start, end)
.interval('1m')
.rate(rate)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)

View file

@ -128,10 +128,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const transactionNameProductId = 'GET /api/product/:id';
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.timestamp(timestamp)
@ -139,10 +139,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.duration(1000)
@ -150,10 +150,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.failure()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
@ -161,10 +161,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.duration(1000)

View file

@ -77,10 +77,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const transactionNameProductId = 'GET /api/product/:id';
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList, 'Worker')
.timestamp(timestamp)
@ -88,10 +88,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList, 'Worker')
.duration(1000)
@ -99,10 +99,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.failure()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
@ -110,10 +110,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.duration(1000)

View file

@ -76,10 +76,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-a');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(appleTransaction.successRate)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceInstance
.transaction(appleTransaction.name)
.timestamp(timestamp)
@ -87,10 +87,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(appleTransaction.failureRate)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceInstance
.transaction(appleTransaction.name)
.errors(serviceInstance.error('error 1', 'foo').timestamp(timestamp))
@ -99,10 +99,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.failure()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(bananaTransaction.successRate)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceInstance
.transaction(bananaTransaction.name)
.timestamp(timestamp)
@ -110,10 +110,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(bananaTransaction.failureRate)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceInstance
.transaction(bananaTransaction.name)
.errors(serviceInstance.error('error 2', 'bar').timestamp(timestamp))

View file

@ -37,24 +37,23 @@ export async function generateData({
const { bananaTransaction, appleTransaction } = config;
const documents = [appleTransaction, bananaTransaction]
.map((transaction, index) => {
return [
...timerange(start, end)
.interval(interval)
.rate(transaction.successRate)
.flatMap((timestamp) =>
serviceGoProdInstance
.transaction(transaction.name)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
...timerange(start, end)
const documents = [appleTransaction, bananaTransaction].map((transaction, index) => {
return timerange(start, end)
.interval(interval)
.rate(transaction.successRate)
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transaction.name)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
)
.concat(
timerange(start, end)
.interval(interval)
.rate(transaction.failureRate)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transaction.name)
.errors(
@ -64,10 +63,9 @@ export async function generateData({
.timestamp(timestamp)
.failure()
.serialize()
),
];
})
.flatMap((_) => _);
)
);
});
await synthtraceEsClient.index(documents);
}

View file

@ -130,20 +130,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-b');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(GO_PROD_DURATION)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(GO_DEV_DURATION)

View file

@ -76,20 +76,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-b');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list', 'Worker')
.duration(GO_PROD_DURATION)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(GO_DEV_DURATION)

View file

@ -103,30 +103,30 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-c');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)

View file

@ -58,7 +58,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(1)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.appMetrics({
'system.process.cpu.total.norm.pct': 1,

View file

@ -321,7 +321,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
}
return synthtraceEsClient.index([
...interval.rate(GO_A_INSTANCE_RATE_SUCCESS).flatMap((timestamp) =>
interval.rate(GO_A_INSTANCE_RATE_SUCCESS).spans((timestamp) =>
goInstanceA
.transaction('GET /api/product/list')
.success()
@ -330,7 +330,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.children(...withSpans(timestamp))
.serialize()
),
...interval.rate(GO_A_INSTANCE_RATE_FAILURE).flatMap((timestamp) =>
interval.rate(GO_A_INSTANCE_RATE_FAILURE).spans((timestamp) =>
goInstanceA
.transaction('GET /api/product/list')
.failure()
@ -339,7 +339,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.children(...withSpans(timestamp))
.serialize()
),
...interval.rate(GO_B_INSTANCE_RATE_SUCCESS).flatMap((timestamp) =>
interval.rate(GO_B_INSTANCE_RATE_SUCCESS).spans((timestamp) =>
goInstanceB
.transaction('GET /api/product/list')
.success()
@ -348,7 +348,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.children(...withSpans(timestamp))
.serialize()
),
...interval.rate(JAVA_INSTANCE_RATE).flatMap((timestamp) =>
interval.rate(JAVA_INSTANCE_RATE).spans((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.success()

View file

@ -42,10 +42,10 @@ export async function generateData({
} = config;
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(PROD_LIST_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.timestamp(timestamp)
@ -53,10 +53,10 @@ export async function generateData({
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(PROD_LIST_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.errors(serviceGoProdInstance.error(ERROR_NAME_1, 'foo').timestamp(timestamp))
@ -65,10 +65,10 @@ export async function generateData({
.failure()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(PROD_ID_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
@ -76,10 +76,10 @@ export async function generateData({
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(PROD_ID_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.errors(serviceGoProdInstance.error(ERROR_NAME_2, 'bar').timestamp(timestamp))

View file

@ -62,7 +62,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(1)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.transaction('GET /api/product/list')
.timestamp(timestamp)

View file

@ -68,10 +68,10 @@ export async function generateData({
const instance = apm.service(serviceName, 'production', agentName).instance('instance-a');
const traceEvents = [
...timerange(start, end)
timerange(start, end)
.interval('30s')
.rate(rate)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)
@ -96,10 +96,10 @@ export async function generateData({
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('30s')
.rate(rate)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)

View file

@ -38,7 +38,7 @@ export async function generateData({
const traceEvents = timerange(start, end)
.interval('30s')
.rate(rate)
.flatMap((timestamp) =>
.spans((timestamp) =>
instance
.transaction(transaction.name)
.defaults({

View file

@ -85,30 +85,30 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-c');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)

View file

@ -96,9 +96,9 @@ export default function ApiTest({ getService }: FtrProviderContext) {
before(async () => {
return synthtrace.index([
...transactionInterval
transactionInterval
.rate(config.multiple.prod.rps)
.flatMap((timestamp) => [
.spans((timestamp) => [
...multipleEnvServiceProdInstance
.transaction('GET /api')
.timestamp(timestamp)
@ -106,9 +106,9 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize(),
]),
...transactionInterval
transactionInterval
.rate(config.multiple.dev.rps)
.flatMap((timestamp) => [
.spans((timestamp) => [
...multipleEnvServiceDevInstance
.transaction('GET /api')
.timestamp(timestamp)
@ -116,9 +116,9 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.failure()
.serialize(),
]),
...transactionInterval
transactionInterval
.rate(config.multiple.prod.rps)
.flatMap((timestamp) => [
.spans((timestamp) => [
...multipleEnvServiceDevInstance
.transaction('non-request', 'rpc')
.timestamp(timestamp)
@ -126,7 +126,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize(),
]),
...metricInterval.rate(1).flatMap((timestamp) => [
metricInterval.rate(1).spans((timestamp) => [
...metricOnlyInstance
.appMetrics({
'system.memory.actual.free': 1,
@ -137,9 +137,9 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.timestamp(timestamp)
.serialize(),
]),
...errorInterval
errorInterval
.rate(1)
.flatMap((timestamp) => [
.spans((timestamp) => [
...errorOnlyInstance.error('Foo').timestamp(timestamp).serialize(),
]),
]);

View file

@ -101,10 +101,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-c');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -132,10 +132,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)

View file

@ -116,20 +116,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-b');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)

View file

@ -76,20 +76,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.instance('instance-b');
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list', 'Worker')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)

View file

@ -91,10 +91,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const transactionName = 'GET /api/product/list';
await synthtraceEsClient.index([
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionName)
.timestamp(timestamp)
@ -102,10 +102,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.serialize()
),
...timerange(start, end)
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ERROR_RATE)
.flatMap((timestamp) =>
.spans((timestamp) =>
serviceGoProdInstance
.transaction(transactionName)
.duration(1000)