Further improvements to synthtrace DSL and reintroducing workers to the CLI tool. (#127257)

Co-authored-by: Martijn Laarman <ml@elastic.co>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Martijn Laarman 2022-04-14 15:49:45 +02:00 committed by GitHub
parent f9dd12937b
commit 708d6d0778
72 changed files with 1619 additions and 659 deletions


@ -33,6 +33,7 @@ RUNTIME_DEPS = [
"@npm//p-limit",
"@npm//yargs",
"@npm//node-fetch",
"@npm//semver",
]
TYPES_DEPS = [
@ -45,6 +46,7 @@ TYPES_DEPS = [
"@npm//moment",
"@npm//p-limit",
"@npm//@types/node-fetch",
"@npm//@types/semver",
]
jsts_transpiler(


@ -102,16 +102,46 @@ The script will try to automatically find bootstrapped APM indices. **If these i
The following options are supported:
| Option | Description | Default |
| ----------------- | -------------------------------------------------- | ------------ |
| `--target` | Elasticsearch target, including username/password. | **Required** |
| `--from` | The start of the time window. | `now - 15m` |
| `--to` | The end of the time window. | `now` |
| `--live`          | Continuously ingest data                            | `false`      |
| `--clean` | Clean APM indices before indexing new data. | `false` |
| `--workers` | Amount of Node.js worker threads | `5` |
| `--bucketSize` | Size of bucket for which to generate data. | `15m` |
| `--interval` | The interval at which to index data. | `10s` |
| `--clientWorkers` | Number of simultaneously connected ES clients | `5` |
| `--batchSize` | Number of documents per bulk index request | `1000` |
| `--logLevel` | Log level. | `info` |
### Connection options
| Option | Type | Default | Description |
|------------------------|-----------|:-----------|--------------------------------------------------------------------------------------------------------------------------------------------|
| `--target` | [string] | | Elasticsearch target |
| `--kibana` | [string] | | Kibana target, used to bootstrap datastreams/mappings/templates/settings |
| `--cloudId`            | [string]  |            | Provides connection information and forces APM on Cloud to migrate to run as a Fleet integration                                             |
| `--local` | [boolean] | | Shortcut during development, assumes `yarn es snapshot` and `yarn start` are running |
| `--username` | [string] | `elastic` | Basic authentication username |
| `--password` | [string] | `changeme` | Basic authentication password |
Note:
- If you only specify `--target`, Synthtrace cannot automatically set up APM.
- If you specify both `--target` and `--kibana`, the tool will automatically attempt to install the appropriate APM package (see the example below).
- For Cloud it's easiest to specify `--cloudId`, as it will unpack the Elasticsearch/Kibana targets and migrate Cloud over to managed APM automatically.
- If you only specify `--kibana` and it uses a Cloud hostname, a very naive Elasticsearch `--target` will be inferred.
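A rough sketch of how these connection options combine; the `node scripts/synthtrace` entry point and the `simple_trace.ts` scenario file are assumed here for illustration and may differ in your checkout:

```
# Local development: assumes `yarn es snapshot` and `yarn start` are running.
node scripts/synthtrace simple_trace.ts --local

# Explicit targets: with both --target and --kibana the tool can install the APM package itself.
node scripts/synthtrace simple_trace.ts \
  --target=http://localhost:9200 \
  --kibana=http://localhost:5601 \
  --username=elastic --password=changeme
```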
### Scenario options
| Option | Type | Default | Description |
|------------------------|-----------|:--------|--------------------------------------------------------------------------------------------------------------------------------------------|
| `--from` | [date] | `now()` | The start of the time window |
| `--to` | [date] | | The end of the time window |
| `--maxDocs` | [number] | | The maximum number of documents we are allowed to generate |
| `--maxDocsConfidence`  | [number]  | `1`     | Expert setting: `--maxDocs` relies on accurate TPM reporting by generators; setting this to >1 widens the estimated data generation range    |
| `--live` | [boolean] | | Generate and index data continuously |
| `--dryRun` | [boolean] | | Enumerates the stream without sending events to Elasticsearch |
| `--scenarioOpts` | | | Raw options specific to the scenario |
| `--forceLegacyIndices` | [boolean] | `false` | Force writing to legacy indices |
Note:
- The default `--to` is `15m`, unless `--maxDocs` is specified, in which case `--to` is calculated from the scenario's TPM.
- You can combine `--from`, `--maxDocs`, and `--to` with `--live` to back-fill some data (see the example below).
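For example, a sketch of back-filling a fixed number of documents and then switching to live ingestion, using the same assumed entry point as above:

```
# Back-fill roughly one million documents ending now, then keep generating live data.
node scripts/synthtrace simple_trace.ts --local --maxDocs=1000000 --live
```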
### Setup options
| Option | Type | Default | Description |
|------------------------|-----------|:-----------|---------------------------------------------------------------------------------------------------------|
| `--numShards` | [number] | | Updates the component templates to update the number of primary shards, requires cloudId to be provided |
| `--clean` | [boolean] | `false` | Clean APM data before indexing new data |
| `--workers` | [number] | | Amount of Node.js worker threads |
| `--logLevel` | [enum] | `info` | Log level |
| `--gcpRepository` | [string] | | Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format |
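A sketch combining the setup options above; the repository client name and bucket are placeholders, not values from this change:

```
# Clean existing APM data, use three worker threads, and register a GCP snapshot repository.
node scripts/synthtrace simple_trace.ts --local --clean --workers=3 \
  --gcpRepository=default:my-gcs-bucket:synthtrace
```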


@ -13,6 +13,7 @@ export { cleanWriteTargets } from './lib/utils/clean_write_targets';
export { createLogger, LogLevel } from './lib/utils/create_logger';
export type { Fields } from './lib/entity';
export type { ApmFields } from './lib/apm/apm_fields';
export type { ApmException, ApmSynthtraceEsClient } from './lib/apm';
export type { SpanIterable } from './lib/span_iterable';
export { SpanArrayIterable } from './lib/span_iterable';
export type { EntityIterable } from './lib/entity_iterable';
export { EntityArrayIterable } from './lib/entity_iterable';


@ -11,47 +11,71 @@ import { cleanWriteTargets } from '../../utils/clean_write_targets';
import { getApmWriteTargets } from '../utils/get_apm_write_targets';
import { Logger } from '../../utils/create_logger';
import { ApmFields } from '../apm_fields';
import { SpanIterable } from '../../span_iterable';
import { EntityIterable } from '../../entity_iterable';
import { StreamProcessor } from '../../stream_processor';
import { SpanGeneratorsUnion } from '../../span_generators_union';
import { EntityStreams } from '../../entity_streams';
import { Fields } from '../../entity';
export interface StreamToBulkOptions {
export interface StreamToBulkOptions<TFields extends Fields = ApmFields> {
concurrency?: number;
// the maximum number of documents to process
maxDocs?: number;
// the number of documents at which to flush the bulk operation; defaults to 10k
flushInterval?: number;
mapToIndex?: (document: Record<string, any>) => string;
dryRun: boolean;
itemStartStopCallback?: (item: TFields | null, done: boolean) => void;
}
export interface ApmSynthtraceEsClientOptions {
forceLegacyIndices?: boolean;
// defaults to true if unspecified
refreshAfterIndex?: boolean;
}
export class ApmSynthtraceEsClient {
private readonly forceLegacyIndices: boolean;
private readonly refreshAfterIndex: boolean;
constructor(
private readonly client: Client,
private readonly logger: Logger,
private readonly forceDataStreams: boolean
) {}
private getWriteTargets() {
return getApmWriteTargets({ client: this.client, forceDataStreams: this.forceDataStreams });
options?: ApmSynthtraceEsClientOptions
) {
this.forceLegacyIndices = options?.forceLegacyIndices ?? false;
this.refreshAfterIndex = options?.refreshAfterIndex ?? true;
}
clean() {
private getWriteTargets() {
return getApmWriteTargets({
client: this.client,
forceLegacyIndices: this.forceLegacyIndices,
});
}
async runningVersion() {
const info = await this.client.info();
return info.version.number;
}
async clean() {
return this.getWriteTargets().then(async (writeTargets) => {
const indices = Object.values(writeTargets);
this.logger.info(`Attempting to clean: ${indices}`);
if (this.forceDataStreams) {
for (const name of indices) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
await this.client.indices.deleteDataStream({ name });
}
}
return;
if (this.forceLegacyIndices) {
return cleanWriteTargets({
client: this.client,
targets: indices,
logger: this.logger,
});
}
return cleanWriteTargets({
client: this.client,
targets: indices,
logger: this.logger,
});
for (const name of indices) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
await this.client.indices.deleteDataStream({ name });
}
}
return;
});
}
@ -76,38 +100,38 @@ export class ApmSynthtraceEsClient {
}
}
async index(events: SpanIterable | SpanIterable[], options?: StreamToBulkOptions) {
const dataStream = Array.isArray(events) ? new SpanGeneratorsUnion(events) : events;
async registerGcpRepository(connectionString: string) {
// <client_name>:<bucket>[:base_path]
const [clientName, bucket, basePath] = connectionString.split(':');
if (!clientName)
throw new Error(
`client name is mandatory for gcp repository registration: ${connectionString}`
);
if (!bucket)
throw new Error(`bucket is mandatory for gcp repository registration: ${connectionString}`);
const writeTargets = await this.getWriteTargets();
// TODO logger.perf
await this.client.helpers.bulk<ApmFields>({
concurrency: options?.concurrency ?? 10,
refresh: false,
refreshOnCompletion: false,
datasource: new StreamProcessor({
processors: StreamProcessor.apmProcessors,
maxSourceEvents: options?.maxDocs,
logger: this.logger,
})
// TODO https://github.com/elastic/elasticsearch-js/issues/1610
// having to map here is awkward, it'd be better to map just before serialization.
.streamToDocumentAsync(StreamProcessor.toDocument, dataStream),
onDrop: (doc) => {
this.logger.info(doc);
},
// TODO bug in client not passing generic to BulkHelperOptions<>
// https://github.com/elastic/elasticsearch-js/issues/1611
onDocument: (doc: unknown) => {
const d = doc as Record<string, any>;
const index = options?.mapToIndex
? options?.mapToIndex(d)
: this.forceDataStreams
? StreamProcessor.getDataStreamForEvent(d, writeTargets)
: StreamProcessor.getIndexForEvent(d, writeTargets);
return { create: { _index: index } };
const name = `gcp-repository-${clientName}`;
this.logger.info(`Registering gcp repository ${name}`);
const putRepository = await this.client.snapshot.createRepository({
name,
type: 'gcs',
settings: {
// @ts-ignore
// missing from es types
bucket,
client: clientName,
base_path: basePath,
},
});
this.logger.info(putRepository);
this.logger.info(`Verifying gcp repository ${name}`);
const verifyRepository = await this.client.snapshot.verifyRepository({ name });
this.logger.info(verifyRepository);
}
async refresh() {
const writeTargets = await this.getWriteTargets();
const indices = Object.values(writeTargets);
this.logger.info(`Indexed all data attempting to refresh: ${indices}`);
@ -118,4 +142,73 @@ export class ApmSynthtraceEsClient {
ignore_unavailable: true,
});
}
async index<TFields>(
events: EntityIterable<TFields> | Array<EntityIterable<TFields>>,
options?: StreamToBulkOptions,
streamProcessor?: StreamProcessor
) {
const dataStream = Array.isArray(events) ? new EntityStreams(events) : events;
const sp =
streamProcessor != null
? streamProcessor
: new StreamProcessor({
processors: StreamProcessor.apmProcessors,
maxSourceEvents: options?.maxDocs,
logger: this.logger,
});
let item: Record<any, any> | null = null;
let yielded = 0;
if (options?.dryRun) {
await this.logger.perf('enumerate_scenario', async () => {
// @ts-ignore
// We just want to enumerate
for await (item of sp.streamToDocumentAsync(sp.toDocument, dataStream)) {
if (yielded === 0) {
options.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
}
}
options.itemStartStopCallback?.apply(this, [item, true]);
});
return;
}
const writeTargets = await this.getWriteTargets();
// TODO logger.perf
await this.client.helpers.bulk<ApmFields>({
concurrency: options?.concurrency ?? 10,
refresh: false,
refreshOnCompletion: false,
flushBytes: 500000,
// TODO https://github.com/elastic/elasticsearch-js/issues/1610
// having to map here is awkward, it'd be better to map just before serialization.
datasource: sp.streamToDocumentAsync(sp.toDocument, dataStream),
onDrop: (doc) => {
this.logger.info(JSON.stringify(doc, null, 2));
},
// TODO bug in client not passing generic to BulkHelperOptions<>
// https://github.com/elastic/elasticsearch-js/issues/1611
onDocument: (doc: unknown) => {
item = doc as Record<string, any>;
if (yielded === 0) {
options?.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
}
const index = options?.mapToIndex
? options?.mapToIndex(item)
: !this.forceLegacyIndices
? StreamProcessor.getDataStreamForEvent(item, writeTargets)
: StreamProcessor.getIndexForEvent(item, writeTargets);
return { create: { _index: index } };
},
});
options?.itemStartStopCallback?.apply(this, [item, true]);
if (this.refreshAfterIndex) {
await this.refresh();
}
}
}


@ -7,6 +7,7 @@
*/
import fetch from 'node-fetch';
import Semver from 'semver';
import { Logger } from '../../utils/create_logger';
export class ApmSynthtraceKibanaClient {
@ -40,4 +41,54 @@ export class ApmSynthtraceKibanaClient {
}
});
}
async discoverLocalKibana() {
return await fetch('http://localhost:5601', {
method: 'HEAD',
follow: 1,
redirect: 'manual',
}).then((res) => {
const kibanaUrl = res.headers.get('location');
this.logger.info(`Discovered local kibana running at: ${kibanaUrl}`);
return kibanaUrl;
});
}
async fetchLatestApmPackageVersion(version: string) {
const url =
'https://epr-snapshot.elastic.co/search?package=apm&prerelease=true&all=true&kibana.version=';
const response = await fetch(url + version, { method: 'GET' });
const json = await response.json();
if (!Array.isArray(json)) {
throw new Error('Could not locate apm package compatible with the current kibana version');
}
const versions = json
.map<string>((item) => item.version)
.filter((v) => Semver.valid(v))
.sort(Semver.rcompare);
if (versions.length === 0) {
throw new Error('Could not locate apm package compatible with the current kibana version');
}
return versions[0];
}
async installApmPackage(kibanaUrl: string, version: string, username: string, password: string) {
const packageVersion = await this.fetchLatestApmPackageVersion(version);
const response = await fetch(kibanaUrl + '/api/fleet/epm/packages/apm/' + packageVersion, {
method: 'POST',
headers: {
Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'),
Accept: 'application/json',
'Content-Type': 'application/json',
'kbn-xsrf': 'kibana',
},
body: '{"force":true}',
});
const responseJson = await response.json();
if (responseJson.statusCode) {
throw Error(`unable to install apm package ${packageVersion}`);
}
if (responseJson.items) {
this.logger.info(`Installed apm package ${packageVersion}`);
} else this.logger.error(responseJson);
}
}


@ -18,12 +18,12 @@ export interface ApmElasticsearchOutputWriteTargets {
export async function getApmWriteTargets({
client,
forceDataStreams,
forceLegacyIndices,
}: {
client: Client;
forceDataStreams?: boolean;
forceLegacyIndices?: boolean;
}): Promise<ApmElasticsearchOutputWriteTargets> {
if (forceDataStreams) {
if (!forceLegacyIndices) {
return {
transaction: 'traces-apm-default',
span: 'traces-apm-default',


@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Interval } from './interval';
import { EntityStreams } from './entity_streams';
import { EntityIterable } from './entity_iterable';
import { Serializable } from './serializable';
export class EntityGenerator<TField> implements EntityIterable<TField> {
private readonly _gen: () => Generator<Serializable<TField>>;
constructor(
private readonly interval: Interval,
dataGenerator: (interval: Interval) => Generator<Serializable<TField>>
) {
this._order = interval.from > interval.to ? 'desc' : 'asc';
const generator = dataGenerator(this.interval);
const peek = generator.next();
const value = peek.value;
let callCount = 0;
this._gen = function* () {
if (callCount === 0) {
callCount++;
yield value;
yield* generator;
} else {
yield* dataGenerator(this.interval);
}
};
const peekedNumberOfEvents = peek.done ? 0 : peek.value.serialize().length;
this._ratePerMinute = interval.ratePerMinute() * peekedNumberOfEvents;
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
toArray(): TField[] {
return Array.from(this);
}
merge(...iterables: Array<EntityIterable<TField>>): EntityStreams<TField> {
return new EntityStreams([this, ...iterables]);
}
private readonly _ratePerMinute: number;
ratePerMinute() {
return this._ratePerMinute;
}
*[Symbol.iterator]() {
for (const span of this._gen()) {
for (const fields of span.serialize()) {
yield fields;
}
}
}
async *[Symbol.asyncIterator]() {
for (const span of this._gen()) {
for (const fields of span.serialize()) {
yield fields;
}
}
}
}


@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from './apm/apm_fields';
import { EntityStreams } from './entity_streams';
import { Fields } from './entity';
export interface EntityIterable<TFields extends Fields = ApmFields>
extends Iterable<TFields>,
AsyncIterable<TFields> {
order(): 'desc' | 'asc';
ratePerMinute(): number;
toArray(): ApmFields[];
merge(...iterables: Array<EntityIterable<TFields>>): EntityStreams<TFields>;
}
export class EntityArrayIterable<TFields extends Fields = ApmFields>
implements EntityIterable<TFields>
{
constructor(private fields: TFields[]) {
const timestamps = fields.filter((f) => f['@timestamp']).map((f) => f['@timestamp']!);
this._order = timestamps.length > 1 ? (timestamps[0] > timestamps[1] ? 'desc' : 'asc') : 'asc';
const sorted = timestamps.sort();
const [first, last] = [sorted[0], sorted.slice(-1)[0]];
const numberOfMinutes = Math.ceil(Math.abs(last - first) / (1000 * 60)) % 60;
this._ratePerMinute = sorted.length / numberOfMinutes;
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
private readonly _ratePerMinute: number;
ratePerMinute() {
return this._ratePerMinute;
}
async *[Symbol.asyncIterator](): AsyncIterator<TFields> {
return this.fields[Symbol.iterator]();
}
[Symbol.iterator](): Iterator<TFields> {
return this.fields[Symbol.iterator]();
}
merge(...iterables: Array<EntityIterable<TFields>>): EntityStreams<TFields> {
return new EntityStreams<TFields>([this, ...iterables]);
}
toArray(): TFields[] {
return this.fields;
}
}


@ -6,44 +6,47 @@
* Side Public License, v 1.
*/
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { EntityIterable } from './entity_iterable';
import { merge } from './utils/merge_iterable';
export class SpanGeneratorsUnion implements SpanIterable {
constructor(private readonly dataGenerators: SpanIterable[]) {
export class EntityStreams<TFields> implements EntityIterable<TFields> {
constructor(private readonly dataGenerators: Array<EntityIterable<TFields>>) {
const orders = new Set<'desc' | 'asc'>(dataGenerators.map((d) => d.order()));
if (orders.size > 1) throw Error('Can only combine intervals with the same order()');
this._order = orders.has('asc') ? 'asc' : 'desc';
}
static empty: SpanGeneratorsUnion = new SpanGeneratorsUnion([]);
this._ratePerMinute = dataGenerators.map((d) => d.ratePerMinute()).reduce((a, b) => a + b, 0);
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
toArray(): ApmFields[] {
private readonly _ratePerMinute: number;
ratePerMinute() {
return this._ratePerMinute;
}
toArray(): TFields[] {
return Array.from(this);
}
concat(...iterables: SpanIterable[]) {
return new SpanGeneratorsUnion([...this.dataGenerators, ...iterables]);
merge(...iterables: Array<EntityIterable<TFields>>): EntityStreams<TFields> {
return new EntityStreams([...this.dataGenerators, ...iterables]);
}
*[Symbol.iterator]() {
*[Symbol.iterator](): Iterator<TFields> {
const iterator = merge(this.dataGenerators);
for (const fields of iterator) {
yield fields;
}
}
async *[Symbol.asyncIterator]() {
for (const iterator of this.dataGenerators) {
for (const fields of iterator) {
yield fields;
}
async *[Symbol.asyncIterator](): AsyncIterator<TFields> {
const iterator = merge(this.dataGenerators);
for await (const fields of iterator) {
yield fields;
}
}
}


@ -5,12 +5,13 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { SpanGenerator } from './span_generator';
import moment, { unitOfTime } from 'moment';
import { random } from 'lodash';
import { EntityIterable } from './entity_iterable';
import { EntityGenerator } from './entity_generator';
import { Serializable } from './serializable';
export function parseInterval(interval: string): [number, string] {
export function parseInterval(interval: string): [number, unitOfTime.DurationConstructor] {
const args = interval.match(/(\d+)(s|m|h|d)/);
if (!args || args.length < 3) {
throw new Error('Failed to parse interval');
@ -18,38 +19,71 @@ export function parseInterval(interval: string): [number, string] {
return [Number(args[1]), args[2] as any];
}
export interface IntervalOptions {
from: Date;
to: Date;
interval: string;
yieldRate?: number;
intervalUpper?: number;
rateUpper?: number;
}
export class Interval implements Iterable<number> {
constructor(
public readonly from: Date,
public readonly to: Date,
public readonly interval: string,
public readonly yieldRate: number = 1
) {
[this.intervalAmount, this.intervalUnit] = parseInterval(interval);
constructor(public readonly options: IntervalOptions) {
const parsed = parseInterval(options.interval);
this.intervalAmount = parsed[0];
this.intervalUnit = parsed[1];
this.from = this.options.from;
this.to = this.options.to;
}
public readonly from: Date;
public readonly to: Date;
private readonly intervalAmount: number;
private readonly intervalUnit: any;
spans(map: (timestamp: number, index?: number) => ApmFields[]): SpanIterable {
return new SpanGenerator(this, [
function* (i) {
let index = 0;
for (const x of i) {
for (const a of map(x, index)) {
yield a;
index++;
}
private readonly intervalUnit: unitOfTime.DurationConstructor;
generator<TField>(
map: (timestamp: number, index?: number) => Serializable<TField> | Array<Serializable<TField>>
): EntityIterable<TField> {
return new EntityGenerator(this, function* (i) {
let index = 0;
for (const x of i) {
const data = map(x, index);
if (Array.isArray(data)) {
yield* data;
} else {
yield data;
}
},
]);
index++;
}
});
}
rate(rate: number): Interval {
return new Interval({ ...this.options, yieldRate: rate });
}
rate(rate: number): Interval {
return new Interval(this.from, this.to, this.interval, rate);
randomize(rateUpper: number, intervalUpper: number): Interval {
return new Interval({ ...this.options, intervalUpper, rateUpper });
}
ratePerMinute(): number {
const rate = this.options.rateUpper
? Math.max(1, this.options.rateUpper)
: this.options.yieldRate ?? 1;
const interval = this.options.intervalUpper ? this.options.intervalUpper : this.intervalAmount;
const first = moment();
const last = moment(first).subtract(interval, this.intervalUnit);
const numberOfMinutes =
(Math.abs(last.toDate().getTime() - first.toDate().getTime()) / (1000 * 60)) % 60;
return rate / numberOfMinutes;
}
private yieldRateTimestamps(timestamp: number) {
return new Array<number>(this.yieldRate).fill(timestamp);
const rate = this.options.rateUpper
? random(this.options.yieldRate ?? 1, Math.max(1, this.options.rateUpper))
: this.options.yieldRate ?? 1;
return new Array<number>(rate).fill(timestamp);
}
private *_generate(): Iterable<number> {
@ -57,17 +91,25 @@ export class Interval implements Iterable<number> {
let now = this.from;
do {
yield* this.yieldRateTimestamps(now.getTime());
now = new Date(moment(now).subtract(this.intervalAmount, this.intervalUnit).valueOf());
const amount = this.interval();
now = new Date(moment(now).subtract(amount, this.intervalUnit).valueOf());
} while (now > this.to);
} else {
let now = this.from;
do {
yield* this.yieldRateTimestamps(now.getTime());
now = new Date(moment(now).add(this.intervalAmount, this.intervalUnit).valueOf());
const amount = this.interval();
now = new Date(moment(now).add(amount, this.intervalUnit).valueOf());
} while (now < this.to);
}
}
private interval() {
return this.options.intervalUpper
? random(this.intervalAmount, this.options.intervalUpper)
: this.intervalAmount;
}
[Symbol.iterator]() {
return this._generate()[Symbol.iterator]();
}


@ -1,50 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Interval } from './interval';
import { ApmFields } from './apm/apm_fields';
import { SpanGeneratorsUnion } from './span_generators_union';
import { SpanIterable } from './span_iterable';
export class SpanGenerator implements SpanIterable {
constructor(
private readonly interval: Interval,
private readonly dataGenerator: Array<(interval: Interval) => Generator<ApmFields>>
) {
this._order = interval.from > interval.to ? 'desc' : 'asc';
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
toArray(): ApmFields[] {
return Array.from(this);
}
concat(...iterables: SpanGenerator[]) {
return new SpanGeneratorsUnion([this, ...iterables]);
}
*[Symbol.iterator]() {
for (const iterator of this.dataGenerator) {
for (const fields of iterator(this.interval)) {
yield fields;
}
}
}
async *[Symbol.asyncIterator]() {
for (const iterator of this.dataGenerator) {
for (const fields of iterator(this.interval)) {
yield fields;
}
}
}
}


@ -1,46 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ApmFields } from './apm/apm_fields';
import { SpanGeneratorsUnion } from './span_generators_union';
export interface SpanIterable extends Iterable<ApmFields>, AsyncIterable<ApmFields> {
order(): 'desc' | 'asc';
toArray(): ApmFields[];
concat(...iterables: SpanIterable[]): SpanGeneratorsUnion;
}
export class SpanArrayIterable implements SpanIterable {
constructor(private fields: ApmFields[]) {
const timestamps = fields.filter((f) => f['@timestamp']).map((f) => f['@timestamp']!);
this._order = timestamps.length > 1 ? (timestamps[0] > timestamps[1] ? 'desc' : 'asc') : 'asc';
}
private readonly _order: 'desc' | 'asc';
order() {
return this._order;
}
async *[Symbol.asyncIterator](): AsyncIterator<ApmFields> {
return this.fields[Symbol.iterator]();
}
[Symbol.iterator](): Iterator<ApmFields> {
return this.fields[Symbol.iterator]();
}
concat(...iterables: SpanIterable[]): SpanGeneratorsUnion {
return new SpanGeneratorsUnion([this, ...iterables]);
}
toArray(): ApmFields[] {
return this.fields;
}
}


@ -8,7 +8,7 @@
import moment from 'moment';
import { ApmFields } from './apm/apm_fields';
import { SpanIterable } from './span_iterable';
import { EntityIterable } from './entity_iterable';
import { getTransactionMetrics } from './apm/processors/get_transaction_metrics';
import { getSpanDestinationMetrics } from './apm/processors/get_span_destination_metrics';
import { getBreakdownMetrics } from './apm/processors/get_breakdown_metrics';
@ -16,41 +16,54 @@ import { parseInterval } from './interval';
import { dedot } from './utils/dedot';
import { ApmElasticsearchOutputWriteTargets } from './apm/utils/get_apm_write_targets';
import { Logger } from './utils/create_logger';
import { Fields } from './entity';
export interface StreamProcessorOptions {
processors: Array<(events: ApmFields[]) => ApmFields[]>;
export interface StreamProcessorOptions<TFields extends Fields = ApmFields> {
version?: string;
processors: Array<(events: TFields[]) => TFields[]>;
flushInterval?: string;
// defaults to 10k
maxBufferSize?: number;
// the maximum source events to process, not the maximum documents outputted by the processor
maxSourceEvents?: number;
logger?: Logger;
name?: string;
// called every time maxBufferSize is processed
processedCallback?: (processedDocuments: number) => void;
}
export class StreamProcessor {
export class StreamProcessor<TFields extends Fields = ApmFields> {
public static readonly apmProcessors = [
getTransactionMetrics,
getSpanDestinationMetrics,
getBreakdownMetrics,
];
public static defaultFlushInterval: number = 10000;
constructor(private readonly options: StreamProcessorOptions) {
constructor(private readonly options: StreamProcessorOptions<TFields>) {
[this.intervalAmount, this.intervalUnit] = this.options.flushInterval
? parseInterval(this.options.flushInterval)
: parseInterval('1m');
this.name = this.options?.name ?? 'StreamProcessor';
this.version = this.options.version ?? '8.0.0';
this.versionMajor = Number.parseInt(this.version.split('.')[0], 10);
}
private readonly intervalAmount: number;
private readonly intervalUnit: any;
private readonly name: string;
private readonly version: string;
private readonly versionMajor: number;
// TODO move away from chunking and feed this data one by one to processors
*stream(...eventSources: SpanIterable[]) {
const maxBufferSize = this.options.maxBufferSize ?? 10000;
*stream(...eventSources: Array<EntityIterable<TFields>>): Generator<ApmFields, any, any> {
const maxBufferSize = this.options.maxBufferSize ?? StreamProcessor.defaultFlushInterval;
const maxSourceEvents = this.options.maxSourceEvents;
let localBuffer = [];
let flushAfter: number | null = null;
let sourceEventsYielded = 0;
for (const eventSource of eventSources) {
const order = eventSource.order();
this.options.logger?.info(`order: ${order}`);
this.options.logger?.debug(`order: ${order}`);
for (const event of eventSource) {
const eventDate = event['@timestamp'] as number;
localBuffer.push(event);
@ -58,10 +71,15 @@ export class StreamProcessor {
flushAfter = this.calculateFlushAfter(eventDate, order);
}
yield StreamProcessor.enrich(event);
yield StreamProcessor.enrich(event, this.version, this.versionMajor);
sourceEventsYielded++;
if (maxSourceEvents && sourceEventsYielded % (maxSourceEvents / 10) === 0) {
this.options.logger?.info(`Yielded ${sourceEventsYielded} events`);
if (sourceEventsYielded % maxBufferSize === 0) {
if (this.options?.processedCallback) {
this.options.processedCallback(maxBufferSize);
}
}
if (maxSourceEvents && sourceEventsYielded % maxBufferSize === 0) {
this.options.logger?.debug(`${this.name} yielded ${sourceEventsYielded} events`);
}
if (maxSourceEvents && sourceEventsYielded >= maxSourceEvents) {
// yielded the maximum source events, we still want the local buffer to generate derivative documents
@ -76,25 +94,34 @@ export class StreamProcessor {
const e = new Date(eventDate).toISOString();
const f = new Date(flushAfter!).toISOString();
this.options.logger?.debug(
`flush ${localBuffer.length} documents ${order}: ${e} => ${f}`
`${this.name} flush ${localBuffer.length} documents ${order}: ${e} => ${f}`
);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map(StreamProcessor.enrich);
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
localBuffer = [];
flushAfter = this.calculateFlushAfter(flushAfter, order);
}
}
if (maxSourceEvents && sourceEventsYielded >= maxSourceEvents) {
this.options.logger?.info(`Yielded maximum number of documents: ${maxSourceEvents}`);
this.options.logger?.info(
`${this.name} yielded maximum number of documents: ${maxSourceEvents}`
);
break;
}
}
if (localBuffer.length > 0) {
this.options.logger?.info(`Processing remaining buffer: ${localBuffer.length} items left`);
this.options.logger?.info(
`${this.name} processing remaining buffer: ${localBuffer.length} items left`
);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map(StreamProcessor.enrich);
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
this.options.processedCallback?.apply(this, [localBuffer.length]);
}
}
@ -106,34 +133,35 @@ export class StreamProcessor {
}
}
async *streamAsync(...eventSources: SpanIterable[]): AsyncIterator<ApmFields> {
async *streamAsync(...eventSources: Array<EntityIterable<TFields>>): AsyncIterable<ApmFields> {
yield* this.stream(...eventSources);
}
*streamToDocument<TDocument>(
map: (d: ApmFields) => TDocument,
...eventSources: SpanIterable[]
): Generator<ApmFields> {
...eventSources: Array<EntityIterable<TFields>>
): Generator<TDocument> {
for (const apmFields of this.stream(...eventSources)) {
yield map(apmFields);
}
}
async *streamToDocumentAsync<TDocument>(
map: (d: ApmFields) => TDocument,
...eventSources: SpanIterable[]
): AsyncIterator<ApmFields> {
for (const apmFields of this.stream(...eventSources)) {
...eventSources: Array<EntityIterable<TFields>>
): AsyncIterable<TDocument> & AsyncIterator<TDocument> {
for await (const apmFields of this.stream(...eventSources)) {
yield map(apmFields);
}
}
streamToArray(...eventSources: SpanIterable[]) {
streamToArray(...eventSources: Array<EntityIterable<TFields>>) {
return Array.from<ApmFields>(this.stream(...eventSources));
}
static enrich(document: ApmFields): ApmFields {
private static enrich(document: ApmFields, version: string, versionMajor: number): ApmFields {
// see https://github.com/elastic/apm-server/issues/7088 can not be provided as flat key/values
document.observer = {
version: '8.0.0',
version_major: 8,
version: version ?? '8.2.0',
version_major: versionMajor,
};
document['service.node.name'] =
document['service.node.name'] || document['container.id'] || document['host.name'];
@ -145,11 +173,11 @@ export class StreamProcessor {
return document;
}
static toDocument(document: ApmFields): Record<string, any> {
if (!document.observer) {
document = StreamProcessor.enrich(document);
}
toDocument(document: ApmFields): Record<string, any> {
const newDoc: Record<string, any> = {};
if (!document.observer) {
document = StreamProcessor.enrich(document, this.version, this.versionMajor);
}
dedot(document, newDoc);
if (typeof newDoc['@timestamp'] === 'number') {
const timestamp = newDoc['@timestamp'];
@ -192,16 +220,16 @@ export class StreamProcessor {
}
}
export async function* streamProcessAsync(
processors: Array<(events: ApmFields[]) => ApmFields[]>,
...eventSources: SpanIterable[]
export async function* streamProcessAsync<TFields>(
processors: Array<(events: TFields[]) => TFields[]>,
...eventSources: Array<EntityIterable<TFields>>
) {
return new StreamProcessor({ processors }).streamAsync(...eventSources);
}
export function streamProcessToArray(
processors: Array<(events: ApmFields[]) => ApmFields[]>,
...eventSources: SpanIterable[]
export function streamProcessToArray<TFields>(
processors: Array<(events: TFields[]) => TFields[]>,
...eventSources: Array<EntityIterable<TFields>>
) {
return new StreamProcessor({ processors }).streamToArray(...eventSources);
}


@ -12,7 +12,7 @@ export class Timerange {
constructor(private from: Date, private to: Date) {}
interval(interval: string) {
return new Interval(this.from, this.to, interval);
return new Interval({ from: this.from, to: this.to, interval });
}
}


@ -6,20 +6,24 @@
* Side Public License, v 1.
*/
import { range } from 'lodash';
import { ApmFields } from '../apm/apm_fields';
import { SpanIterable } from '../span_iterable';
import { Fields } from '../entity';
import { EntityIterable } from '../entity_iterable';
export function merge(iterables: SpanIterable[]): Iterable<ApmFields> {
export function merge<TField extends Fields>(
iterables: Array<EntityIterable<TField>>
): Iterable<TField> {
if (iterables.length === 1) return iterables[0];
const iterators = iterables.map<Iterator<ApmFields>>((i) => {
return i[Symbol.iterator]();
const iterators = iterables.map<{ it: Iterator<ApmFields>; weight: number }>((i) => {
return { it: i[Symbol.iterator](), weight: Math.max(1, i.ratePerMinute()) };
});
let done = false;
const myIterable: Iterable<ApmFields> = {
const myIterable: Iterable<TField> = {
*[Symbol.iterator]() {
do {
const items = iterators.map((i) => i.next());
const items = iterators.flatMap((i) => range(0, i.weight).map(() => i.it.next()));
done = items.every((item) => item.done);
if (!done) {
yield* items.filter((i) => !i.done).map((i) => i.value);
@ -27,8 +31,8 @@ export function merge(iterables: SpanIterable[]): Iterable<ApmFields> {
} while (!done);
// Done for the first time: close all iterators
for (const iterator of iterators) {
if (typeof iterator.return === 'function') {
iterator.return();
if (typeof iterator.it.return === 'function') {
iterator.it.return();
}
}
},


@ -7,13 +7,14 @@
*/
import { apm, timerange } from '../../index';
import { ApmFields } from '../../lib/apm/apm_fields';
import { Instance } from '../../lib/apm/instance';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { getLogger } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);
const { numServices = 3 } = runOptions.scenarioOpts || {};
@ -31,7 +32,7 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
apm.service(`opbeans-go-${index}`, 'production', 'go').instance('instance')
);
const instanceSpans = (instance: Instance) => {
const successfulTraceEvents = successfulTimestamps.spans((timestamp) =>
const successfulTraceEvents = successfulTimestamps.generator((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
@ -50,10 +51,9 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
.success()
.timestamp(timestamp)
)
.serialize()
);
const failedTraceEvents = failedTimestamps.spans((timestamp) =>
const failedTraceEvents = failedTimestamps.generator((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
@ -62,13 +62,12 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
.errors(
instance.error('[ResponseError] index_not_found_exception').timestamp(timestamp + 50)
)
.serialize()
);
const metricsets = range
.interval('30s')
.rate(1)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.appMetrics({
'system.memory.actual.free': 800,
@ -77,15 +76,14 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
'system.process.cpu.total.norm.pct': 0.7,
})
.timestamp(timestamp)
.serialize()
);
return successfulTraceEvents.concat(failedTraceEvents, metricsets);
return successfulTraceEvents.merge(failedTraceEvents, metricsets);
};
return instances
.map((instance) => logger.perf('generating_apm_events', () => instanceSpans(instance)))
.reduce((p, c) => p.concat(c));
.reduce((p, c) => p.merge(c));
},
};
};


@ -8,15 +8,17 @@
import { stackMonitoring, timerange } from '../../index';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { getLogger } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
import { ApmFields } from '../../lib/apm/apm_fields';
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);
if (!runOptions.writeTarget) {
throw new Error('Write target is not defined');
}
// TODO reintroduce overwrite
// if (!runOptions.writeTarget) {
// throw new Error('Write target is not defined');
// }
return {
generate: ({ from, to }) => {
@ -26,9 +28,9 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
return range
.interval('30s')
.rate(1)
.spans((timestamp) => {
.generator((timestamp) => {
const events = logger.perf('generating_sm_events', () => {
return kibanaStats.timestamp(timestamp).requests(10, 20).serialize();
return kibanaStats.timestamp(timestamp).requests(10, 20);
});
return events;
});


@ -10,11 +10,12 @@
import { stackMonitoring, timerange } from '../../index';
import { Scenario } from '../scenario';
import { getCommonServices } from '../utils/get_common_services';
import { getLogger } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
import { StackMonitoringFields } from '../../lib/stack_monitoring/stack_monitoring_fields';
const scenario: Scenario = async (runOptions: RunOptions) => {
const { logger } = getCommonServices(runOptions);
const scenario: Scenario<StackMonitoringFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);
return {
mapToIndex: (data) => {
@ -28,18 +29,18 @@ const scenario: Scenario = async (runOptions: RunOptions) => {
const kibanaStats = cluster.kibana('kibana-01').stats();
const range = timerange(from, to);
return range
.interval('10s')
.rate(1)
.spans((timestamp) => {
const clusterEvents = logger.perf('generating_es_events', () => {
return clusterStats.timestamp(timestamp).indices(115).serialize();
});
const kibanaEvents = logger.perf('generating_kb_events', () => {
return kibanaStats.timestamp(timestamp).requests(10, 20).serialize();
});
return [...clusterEvents, ...kibanaEvents];
});
const interval = range.interval('10s').rate(1);
return interval
.generator((timestamp) =>
logger.perf('generating_es_events', () => clusterStats.timestamp(timestamp).indices(115))
)
.merge(
interval.generator((timestamp) =>
logger.perf('generating_kb_events', () =>
kibanaStats.timestamp(timestamp).requests(10, 20)
)
)
);
},
};
};


@ -14,7 +14,6 @@ import { startLiveDataUpload } from './utils/start_live_data_upload';
import { parseRunCliFlags } from './utils/parse_run_cli_flags';
import { getCommonServices } from './utils/get_common_services';
import { ApmSynthtraceKibanaClient } from '../lib/apm/client/apm_synthtrace_kibana_client';
import { ApmSynthtraceEsClient } from '../lib/apm/client/apm_synthtrace_es_client';
function options(y: Argv) {
return y
@ -27,20 +26,29 @@ function options(y: Argv) {
describe: 'Elasticsearch target',
string: true,
})
.option('kibana', {
describe: 'Kibana target, used to bootstrap datastreams/mappings/templates/settings',
string: true,
})
.option('cloudId', {
describe:
'Provides connection information and forces APM on Cloud to migrate to run as a Fleet integration',
string: true,
})
.option('local', {
describe:
'Shortcut during development, assumes `yarn es snapshot` and `yarn start` are running',
boolean: true,
})
.option('username', {
describe: 'Basic authentication username',
string: true,
demandOption: true,
default: 'elastic',
})
.option('password', {
describe: 'Basic authentication password',
string: true,
demandOption: true,
default: 'changeme',
})
.option('from', {
description: 'The start of the time window',
@ -52,15 +60,20 @@ function options(y: Argv) {
description: 'Generate and index data continuously',
boolean: true,
})
.option('--dryRun', {
.option('dryRun', {
description: 'Enumerates the stream without sending events to Elasticsearch ',
boolean: true,
})
.option('maxDocs', {
description:
'The maximum number of documents we are allowed to generate, should be multiple of 10.000',
description: 'The maximum number of documents we are allowed to generate',
number: true,
})
.option('maxDocsConfidence', {
description:
'Expert setting: --maxDocs relies on accurate TPM reporting by generators; setting this to >1 widens the estimated data generation range',
number: true,
default: 1,
})
.option('numShards', {
description:
'Updates the component templates to update the number of primary shards, requires cloudId to be provided',
@ -73,31 +86,15 @@ function options(y: Argv) {
})
.option('workers', {
describe: 'Amount of Node.js worker threads',
default: 5,
})
.option('bucketSize', {
describe: 'Size of bucket for which to generate data',
default: '15m',
})
.option('interval', {
describe: 'The interval at which to index data',
default: '10s',
})
.option('clientWorkers', {
describe: 'Number of concurrently connected ES clients',
default: 5,
})
.option('batchSize', {
describe: 'Number of documents per bulk index request',
default: 1000,
number: true,
})
.option('logLevel', {
describe: 'Log level',
default: 'info',
})
.option('writeTarget', {
describe: 'Target to index',
string: true,
.option('forceLegacyIndices', {
describe: 'Force writing to legacy indices',
boolean: true,
})
.option('scenarioOpts', {
describe: 'Options specific to the scenario',
@ -105,68 +102,115 @@ function options(y: Argv) {
return arg as Record<string, any> | undefined;
},
})
.conflicts('to', 'live')
.conflicts('maxDocs', 'live')
.conflicts('target', 'cloudId');
.option('gcpRepository', {
describe:
'Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format',
string: true,
})
.conflicts('target', 'cloudId')
.conflicts('kibana', 'cloudId')
.conflicts('local', 'target')
.conflicts('local', 'kibana')
.conflicts('local', 'cloudId');
}
export type RunCliFlags = ReturnType<typeof options>['argv'];
yargs(process.argv.slice(2))
.command('*', 'Generate data and index into Elasticsearch', options, async (argv) => {
const runOptions = parseRunCliFlags(argv);
.command(
'*',
'Generate data and index into Elasticsearch',
options,
async (argv: RunCliFlags) => {
if (argv.local) {
argv.target = 'http://localhost:9200';
}
if (argv.kibana && !argv.target) {
const url = new URL(argv.kibana);
// super naive inference of ES target based on public kibana Cloud endpoint
if (url.hostname.match(/\.kb\./)) {
argv.target = argv.kibana.replace(/\.kb\./, '.es.');
}
}
const { logger, client } = getCommonServices(runOptions);
const runOptions = parseRunCliFlags(argv);
const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const to = new Date(toMs);
const defaultTimeRange = !runOptions.maxDocs ? '15m' : '52w';
const fromMs = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: toMs - intervalToMs(defaultTimeRange);
const from = new Date(fromMs);
const { logger, apmEsClient } = getCommonServices(runOptions);
const live = argv.live;
const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const to = new Date(toMs);
const defaultTimeRange = !runOptions.maxDocs ? '15m' : '520w';
const fromMs = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: toMs - intervalToMs(defaultTimeRange);
const from = new Date(fromMs);
const forceDataStreams = !!runOptions.cloudId;
const esClient = new ApmSynthtraceEsClient(client, logger, forceDataStreams);
if (runOptions.dryRun) {
await startHistoricalDataUpload(esClient, logger, runOptions, from, to);
return;
}
if (runOptions.cloudId) {
const kibanaClient = new ApmSynthtraceKibanaClient(logger);
await kibanaClient.migrateCloudToManagedApm(
runOptions.cloudId,
runOptions.username,
runOptions.password
const live = argv.live;
if (runOptions.dryRun) {
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, '8.0.0');
return;
}
// we need to know the running version to generate events that satisfy the min version requirements
let version = await apmEsClient.runningVersion();
logger.info(`Discovered Elasticsearch running version: ${version}`);
version = version.replace('-SNAPSHOT', '');
// We automatically set up managed APM either by migrating on cloud or installing the package locally
if (runOptions.cloudId || argv.local || argv.kibana) {
const kibanaClient = new ApmSynthtraceKibanaClient(logger);
if (runOptions.cloudId) {
await kibanaClient.migrateCloudToManagedApm(
runOptions.cloudId,
runOptions.username,
runOptions.password
);
} else {
let kibanaUrl: string | null = argv.kibana ?? null;
if (argv.local) {
kibanaUrl = await kibanaClient.discoverLocalKibana();
}
if (!kibanaUrl) throw Error('kibanaUrl could not be determined');
await kibanaClient.installApmPackage(
kibanaUrl,
version,
runOptions.username,
runOptions.password
);
}
}
if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await apmEsClient.updateComponentTemplates(runOptions.numShards);
}
if (argv.clean) {
await apmEsClient.clean();
}
if (runOptions.gcpRepository) {
await apmEsClient.registerGcpRepository(runOptions.gcpRepository);
}
logger.info(
`Starting data generation\n: ${JSON.stringify(
{
...runOptions,
from: from.toISOString(),
to: to.toISOString(),
},
null,
2
)}`
);
if (runOptions.maxDocs !== 0)
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version);
if (live) {
await startLiveDataUpload(apmEsClient, logger, runOptions, to, version);
}
}
if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await esClient.updateComponentTemplates(runOptions.numShards);
}
if (argv.clean) {
await esClient.clean();
}
logger.info(
`Starting data generation\n: ${JSON.stringify(
{
...runOptions,
from: from.toISOString(),
to: to.toISOString(),
},
null,
2
)}`
);
await startHistoricalDataUpload(esClient, logger, runOptions, from, to);
if (live) {
await startLiveDataUpload(esClient, logger, runOptions, to);
}
})
)
.parse();


@ -7,10 +7,10 @@
*/
import { RunOptions } from './utils/parse_run_cli_flags';
import { SpanIterable } from '../lib/span_iterable';
import { EntityIterable } from '../lib/entity_iterable';
type Generate = (range: { from: Date; to: Date }) => SpanIterable;
export type Scenario = (options: RunOptions) => Promise<{
generate: Generate;
type Generate<TFields> = (range: { from: Date; to: Date }) => EntityIterable<TFields>;
export type Scenario<TFields> = (options: RunOptions) => Promise<{
generate: Generate<TFields>;
mapToIndex?: (data: Record<string, any>) => string;
}>;


@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { random } from 'lodash';
import { apm, timerange } from '../../index';
import { ApmFields } from '../../lib/apm/apm_fields';
import { Instance } from '../../lib/apm/instance';
import { Scenario } from '../scenario';
import { getLogger } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);
const numServices = 3;
const languages = ['go', 'dotnet', 'java', 'python'];
const services = ['web', 'order-processing', 'api-backend', 'proxy'];
return {
generate: ({ from, to }) => {
const range = timerange(from, to);
const successfulTimestamps = range.interval('1s').rate(1);
// `.randomize(3, 180);
const instances = [...Array(numServices).keys()].map((index) =>
apm
.service(
`${services[index % services.length]}-${languages[index % languages.length]}-${index}`,
'production',
languages[index % languages.length]
)
.instance('instance')
);
const urls = ['GET /order/{id}', 'POST /basket/{id}', 'DELETE /basket', 'GET /products'];
const instanceSpans = (instance: Instance, url: string, index: number) => {
const successfulTraceEvents = successfulTimestamps.generator((timestamp) => {
const mod = index % 4;
const randomHigh = random(100, mod * 1000);
const randomLow = random(10, randomHigh / 10 + mod * 3);
const duration = random(randomLow, randomHigh);
const childDuration = random(randomLow, duration);
const remainderDuration = duration - childDuration;
const generateError = index % random(mod, 9) === 0;
const generateChildError = index % random(mod, 9) === 0;
const span = instance
.transaction(url)
.timestamp(timestamp)
.duration(duration)
.children(
instance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(childDuration)
.destination('elasticsearch')
.timestamp(timestamp)
.outcome(generateError && generateChildError ? 'failure' : 'success'),
instance
.span('custom_operation', 'custom')
.duration(remainderDuration)
.success()
.timestamp(timestamp + childDuration)
);
return !generateError
? span.success()
: span
.failure()
.errors(instance.error(`No handler for ${url}`).timestamp(timestamp + 50));
});
return successfulTraceEvents;
};
return instances
.flatMap((instance) => urls.map((url) => ({ instance, url })))
.map(({ instance, url }, index) =>
logger.perf('generating_apm_events', () => instanceSpans(instance, url, index))
)
.reduce((p, c) => p.merge(c));
},
};
};
export default scenario;


@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { random } from 'lodash';
import { apm, timerange } from '../../index';
import { Instance } from '../../lib/apm/instance';
import { Scenario } from '../scenario';
import { getLogger } from '../utils/get_common_services';
import { RunOptions } from '../utils/parse_run_cli_flags';
import { ApmFields } from '../../lib/apm/apm_fields';
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);
const numServices = 500;
const languages = ['go', 'dotnet', 'java', 'python'];
const services = ['web', 'order-processing', 'api-backend', 'proxy'];
return {
generate: ({ from, to }) => {
const range = timerange(from, to);
const successfulTimestamps = range.interval('1s').rate(3);
const instances = [...Array(numServices).keys()].map((index) =>
apm
.service(
`${services[index % services.length]}-${languages[index % languages.length]}-${index}`,
'production',
languages[index % languages.length]
)
.instance('instance')
);
const urls = ['GET /order/{id}', 'POST /basket/{id}', 'DELETE /basket', 'GET /products'];
const instanceSpans = (instance: Instance, url: string) => {
const successfulTraceEvents = successfulTimestamps.generator((timestamp) => {
const randomHigh = random(1000, 4000);
const randomLow = random(100, randomHigh / 5);
const duration = random(randomLow, randomHigh);
const childDuration = random(randomLow, duration);
const remainderDuration = duration - childDuration;
const generateError = random(1, 4) % 3 === 0;
const generateChildError = random(0, 5) % 2 === 0;
const span = instance
.transaction(url)
.timestamp(timestamp)
.duration(duration)
.children(
instance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.duration(childDuration)
.destination('elasticsearch')
.timestamp(timestamp)
.outcome(generateError && generateChildError ? 'failure' : 'success'),
instance
.span('custom_operation', 'custom')
.duration(remainderDuration)
.success()
.timestamp(timestamp + childDuration)
);
return !generateError
? span.success()
: span
.failure()
.errors(instance.error(`No handler for ${url}`).timestamp(timestamp + 50));
});
return successfulTraceEvents;
};
return instances
.flatMap((instance) => urls.map((url) => ({ instance, url })))
.map(({ instance, url }) =>
logger.perf('generating_apm_events', () => instanceSpans(instance, url))
)
.reduce((p, c) => p.merge(c));
},
};
};
export default scenario;


@ -7,10 +7,18 @@
*/
import { Client, ClientOptions } from '@elastic/elasticsearch';
import { createLogger } from '../../lib/utils/create_logger';
import { ApmSynthtraceEsClient } from '../../lib/apm/client/apm_synthtrace_es_client';
import { createLogger, Logger } from '../../lib/utils/create_logger';
import { RunOptions } from './parse_run_cli_flags';
export function getCommonServices({ target, cloudId, username, password, logLevel }: RunOptions) {
export function getLogger({ logLevel }: RunOptions) {
return createLogger(logLevel);
}
export function getCommonServices(
{ target, cloudId, username, password, logLevel, forceLegacyIndices }: RunOptions,
logger?: Logger
) {
if (!target && !cloudId) {
throw Error('target or cloudId needs to be specified');
}
@ -19,14 +27,27 @@ export function getCommonServices({ target, cloudId, username, password, logLeve
username,
password,
};
// Useful when debugging through mitmproxy
/*
options.Connection = HttpConnection;
options.proxy = 'http://localhost:8080';
options.tls = {
rejectUnauthorized: false,
};
*/
const client = new Client(options);
const logger = createLogger(logLevel);
logger = logger ?? createLogger(logLevel);
const apmEsClient = new ApmSynthtraceEsClient(client, logger, {
forceLegacyIndices,
refreshAfterIndex: false,
});
return {
logger,
client,
apmEsClient,
};
}

View file

@ -8,6 +8,7 @@
import Path from 'path';
import { Logger } from '../../lib/utils/create_logger';
import { Scenario } from '../scenario';
import { Fields } from '../../lib/entity';
export function getScenario({ file, logger }: { file: unknown; logger: Logger }) {
const location = Path.join(process.cwd(), String(file));
@ -19,5 +20,5 @@ export function getScenario({ file, logger }: { file: unknown; logger: Logger })
return m.default;
}
throw new Error(`Could not find scenario at ${location}`);
}) as Promise<Scenario>;
}) as Promise<Scenario<Fields>>;
}

View file

@ -9,10 +9,9 @@
import { pick } from 'lodash';
import { LogLevel } from '../../lib/utils/create_logger';
import { RunCliFlags } from '../run';
import { intervalToMs } from './interval_to_ms';
export function parseRunCliFlags(flags: RunCliFlags) {
const { file, _, logLevel, interval, bucketSize } = flags;
const { file, _, logLevel } = flags;
const parsedFile = String(file || _[0]);
@ -35,35 +34,24 @@ export function parseRunCliFlags(flags: RunCliFlags) {
break;
}
const intervalInMs = intervalToMs(interval);
if (!intervalInMs) {
throw new Error('Invalid interval');
}
const bucketSizeInMs = intervalToMs(bucketSize);
if (!bucketSizeInMs) {
throw new Error('Invalid bucket size');
}
return {
...pick(
flags,
'maxDocs',
'maxDocsConfidence',
'target',
'cloudId',
'username',
'password',
'workers',
'clientWorkers',
'batchSize',
'writeTarget',
'flushSizeBulk',
'flushSize',
'numShards',
'scenarioOpts',
'dryRun'
'forceLegacyIndices',
'dryRun',
'gcpRepository'
),
intervalInMs,
bucketSizeInMs,
logLevel: parsedLogLevel,
file: parsedFile,
};

View file

@ -5,55 +5,218 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import moment from 'moment';
import { Worker } from 'worker_threads';
import Path from 'path';
import { range } from 'lodash';
import pLimit from 'p-limit';
import { cpus } from 'os';
import { RunOptions } from './parse_run_cli_flags';
import { getScenario } from './get_scenario';
import { ApmSynthtraceEsClient } from '../../lib/apm';
import { ApmSynthtraceEsClient, LogLevel } from '../..';
import { Logger } from '../../lib/utils/create_logger';
import { StreamProcessor } from '../../lib/stream_processor';
export async function startHistoricalDataUpload(
esClient: ApmSynthtraceEsClient,
logger: Logger,
runOptions: RunOptions,
from: Date,
to: Date
to: Date,
version: string
) {
const file = runOptions.file;
const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
const { generate, mapToIndex } = await scenario(runOptions);
// if we want to generate a maximum number of documents reverse generation to descend.
[from, to] = runOptions.maxDocs ? [to, from] : [from, to];
logger.info(`Generating data from ${from} to ${to}`);
const file = runOptions.file;
const scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
const { generate } = await scenario(runOptions);
const events = logger.perf('generate_scenario', () => generate({ from, to }));
const cores = cpus().length;
// settle on a reasonable max concurrency, arbitrarily capping at 10.
let maxConcurrency = Math.min(10, cores - 1);
// maxWorkers to be spawned is double maxConcurrency. We estimate the number of ranges over
// maxConcurrency; if that estimate is too conservative, this provides more available workers to complete the job.
// If any worker finds that the work is already completed, it will spin down immediately.
let maxWorkers = maxConcurrency * 2;
logger.info(
`Discovered ${cores} cores, splitting work over ${maxWorkers} workers with limited concurrency: ${maxConcurrency}`
);
if (runOptions.dryRun) {
const maxDocs = runOptions.maxDocs;
const stream = new StreamProcessor({
processors: StreamProcessor.apmProcessors,
maxSourceEvents: maxDocs,
logger,
}).streamToDocument(StreamProcessor.toDocument, events);
logger.perf('enumerate_scenario', () => {
// @ts-ignore
// We just want to enumerate
let yielded = 0;
for (const _ of stream) {
yielded++;
}
});
return;
// if --workers N is specified it should take precedence over inferred maximum workers
if (runOptions.workers) {
// ensure maxWorkers is at least 1
maxWorkers = Math.max(1, runOptions.workers);
// ensure max concurrency is at least 1 or the ceil of --workers N / 2
maxConcurrency = Math.ceil(Math.max(1, maxWorkers / 2));
logger.info(
`updating maxWorkers to ${maxWorkers} and maxConcurrency to ${maxConcurrency} because it was explicitly set through --workers`
);
}
const clientWorkers = runOptions.clientWorkers;
await logger.perf('index_scenario', () =>
esClient.index(events, {
concurrency: clientWorkers,
maxDocs: runOptions.maxDocs,
mapToIndex,
})
const events = logger.perf('generate_scenario', () => generate({ from, to }));
const ratePerMinute = events.ratePerMinute();
logger.info(
`Scenario is generating ${ratePerMinute.toLocaleString()} events per minute`
);
let rangeEnd = to;
if (runOptions.maxDocs) {
// estimate a more accurate range end for when --maxDocs is specified
rangeEnd = moment(from)
// ratePerMinute() is not exact if the generator yields a variable number of documents:
// the rate is calculated by peeking the first yielded event and its children.
// For really complex cases manually specifying --to is encouraged.
.subtract((runOptions.maxDocs / ratePerMinute) * runOptions.maxDocsConfidence, 'm')
.toDate();
}
const diff = moment(from).diff(rangeEnd);
const d = moment.duration(Math.abs(diff), 'ms');
logger.info(
`Range: ${d.years()} years, ${d.days()} days, ${d.hours()} hours ${d.minutes()} minutes ${d.seconds()} seconds`
);
// make sure ranges cover at least 100k documents
const minIntervalSpan = moment.duration(100000 / ratePerMinute, 'm');
const minNumberOfRanges = d.asMilliseconds() / minIntervalSpan.asMilliseconds();
if (minNumberOfRanges < maxWorkers) {
maxWorkers = Math.max(1, Math.floor(minNumberOfRanges));
maxConcurrency = Math.max(1, maxWorkers / 2);
if (runOptions.workers) {
logger.info(
`Ignoring --workers ${runOptions.workers} since each worker would not see enough data`
);
}
logger.info(
`updating maxWorkers to ${maxWorkers} and maxConcurrency to ${maxConcurrency} to ensure each worker does enough work`
);
}
logger.info(`Generating data from ${from.toISOString()} to ${rangeEnd.toISOString()}`);
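// Messages posted from worker threads back to the main thread: forwarded log entries or per-worker progress/timestamp reports.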
type WorkerMessages =
| { log: LogLevel; args: any[] }
| { workerIndex: number; processedDocuments: number }
| { workerIndex: number; firstTimestamp: Date }
| { workerIndex: number; lastTimestamp: Date };
interface WorkerTotals {
total: number;
bucketFrom: Date;
bucketTo: Date;
firstTimestamp?: Date;
lastTimestamp?: Date;
}
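// Step from `from` towards `rangeEnd`; subtract when generation runs in descending order (the --maxDocs case swaps from/to).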
function rangeStep(interval: number) {
if (from > rangeEnd) return moment(from).subtract(interval, 'ms').toDate();
return moment(from).add(interval, 'ms').toDate();
}
// precalculate intervals to spawn workers over.
// abs() the difference so rangeStep() adds or subtracts explicitly rather than adding a negative number
const intervalSpan = Math.abs(diff / maxWorkers);
const intervals = range(0, maxWorkers)
.map((i) => intervalSpan * i)
.map((interval, index) => ({
workerIndex: index,
bucketFrom: rangeStep(interval),
bucketTo: rangeStep(interval + intervalSpan),
}));
// precalculate interval state for each worker to report on.
let totalProcessed = 0;
const workerProcessed = range(0, maxWorkers).reduce<Record<number, WorkerTotals>>((p, c, i) => {
p[i] = { total: 0, bucketFrom: intervals[i].bucketFrom, bucketTo: intervals[i].bucketTo };
return p;
}, {});
function runService({
bucketFrom,
bucketTo,
workerIndex,
}: {
bucketFrom: Date;
bucketTo: Date;
workerIndex: number;
}) {
return new Promise((resolve, reject) => {
logger.info(`Setting up Worker: ${workerIndex}`);
if (runOptions.maxDocs && totalProcessed >= runOptions.maxDocs + 10000) {
logger.info(
`Worker ${workerIndex} does not need to run since ${totalProcessed} documents were already processed`
);
return resolve(null);
}
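// Log progress every 5% of --maxDocs (capped at every 2M documents), or every 2M documents when --maxDocs is unset.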
const progressToConsole = runOptions?.maxDocs
? Math.min(2000000, runOptions.maxDocs / 20)
: 2000000;
const worker = new Worker(Path.join(__dirname, './worker.js'), {
workerData: {
runOptions,
bucketFrom,
bucketTo,
workerIndex,
version,
},
});
worker.on('message', (message: WorkerMessages) => {
if ('workerIndex' in message) {
if ('processedDocuments' in message) {
totalProcessed += message.processedDocuments;
workerProcessed[workerIndex].total += message.processedDocuments;
const check = Math.round(totalProcessed / 10000) * 10000;
if (check % progressToConsole === 0) {
logger.info(`processed: ${totalProcessed} documents`);
}
}
if ('firstTimestamp' in message)
workerProcessed[message.workerIndex].firstTimestamp = message.firstTimestamp;
if ('lastTimestamp' in message)
workerProcessed[message.workerIndex].lastTimestamp = message.lastTimestamp;
} else {
switch (message.log) {
case LogLevel.debug:
logger.debug.apply({}, message.args);
return;
case LogLevel.info:
logger.info.apply({}, message.args);
return;
case LogLevel.trace:
logger.debug.apply({}, message.args);
return;
case LogLevel.error:
logger.error.apply({}, message.args);
return;
default:
logger.info(message);
}
}
});
worker.on('error', (message) => {
logger.error(message);
reject();
});
worker.on('exit', (code) => {
if (code === 2) reject(new Error(`Worker ${workerIndex} exited with error: ${code}`));
if (code === 1) {
logger.info(`Worker ${workerIndex} exited early because cancellation was requested`);
}
resolve(null);
});
worker.postMessage('setup');
worker.postMessage('start');
});
}
const limiter = pLimit(Math.max(1, Math.floor(intervals.length / 2)));
const workers = range(0, intervals.length).map((index) => () => runService(intervals[index]));
return Promise.all(workers.map((worker) => limiter(() => worker())))
.then(async () => {
if (!runOptions.dryRun) {
await esClient.refresh();
}
})
.then(() => {
// eslint-disable-next-line no-console
console.table(workerProcessed);
logger.info(`Finished producing ${totalProcessed} events`);
});
}

View file

@ -12,13 +12,15 @@ import { RunOptions } from './parse_run_cli_flags';
import { ApmFields } from '../../lib/apm/apm_fields';
import { ApmSynthtraceEsClient } from '../../lib/apm';
import { Logger } from '../../lib/utils/create_logger';
import { SpanArrayIterable } from '../../lib/span_iterable';
import { EntityArrayIterable } from '../../lib/entity_iterable';
import { StreamProcessor } from '../../lib/stream_processor';
export async function startLiveDataUpload(
esClient: ApmSynthtraceEsClient,
logger: Logger,
runOptions: RunOptions,
start: Date
start: Date,
version: string
) {
const file = runOptions.file;
@ -27,12 +29,13 @@ export async function startLiveDataUpload(
let queuedEvents: ApmFields[] = [];
let requestedUntil: Date = start;
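// Live ingestion batches data into fixed one-minute buckets.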
const bucketSizeInMs = 1000 * 60;
async function uploadNextBatch() {
const end = new Date();
if (end > requestedUntil) {
const bucketFrom = requestedUntil;
const bucketTo = new Date(requestedUntil.getTime() + runOptions.bucketSizeInMs);
const bucketTo = new Date(requestedUntil.getTime() + bucketSizeInMs);
// TODO this materializes into an array; the assumption is that the live buffer will fit in memory
const nextEvents = logger.perf('execute_scenario', () =>
generate({ from: bucketFrom, to: bucketTo }).toArray()
@ -55,19 +58,30 @@ export async function startLiveDataUpload(
logger.info(`Uploading until ${new Date(end).toISOString()}, events: ${eventsToUpload.length}`);
queuedEvents = eventsToRemainInQueue;
const streamProcessor = new StreamProcessor({
version,
logger,
processors: StreamProcessor.apmProcessors,
maxSourceEvents: runOptions.maxDocs,
name: `Live index`,
});
await logger.perf('index_live_scenario', () =>
esClient.index(new SpanArrayIterable(eventsToUpload), {
concurrency: runOptions.clientWorkers,
maxDocs: runOptions.maxDocs,
mapToIndex,
})
esClient.index(
new EntityArrayIterable(eventsToUpload),
{
concurrency: runOptions.workers,
maxDocs: runOptions.maxDocs,
mapToIndex,
dryRun: false,
},
streamProcessor
)
);
}
do {
await uploadNextBatch();
await delay(runOptions.intervalInMs);
await delay(bucketSizeInMs);
} while (true);
}
async function delay(ms: number) {

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable @typescript-eslint/no-var-requires*/
require('@babel/register')({
extensions: ['.ts', '.js'],
presets: [['@babel/preset-env', { targets: { node: 'current' } }], '@babel/preset-typescript'],
});
require('./worker.ts');

View file

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// import pLimit from 'p-limit';
import { workerData, parentPort } from 'worker_threads';
import { RunOptions } from './parse_run_cli_flags';
import { getScenario } from './get_scenario';
import { StreamToBulkOptions } from '../../lib/apm/client/apm_synthtrace_es_client';
import { getCommonServices } from './get_common_services';
import { LogLevel } from '../../lib/utils/create_logger';
import { StreamProcessor } from '../../lib/stream_processor';
import { Scenario } from '../scenario';
import { EntityIterable, Fields } from '../..';
// logging proxy to main thread, ensures we see real time logging
const l = {
perf: <T extends any>(name: string, cb: () => T): T => {
return cb();
},
debug: (...args: any[]) => parentPort?.postMessage({ log: LogLevel.debug, args }),
info: (...args: any[]) => parentPort?.postMessage({ log: LogLevel.info, args }),
error: (...args: any[]) => parentPort?.postMessage({ log: LogLevel.error, args }),
};
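// Data handed to each worker on creation: its time slice, the parsed CLI options, its index, and the stack version.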
export interface WorkerData {
bucketFrom: Date;
bucketTo: Date;
runOptions: RunOptions;
workerIndex: number;
version: string;
}
const { bucketFrom, bucketTo, runOptions, workerIndex, version } = workerData as WorkerData;
const { logger, apmEsClient } = getCommonServices(runOptions, l);
const file = runOptions.file;
let scenario: Scenario<Fields>;
let events: EntityIterable<Fields>;
let streamToBulkOptions: StreamToBulkOptions;
let streamProcessor: StreamProcessor;
async function setup() {
scenario = await logger.perf('get_scenario', () => getScenario({ file, logger }));
const { generate, mapToIndex } = await scenario(runOptions);
events = logger.perf('generate_scenario', () => generate({ from: bucketFrom, to: bucketTo }));
streamToBulkOptions = {
maxDocs: runOptions.maxDocs,
mapToIndex,
dryRun: !!runOptions.dryRun,
};
streamToBulkOptions.itemStartStopCallback = (item, done) => {
if (!item) return;
if (!done) {
parentPort?.postMessage({ workerIndex, firstTimestamp: item['@timestamp'] });
} else {
parentPort?.postMessage({ workerIndex, lastTimestamp: item['@timestamp'] });
}
};
streamProcessor = new StreamProcessor({
version,
processors: StreamProcessor.apmProcessors,
maxSourceEvents: runOptions.maxDocs,
logger: l,
processedCallback: (processedDocuments) => {
parentPort!.postMessage({ workerIndex, processedDocuments });
},
name: `Worker ${workerIndex}`,
});
}
async function doWork() {
await logger.perf(
'index_scenario',
async () => await apmEsClient.index(events, streamToBulkOptions, streamProcessor)
);
}
parentPort!.on('message', async (message) => {
if (message === 'setup') {
await setup();
}
if (message === 'start') {
try {
await doWork();
process.exit(0);
} catch (error) {
l.info(error);
process.exit(2);
}
}
});

View file

@ -11,6 +11,7 @@ import { StreamProcessor } from '../lib/stream_processor';
describe('output apm events to elasticsearch', () => {
let event: ApmFields;
const streamProcessor = new StreamProcessor({ processors: [], version: '8.0.0' });
beforeEach(() => {
event = {
@ -22,12 +23,12 @@ describe('output apm events to elasticsearch', () => {
});
it('properly formats @timestamp', () => {
const doc = StreamProcessor.toDocument(event);
const doc = streamProcessor.toDocument(event);
expect(doc['@timestamp']).toEqual('2020-12-31T23:00:00.000Z');
});
it('formats a nested object', () => {
const doc = StreamProcessor.toDocument(event);
const doc = streamProcessor.toDocument(event);
expect(doc.processor).toEqual({
event: 'transaction',
@ -36,7 +37,7 @@ describe('output apm events to elasticsearch', () => {
});
it('formats all fields consistently', () => {
const doc = StreamProcessor.toDocument(event);
const doc = streamProcessor.toDocument(event);
expect(doc).toMatchInlineSnapshot(`
Object {

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { EntityArrayIterable } from '../lib/entity_iterable';
import { apm } from '../lib/apm';
import { timerange } from '../lib/timerange';
import { ApmFields } from '../lib/apm/apm_fields';
describe('DSL invocations', () => {
let arrayIterable: EntityArrayIterable<ApmFields>;
let eventsCopy: Array<Record<string, any>>;
const range = timerange(
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
const javaService = apm.service('opbeans-java', 'production', 'java');
const javaInstance = javaService.instance('instance-1');
let globalSeq = 0;
const iterable = range
.interval('1m')
.rate(1)
.generator((timestamp, index) =>
javaInstance
.transaction(`GET /api/product/${index}/${globalSeq++}`)
.duration(1000)
.success()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.success()
.duration(900)
.timestamp(timestamp + 50)
)
);
const events = iterable.toArray();
beforeEach(() => {
eventsCopy = iterable.toArray();
arrayIterable = new EntityArrayIterable(events);
});
it('to array on iterable reads to completion', () => {
expect(events.length).toBe(15 * 2);
});
it('calling toArray on EntityArrayIterable returns all events', () => {
expect(arrayIterable.toArray().length).toBe(15 * 2);
});
it('calling toArray multiple times always sees all events', () => {
expect(eventsCopy.length).toBe(15 * 2);
});
it('will yield the first peeked value', () => {
expect(events[0]['transaction.name']).toBe('GET /api/product/0/0');
});
it('2nd invocation of toArray re-invokes the generator', () => {
expect(eventsCopy[0]['transaction.name']).not.toBe('GET /api/product/0/0');
});
it('array iterable holds a copy and will yield the same items', () => {
const copy = arrayIterable.toArray();
expect(events[0]['transaction.name']).toBe(copy[0]['transaction.name']);
});
});

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { EntityArrayIterable, EntityIterable } from '../lib/entity_iterable';
import { apm } from '../lib/apm';
import { timerange } from '../lib/timerange';
import { ApmFields } from '../lib/apm/apm_fields';
describe('rate per minute calculations', () => {
let iterable: EntityIterable<ApmFields>;
let arrayIterable: EntityArrayIterable<ApmFields>;
let events: Array<Record<string, any>>;
const range = timerange(
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
const i1r3 = range.interval('1m').rate(3);
const i5r6 = range.interval('5m').rate(6);
const i30r6 = range.interval('30m').rate(6);
const i1sr3 = range.interval('1s').rate(3);
beforeEach(() => {
const javaService = apm.service('opbeans-java', 'production', 'java');
const javaInstance = javaService.instance('instance-1');
iterable = range
.interval('1m')
.rate(1)
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
.success()
.timestamp(timestamp)
.children(
javaInstance
.span('GET apm-*/_search', 'db', 'elasticsearch')
.success()
.duration(900)
.timestamp(timestamp + 50)
)
);
events = iterable.toArray();
arrayIterable = new EntityArrayIterable(events);
});
it('array iterable returns exact rate per minute', () => {
expect(arrayIterable.ratePerMinute()).toEqual(2);
});
it('iterable returns rate per minute approximation', () => {
expect(iterable.ratePerMinute()).toEqual(2);
});
it('iterable returns same rate as materialized iterable', () => {
expect(iterable.ratePerMinute()).toEqual(arrayIterable.ratePerMinute());
});
it('interval of 3 per minute returns 3', () => {
expect(i1r3.ratePerMinute()).toEqual(3);
});
it('interval of 6 per 5 minutes returns 6/5', () => {
expect(i5r6.ratePerMinute()).toEqual(6 / 5);
});
it('interval of 6 per 30 minutes returns 6/30', () => {
expect(i30r6.ratePerMinute()).toEqual(6 / 30);
});
it('interval of 3 per second returns 60 * 3', () => {
expect(i1sr3.ratePerMinute()).toEqual(60 * 3);
});
});

View file

@ -6,10 +6,13 @@
* Side Public License, v 1.
*/
import { EntityIterable } from '../..';
import { apm } from '../../lib/apm';
import { ApmFields } from '../../lib/apm/apm_fields';
import { timerange } from '../../lib/timerange';
describe('simple trace', () => {
let iterable: EntityIterable<ApmFields>;
let events: Array<Record<string, any>>;
beforeEach(() => {
@ -21,10 +24,10 @@ describe('simple trace', () => {
new Date('2021-01-01T00:15:00.000Z')
);
events = range
iterable = range
.interval('1m')
.rate(1)
.spans((timestamp) =>
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -37,11 +40,11 @@ describe('simple trace', () => {
.duration(900)
.timestamp(timestamp + 50)
)
.serialize()
)
.toArray();
);
events = iterable.toArray();
});
// TODO this is not entirely factual, since ids are generated from a global sequence number
it('generates the same data every time', () => {
expect(events).toMatchSnapshot();
});

View file

@ -10,6 +10,7 @@ import { apm } from '../../lib/apm';
import { timerange } from '../../lib/timerange';
import { getTransactionMetrics } from '../../lib/apm/processors/get_transaction_metrics';
import { StreamProcessor } from '../../lib/stream_processor';
import { ApmFields } from '../../lib/apm/apm_fields';
describe('transaction metrics', () => {
let events: Array<Record<string, any>>;
@ -26,7 +27,7 @@ describe('transaction metrics', () => {
const span = (timestamp: number) =>
javaInstance.transaction('GET /api/product/list').duration(1000).timestamp(timestamp);
const processor = new StreamProcessor({
const processor = new StreamProcessor<ApmFields>({
processors: [getTransactionMetrics],
flushInterval: '15m',
});
@ -35,11 +36,11 @@ describe('transaction metrics', () => {
range
.interval('1m')
.rate(25)
.spans((timestamp) => span(timestamp).success().serialize()),
.generator((timestamp) => span(timestamp).success()),
range
.interval('1m')
.rate(50)
.spans((timestamp) => span(timestamp).failure().serialize())
.generator((timestamp) => span(timestamp).failure())
)
.filter((fields) => fields['metricset.name'] === 'transaction');
});

View file

@ -10,6 +10,7 @@ import { apm } from '../../lib/apm';
import { timerange } from '../../lib/timerange';
import { getSpanDestinationMetrics } from '../../lib/apm/processors/get_span_destination_metrics';
import { StreamProcessor } from '../../lib/stream_processor';
import { ApmFields } from '../../lib/apm/apm_fields';
describe('span destination metrics', () => {
let events: Array<Record<string, any>>;
@ -22,13 +23,13 @@ describe('span destination metrics', () => {
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:15:00.000Z')
);
const processor = new StreamProcessor({ processors: [getSpanDestinationMetrics] });
const processor = new StreamProcessor<ApmFields>({ processors: [getSpanDestinationMetrics] });
events = processor
.streamToArray(
range
.interval('1m')
.rate(25)
.spans((timestamp) =>
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -42,12 +43,11 @@ describe('span destination metrics', () => {
.destination('elasticsearch')
.success()
)
.serialize()
),
range
.interval('1m')
.rate(50)
.spans((timestamp) =>
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -66,7 +66,6 @@ describe('span destination metrics', () => {
.duration(500)
.success()
)
.serialize()
)
)
.filter((fields) => fields['metricset.name'] === 'span_destination');

View file

@ -32,7 +32,7 @@ describe('breakdown metrics', () => {
const listSpans = range
.interval('30s')
.rate(LIST_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.timestamp(timestamp)
@ -44,13 +44,12 @@ describe('breakdown metrics', () => {
.duration(500),
javaInstance.span('GET foo', 'db', 'redis').timestamp(timestamp).duration(100)
)
.serialize()
);
const productPageSpans = range
.interval('30s')
.rate(ID_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
javaInstance
.transaction('GET /api/product/:id')
.timestamp(timestamp)
@ -67,7 +66,6 @@ describe('breakdown metrics', () => {
.duration(100)
)
)
.serialize()
);
const processor = new StreamProcessor({

View file

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { timerange } from '../lib/timerange';
import { Serializable } from '../lib/serializable';
import { Fields } from '../lib/entity';
export type DocFields = Fields & Partial<{ type: string }>;
class Doc extends Serializable<DocFields> {
constructor(type: string) {
super({
type,
});
}
}
describe('Merging streams', () => {
let events: DocFields[];
let types: string[];
const range = timerange(
new Date('2021-01-01T00:00:00.000Z'),
new Date('2021-01-01T00:02:00.000Z')
);
beforeEach(() => {
const iterable = range
.interval('1m')
.rate(1)
.generator(() => new Doc('metric'))
.merge(
range
.interval('1m')
.rate(4)
.generator(() => new Doc('transaction'))
);
events = iterable.toArray();
types = events.map((e) => e.type!);
});
it('metrics yields before transaction even though it has less weight', () => {
expect(events[0].type).toBe('metric');
});
it('merging data streams uses rate per minute to ensure high volume streams are represented', () => {
expect(types).toEqual([
'metric',
'transaction',
'transaction',
'transaction',
'transaction',
'metric',
'transaction',
'transaction',
'transaction',
'transaction',
]);
});
});

View file

@ -13,25 +13,26 @@ To access an Elasticsearch instance that has live data you have three options:
## 1. Using Synthtrace
**Start Elasticsearch**
**Start Elasticsearch & Kibana**
Elasticsearch:
```
yarn es snapshot
```
**Create APM mappings**
Kibana:
```
node ./scripts/es_archiver load "x-pack/plugins/apm/ftr_e2e/cypress/fixtures/es_archiver/apm_mappings_only_8.0.0" --es-url=http://system_indices_superuser:changeme@localhost:9200 --kibana-url=http://elastic:changeme@localhost:5601 --config=./test/functional/config.js
yarn start
```
*Note: Elasticsearch must be available before running the above command*
**Run Synthtrace**
```
node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/scripts/examples/01_simple_trace.ts --target=http://localhost:9200 --username=elastic --password=changeme
node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/scripts/examples/01_simple_trace.ts \
--local
```
The `--local` flag is a shortcut to specifying `--target` and `--kibana`. It autodiscovers the current Kibana basepath and installs the appropriate APM package.
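For reference, a roughly equivalent invocation without the shortcut would be the following; the exact URLs and credentials are assumptions based on the default local setup shown elsewhere in this document:
```
node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/scripts/examples/01_simple_trace.ts \
  --target=http://localhost:9200 --kibana=http://localhost:5601 \
  --username=elastic --password=changeme
```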
**Connect Kibana to ES**
Update `config/kibana.dev.yml` with:
@ -47,6 +48,16 @@ Documentation for [Synthtrace](https://github.com/elastic/kibana/blob/main/packa
Use the [oblt-cli](https://github.com/elastic/observability-test-environments/blob/master/tools/oblt_cli/README.md) to connect to a cloud-based ES cluster.
**Run Synthtrace**
If you want to bootstrap some data on a cloud instance, you can also use the following:
```
node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/scripts/examples/01_simple_trace.ts \
--cloudId "myname:<base64string>" \
--maxDocs 100000
```
## 3. Local ES Cluster
### Start Elasticsearch and APM data generators

View file

@ -27,8 +27,8 @@ export function opbeans({ from, to }: { from: number; to: number }) {
return range
.interval('1s')
.rate(1)
.spans((timestamp) => [
...opbeansJava
.generator((timestamp) => [
opbeansJava
.transaction('GET /api/product')
.timestamp(timestamp)
.duration(1000)
@ -43,24 +43,17 @@ export function opbeans({ from, to }: { from: number; to: number }) {
.duration(50)
.success()
.destination('postgresql')
)
.serialize(),
...opbeansNode
),
opbeansNode
.transaction('GET /api/product/:id')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
...opbeansNode
.success(),
opbeansNode
.transaction('Worker job', 'Worker')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansRum
.transaction('/')
.timestamp(timestamp)
.duration(1000)
.serialize(),
.success(),
opbeansRum.transaction('/').timestamp(timestamp).duration(1000),
]);
}

View file

@ -40,19 +40,17 @@ function generateData({
return range
.interval('1m')
.rate(1)
.spans((timestamp, index) => [
...service1
.generator((timestamp, index) => [
service1
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...service2
.success(),
service2
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
.success(),
]);
}

View file

@ -21,8 +21,8 @@ export function generateData({ from, to }: { from: number; to: number }) {
return range
.interval('2m')
.rate(1)
.spans((timestamp, index) => [
...opbeansJava
.generator((timestamp, index) => [
opbeansJava
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
@ -31,13 +31,11 @@ export function generateData({ from, to }: { from: number; to: number }) {
opbeansJava
.error(`Error ${index}`, `exception ${index}`)
.timestamp(timestamp)
)
.serialize(),
...opbeansNode
),
opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
.success(),
]);
}

View file

@ -29,18 +29,16 @@ export function generateData({
return range
.interval('2m')
.rate(1)
.spans((timestamp, index) => [
...service1
.generator((timestamp) => [
service1
.transaction('GET /apple 🍎 ')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize(),
...opbeansNode
.success(),
opbeansNode
.transaction('GET /banana 🍌')
.timestamp(timestamp)
.duration(500)
.success()
.serialize(),
.success(),
]);
}

View file

@ -24,7 +24,7 @@ export function generateData({ start, end }: { start: number; end: number }) {
const traceEvents = timerange(start, end)
.interval('1m')
.rate(rate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transaction.name)
.defaults({
@ -34,7 +34,6 @@ export function generateData({ start, end }: { start: number; end: number }) {
.timestamp(timestamp)
.duration(transaction.duration)
.success()
.serialize()
);
return traceEvents;

View file

@ -7,8 +7,8 @@
import {
apm,
createLogger,
EntityArrayIterable,
LogLevel,
SpanArrayIterable,
} from '@elastic/apm-synthtrace';
import { createEsClientForTesting } from '@kbn/test';
@ -38,16 +38,18 @@ const plugin: Cypress.PluginConfig = (on, config) => {
isCloud: !!config.env.TEST_CLOUD,
});
const forceDataStreams = false;
const synthtraceEsClient = new apm.ApmSynthtraceEsClient(
client,
createLogger(LogLevel.info),
forceDataStreams
{
forceLegacyIndices: true,
refreshAfterIndex: true,
}
);
on('task', {
'synthtrace:index': async (events: Array<Record<string, any>>) => {
await synthtraceEsClient.index(new SpanArrayIterable(events));
await synthtraceEsClient.index(new EntityArrayIterable(events));
return null;
},
'synthtrace:clean': async () => {

View file

@ -4,10 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SpanIterable } from '@elastic/apm-synthtrace';
import { EntityIterable } from '@elastic/apm-synthtrace';
export const synthtrace = {
index: (events: SpanIterable) =>
index: (events: EntityIterable) =>
new Promise((resolve) => {
cy.task('synthtrace:index', events.toArray()).then(resolve);
}),

View file

@ -11,6 +11,8 @@ import { InheritedFtrProviderContext } from './ftr_provider_context';
export async function synthtraceEsClientService(context: InheritedFtrProviderContext) {
const es = context.getService('es');
const forceDataStreams = false;
return new apm.ApmSynthtraceEsClient(es, createLogger(LogLevel.info), forceDataStreams);
return new apm.ApmSynthtraceEsClient(es, createLogger(LogLevel.info), {
forceLegacyIndices: true,
refreshAfterIndex: true,
});
}

View file

@ -43,7 +43,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const events = timerange(new Date(start).getTime(), new Date(end).getTime())
.interval('1m')
.rate(1)
.spans((timestamp) => {
.generator((timestamp) => {
const isInSpike = timestamp >= spikeStart && timestamp < spikeEnd;
const count = isInSpike ? 4 : NORMAL_RATE;
const duration = isInSpike ? 1000 : NORMAL_DURATION;
@ -56,7 +56,6 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.timestamp(timestamp)
.duration(duration)
.outcome(outcome)
.serialize()
),
];
});

View file

@ -103,7 +103,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const events = timerange(new Date(start).getTime(), new Date(end).getTime())
.interval('1m')
.rate(1)
.spans((timestamp) => {
.generator((timestamp) => {
const isInSpike = timestamp >= spikeStart && timestamp < spikeEnd;
const count = isInSpike ? 4 : NORMAL_RATE;
const duration = isInSpike ? 1000 : NORMAL_DURATION;
@ -116,14 +116,12 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.timestamp(timestamp)
.duration(duration)
.outcome(outcome)
.serialize()
),
...serviceB
serviceB
.transaction('tx', 'Worker')
.timestamp(timestamp)
.duration(duration)
.success()
.serialize(),
.success(),
];
});

View file

@ -32,7 +32,7 @@ export async function generateData({
const traceEvents = timerange(start, end)
.interval('1m')
.rate(coldStartRate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transactionName)
.defaults({
@ -41,13 +41,12 @@ export async function generateData({
.timestamp(timestamp)
.duration(duration)
.success()
.serialize()
)
.concat(
.merge(
timerange(start, end)
.interval('1m')
.rate(warmStartRate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transactionName)
.defaults({
@ -56,7 +55,6 @@ export async function generateData({
.timestamp(timestamp)
.duration(duration)
.success()
.serialize()
)
);

View file

@ -39,7 +39,7 @@ export async function generateData({
timerange(start, end)
.interval('1m')
.rate(coldStartRate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(coldStartTransaction.name)
.defaults({
@ -48,12 +48,11 @@ export async function generateData({
.timestamp(timestamp)
.duration(coldStartTransaction.duration)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(warmStartRate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(warmStartTransaction.name)
.defaults({
@ -62,7 +61,6 @@ export async function generateData({
.timestamp(timestamp)
.duration(warmStartTransaction.duration)
.success()
.serialize()
),
];

View file

@ -142,8 +142,8 @@ function generateApmData(synthtrace: ApmSynthtraceEsClient) {
range
.interval('1s')
.rate(1)
.spans((timestamp) =>
instance.transaction('GET /api').timestamp(timestamp).duration(30).success().serialize()
.generator((timestamp) =>
instance.transaction('GET /api').timestamp(timestamp).duration(30).success()
),
]);
}

View file

@ -37,7 +37,7 @@ export async function generateData({
timerange(start, end)
.interval('1m')
.rate(rate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)
@ -51,7 +51,6 @@ export async function generateData({
.destination(span.destination)
.timestamp(timestamp)
)
.serialize()
)
);
}

View file

@ -131,46 +131,42 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
]);
});

View file

@ -80,46 +80,42 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList, 'Worker')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_LIST_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList, 'Worker')
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ID_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
]);
});

View file

@ -79,48 +79,44 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(appleTransaction.successRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceInstance
.transaction(appleTransaction.name)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(appleTransaction.failureRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceInstance
.transaction(appleTransaction.name)
.errors(serviceInstance.error('error 1', 'foo').timestamp(timestamp))
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(bananaTransaction.successRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceInstance
.transaction(bananaTransaction.name)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(bananaTransaction.failureRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceInstance
.transaction(bananaTransaction.name)
.errors(serviceInstance.error('error 2', 'bar').timestamp(timestamp))
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
]);
});

View file

@ -41,19 +41,18 @@ export async function generateData({
return timerange(start, end)
.interval(interval)
.rate(transaction.successRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transaction.name)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
)
.concat(
.merge(
timerange(start, end)
.interval(interval)
.rate(transaction.failureRate)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transaction.name)
.errors(
@ -62,7 +61,6 @@ export async function generateData({
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
)
);
});

View file

@ -133,22 +133,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(GO_PROD_DURATION)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(GO_DEV_DURATION)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -79,22 +79,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list', 'Worker')
.duration(GO_PROD_DURATION)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(GO_DEV_DURATION)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -106,32 +106,29 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -58,7 +58,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(1)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.appMetrics({
'system.process.cpu.total.norm.pct': 1,
@ -67,7 +67,6 @@ export default function ApiTest({ getService }: FtrProviderContext) {
'jvm.thread.count': 25,
})
.timestamp(timestamp)
.serialize()
)
);
});

View file

@ -319,41 +319,37 @@ export default function ApiTest({ getService }: FtrProviderContext) {
}
return synthtraceEsClient.index([
interval.rate(GO_A_INSTANCE_RATE_SUCCESS).spans((timestamp) =>
interval.rate(GO_A_INSTANCE_RATE_SUCCESS).generator((timestamp) =>
goInstanceA
.transaction('GET /api/product/list')
.success()
.duration(500)
.timestamp(timestamp)
.children(...withSpans(timestamp))
.serialize()
),
interval.rate(GO_A_INSTANCE_RATE_FAILURE).spans((timestamp) =>
interval.rate(GO_A_INSTANCE_RATE_FAILURE).generator((timestamp) =>
goInstanceA
.transaction('GET /api/product/list')
.failure()
.duration(500)
.timestamp(timestamp)
.children(...withSpans(timestamp))
.serialize()
),
interval.rate(GO_B_INSTANCE_RATE_SUCCESS).spans((timestamp) =>
interval.rate(GO_B_INSTANCE_RATE_SUCCESS).generator((timestamp) =>
goInstanceB
.transaction('GET /api/product/list')
.success()
.duration(500)
.timestamp(timestamp)
.children(...withSpans(timestamp))
.serialize()
),
interval.rate(JAVA_INSTANCE_RATE).spans((timestamp) =>
interval.rate(JAVA_INSTANCE_RATE).generator((timestamp) =>
javaInstance
.transaction('GET /api/product/list')
.success()
.duration(500)
.timestamp(timestamp)
.children(...withSpans(timestamp))
.serialize()
),
]);
});

View file

@ -45,48 +45,44 @@ export async function generateData({
timerange(start, end)
.interval('1m')
.rate(PROD_LIST_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(PROD_LIST_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductList)
.errors(serviceGoProdInstance.error(ERROR_NAME_1, 'foo').timestamp(timestamp))
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(PROD_ID_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(PROD_ID_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionNameProductId)
.errors(serviceGoProdInstance.error(ERROR_NAME_2, 'bar').timestamp(timestamp))
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
]);
}

View file

@ -62,13 +62,12 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(1)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction('GET /api/product/list')
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
)
);
});

View file

@ -71,7 +71,7 @@ export async function generateData({
timerange(start, end)
.interval('30s')
.rate(rate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)
@ -94,12 +94,11 @@ export async function generateData({
})
.duration(transaction.duration)
.success()
.serialize()
),
timerange(start, end)
.interval('30s')
.rate(rate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transaction.name)
.timestamp(timestamp)
@ -122,7 +121,6 @@ export async function generateData({
})
.duration(transaction.duration)
.success()
.serialize()
),
];

View file

@ -38,7 +38,7 @@ export async function generateData({
const traceEvents = timerange(start, end)
.interval('30s')
.rate(rate)
.spans((timestamp) =>
.generator((timestamp) =>
instance
.transaction(transaction.name)
.defaults({
@ -49,7 +49,6 @@ export async function generateData({
.timestamp(timestamp)
.duration(transaction.duration)
.success()
.serialize()
);
await synthtraceEsClient.index(traceEvents);

View file

@ -67,19 +67,17 @@ export default function ApiTest({ getService }: FtrProviderContext) {
const eventsWithinTimerange = timerange(new Date(start).getTime(), new Date(end).getTime())
.interval('15m')
.rate(1)
.spans((timestamp) => {
.generator((timestamp) => {
const isInSpike = spikeStart <= timestamp && spikeEnd >= timestamp;
return [
...serviceA
serviceA
.transaction('GET /api')
.duration(isInSpike ? 1000 : 1100)
.timestamp(timestamp)
.serialize(),
...serviceB
.timestamp(timestamp),
serviceB
.transaction('GET /api')
.duration(isInSpike ? 1000 : 4000)
.timestamp(timestamp)
.serialize(),
.timestamp(timestamp),
];
});
@ -89,15 +87,11 @@ export default function ApiTest({ getService }: FtrProviderContext) {
)
.interval('15m')
.rate(1)
.spans((timestamp) => {
return serviceC
.transaction('GET /api', 'custom')
.duration(1000)
.timestamp(timestamp)
.serialize();
.generator((timestamp) => {
return serviceC.transaction('GET /api', 'custom').duration(1000).timestamp(timestamp);
});
await synthtraceClient.index(eventsWithinTimerange.concat(eventsOutsideOfTimerange));
await synthtraceClient.index(eventsWithinTimerange.merge(eventsOutsideOfTimerange));
await Promise.all([
createAndRunApmMlJob({ environment: 'production', ml }),

View file

@ -88,32 +88,29 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -98,36 +98,33 @@ export default function ApiTest({ getService }: FtrProviderContext) {
return synthtrace.index([
transactionInterval
.rate(config.multiple.prod.rps)
.spans((timestamp) => [
...multipleEnvServiceProdInstance
.generator((timestamp) =>
multipleEnvServiceProdInstance
.transaction('GET /api')
.timestamp(timestamp)
.duration(config.multiple.prod.duration)
.success()
.serialize(),
]),
),
transactionInterval
.rate(config.multiple.dev.rps)
.spans((timestamp) => [
...multipleEnvServiceDevInstance
.generator((timestamp) =>
multipleEnvServiceDevInstance
.transaction('GET /api')
.timestamp(timestamp)
.duration(config.multiple.dev.duration)
.failure()
.serialize(),
]),
),
transactionInterval
.rate(config.multiple.prod.rps)
.spans((timestamp) => [
...multipleEnvServiceDevInstance
.generator((timestamp) =>
multipleEnvServiceDevInstance
.transaction('non-request', 'rpc')
.timestamp(timestamp)
.duration(config.multiple.prod.duration)
.success()
.serialize(),
]),
metricInterval.rate(1).spans((timestamp) => [
...metricOnlyInstance
),
metricInterval.rate(1).generator((timestamp) =>
metricOnlyInstance
.appMetrics({
'system.memory.actual.free': 1,
'system.cpu.total.norm.pct': 1,
@ -135,13 +132,10 @@ export default function ApiTest({ getService }: FtrProviderContext) {
'system.process.cpu.total.norm.pct': 1,
})
.timestamp(timestamp)
.serialize(),
]),
),
errorInterval
.rate(1)
.spans((timestamp) => [
...errorOnlyInstance.error('Foo').timestamp(timestamp).serialize(),
]),
.generator((timestamp) => errorOnlyInstance.error('Foo').timestamp(timestamp)),
]);
});

View file

@ -104,7 +104,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
@ -130,12 +130,11 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.timestamp(timestamp)
)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(JAVA_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceJavaInstance
.transaction('POST /api/product/buy')
.duration(1000)
@ -153,7 +152,6 @@ export default function ApiTest({ getService }: FtrProviderContext) {
.success()
.timestamp(timestamp)
)
.serialize()
),
]);
});

View file

@ -119,22 +119,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -79,22 +79,20 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction('GET /api/product/list', 'Worker')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_DEV_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoDevInstance
.transaction('GET /api/product/:id')
.duration(1000)
.timestamp(timestamp)
.serialize()
),
]);
});

View file

@ -93,24 +93,22 @@ export default function ApiTest({ getService }: FtrProviderContext) {
timerange(start, end)
.interval('1m')
.rate(GO_PROD_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionName)
.timestamp(timestamp)
.duration(1000)
.success()
.serialize()
),
timerange(start, end)
.interval('1m')
.rate(GO_PROD_ERROR_RATE)
.spans((timestamp) =>
.generator((timestamp) =>
serviceGoProdInstance
.transaction(transactionName)
.duration(1000)
.timestamp(timestamp)
.failure()
.serialize()
),
]);
});