[APM] Add live mode to synthtrace (#115988)

Dario Gieselaar 2021-10-23 23:54:21 +02:00 committed by GitHub
parent 003c0f36ec
commit 110a8418f9
25 changed files with 750 additions and 136 deletions

View file

@@ -25,6 +25,7 @@ NPM_MODULE_EXTRA_FILES = [
]
RUNTIME_DEPS = [
"//packages/elastic-datemath",
"@npm//@elastic/elasticsearch",
"@npm//lodash",
"@npm//moment",
@@ -36,6 +37,7 @@ RUNTIME_DEPS = [
]
TYPES_DEPS = [
"//packages/elastic-datemath",
"@npm//@elastic/elasticsearch",
"@npm//moment",
"@npm//p-limit",

View file

@@ -11,7 +11,7 @@ This section assumes that you've installed Kibana's dependencies by running `yar
This library can currently be used in two ways:
- Imported as a Node.js module, for instance to be used in Kibana's functional test suite.
- With a command line interface, to index data based on some example scenarios.
- With a command line interface, to index data based on a specified scenario.
### Using the Node.js module
@@ -32,7 +32,7 @@ const instance = service('synth-go', 'production', 'go')
.instance('instance-a');
const from = new Date('2021-01-01T12:00:00.000Z').getTime();
const to = new Date('2021-01-01T12:00:00.000Z').getTime() - 1;
const to = new Date('2021-01-01T12:00:00.000Z').getTime();
const traceEvents = timerange(from, to)
.interval('1m')
@@ -82,12 +82,26 @@ const esEvents = toElasticsearchOutput([
### CLI
Via the CLI, you can upload examples. The supported examples are listed in `src/lib/es.ts`. A `--target` option that specifies the Elasticsearch URL should be defined when running the `example` command. Here's an example:
Via the CLI, you can upload scenarios, either using a fixed time range or continuously generating data. Some examples are available in `src/scripts/examples`. Here's an example for live data:
`$ node packages/elastic-apm-generator/src/scripts/es.js example simple-trace --target=http://admin:changeme@localhost:9200`
`$ node packages/elastic-apm-generator/src/scripts/run packages/elastic-apm-generator/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --live`
For a fixed time window:
`$ node packages/elastic-apm-generator/src/scripts/run packages/elastic-apm-generator/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --from=now-24h --to=now`
The script will try to automatically find bootstrapped APM indices. __If these indices do not exist, the script will exit with an error. It will not bootstrap the indices itself.__
The following options are supported:
- `to`: the end of the time range, in ISO format. By default, the current time will be used.
- `from`: the start of the time range, in ISO format. By default, `to` minus 15 minutes will be used.
- `apm-server-version`: the version used in the index names bootstrapped by APM Server, e.g. `7.16.0`. __If these indices do not exist, the script will exit with an error. It will not bootstrap the indices itself.__
| Option | Description | Default |
| -------------- | ------------------------------------------------------- | ------------ |
| `--from` | The start of the time window. | `now - 15m` |
| `--to` | The end of the time window. | `now` |
| `--live`       | Continuously ingest data.                               | `false`      |
| `--bucketSize` | Size of bucket for which to generate data. | `15m` |
| `--clean` | Clean APM indices before indexing new data. | `false` |
| `--interval` | The interval at which to index data. | `10s` |
| `--logLevel` | Log level. | `info` |
| `--lookback` | The lookback window for which data should be generated. | `15m` |
| `--target` | Elasticsearch target, including username/password. | **Required** |
| `--workers`    | Number of simultaneously connected ES clients.          | `1`          |
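For example, combining several of these options for a 24-hour backfill (the URL, credentials, and file path are illustrative):
`$ node packages/elastic-apm-generator/src/scripts/run packages/elastic-apm-generator/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --from=now-24h --to=now --clean --workers=4 --bucketSize=1h`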

View file

@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
rules: {
'import/no-default-export': 'off',
},
};

View file

@@ -21,7 +21,7 @@ export class Interval {
throw new Error('Failed to parse interval');
}
const timestamps: number[] = [];
while (now <= this.to) {
while (now < this.to) {
timestamps.push(...new Array<number>(rate).fill(now));
now = moment(now)
.add(Number(args[1]), args[2] as any)

View file

@@ -10,7 +10,25 @@ import { set } from 'lodash';
import { getObserverDefaults } from '../..';
import { Fields } from '../entity';
export function toElasticsearchOutput(events: Fields[], versionOverride?: string) {
export interface ElasticsearchOutput {
_index: string;
_source: unknown;
}
export interface ElasticsearchOutputWriteTargets {
transaction: string;
span: string;
error: string;
metric: string;
}
export function toElasticsearchOutput({
events,
writeTargets,
}: {
events: Fields[];
writeTargets: ElasticsearchOutputWriteTargets;
}): ElasticsearchOutput[] {
return events.map((event) => {
const values = {
...event,
@@ -29,7 +47,7 @@ export function toElasticsearchOutput(events: Fields[], versionOverride?: string
set(document, key, val);
}
return {
_index: `apm-${versionOverride || values['observer.version']}-${values['processor.event']}`,
_index: writeTargets[event['processor.event'] as keyof ElasticsearchOutputWriteTargets],
_source: document,
};
});
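For reference, a minimal sketch of the new calling convention, with hard-coded write targets in the style of the package's unit tests (import paths depend on where the snippet lives; the CLI resolves real targets from the cluster via `getWriteTargets`):

```ts
import { toElasticsearchOutput } from '../lib/output/to_elasticsearch_output';
import { Fields } from '../lib/entity';

// Hard-coded index names, as in the unit tests; a live cluster may
// resolve to data streams instead.
const writeTargets = {
  transaction: 'apm-8.0.0-transaction',
  span: 'apm-8.0.0-span',
  error: 'apm-8.0.0-error',
  metric: 'apm-8.0.0-metric',
};

// A single synthetic event; real events come from a scenario function.
const events: Fields[] = [{ '@timestamp': Date.now(), 'processor.event': 'transaction' }];

const docs = toElasticsearchOutput({ events, writeTargets });
// docs[0]._index === 'apm-8.0.0-transaction', routed via processor.event
```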

View file

@@ -1,113 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { inspect } from 'util';
import { Client } from '@elastic/elasticsearch';
import { chunk } from 'lodash';
import pLimit from 'p-limit';
import yargs from 'yargs/yargs';
import { toElasticsearchOutput } from '..';
import { simpleTrace } from './examples/01_simple_trace';
yargs(process.argv.slice(2))
.command(
'example',
'run an example scenario',
(y) => {
return y
.positional('scenario', {
describe: 'scenario to run',
choices: ['simple-trace'],
demandOption: true,
})
.option('target', {
describe: 'elasticsearch target, including username/password',
})
.option('from', { describe: 'start of timerange' })
.option('to', { describe: 'end of timerange' })
.option('workers', {
default: 1,
describe: 'number of concurrently connected ES clients',
})
.option('apm-server-version', {
describe: 'APM Server version override',
})
.demandOption('target');
},
(argv) => {
let events: any[] = [];
const toDateString = (argv.to as string | undefined) || new Date().toISOString();
const fromDateString =
(argv.from as string | undefined) ||
new Date(new Date(toDateString).getTime() - 15 * 60 * 1000).toISOString();
const to = new Date(toDateString).getTime();
const from = new Date(fromDateString).getTime();
switch (argv._[1]) {
case 'simple-trace':
events = simpleTrace(from, to);
break;
}
const docs = toElasticsearchOutput(events, argv['apm-server-version'] as string);
const client = new Client({
node: argv.target as string,
});
const fn = pLimit(argv.workers);
const batches = chunk(docs, 1000);
// eslint-disable-next-line no-console
console.log(
'Uploading',
docs.length,
'docs in',
batches.length,
'batches',
'from',
fromDateString,
'to',
toDateString
);
Promise.all(
batches.map((batch) =>
fn(() => {
return client.bulk({
require_alias: true,
body: batch.flatMap((doc) => {
return [{ index: { _index: doc._index } }, doc._source];
}),
});
})
)
)
.then((results) => {
const errors = results
.flatMap((result) => result.body.items)
.filter((item) => !!item.index?.error)
.map((item) => item.index?.error);
if (errors.length) {
// eslint-disable-next-line no-console
console.error(inspect(errors.slice(0, 10), { depth: null }));
throw new Error('Failed to upload some items');
}
process.exit();
})
.catch((err) => {
// eslint-disable-next-line no-console
console.error(err);
process.exit(1);
});
}
)
.parse();

View file

@@ -9,12 +9,12 @@
import { service, timerange, getTransactionMetrics, getSpanDestinationMetrics } from '../..';
import { getBreakdownMetrics } from '../../lib/utils/get_breakdown_metrics';
export function simpleTrace(from: number, to: number) {
export default function ({ from, to }: { from: number; to: number }) {
const instance = service('opbeans-go', 'production', 'go').instance('instance');
const range = timerange(from, to);
const transactionName = '240rpm/60% 1000ms';
const transactionName = '240rpm/75% 1000ms';
const successfulTraceEvents = range
.interval('1s')

View file

@@ -12,4 +12,4 @@ require('@babel/register')({
presets: [['@babel/preset-env', { targets: { node: 'current' } }], '@babel/preset-typescript'],
});
require('./es.ts');
require('./run.ts');

View file

@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import datemath from '@elastic/datemath';
import yargs from 'yargs/yargs';
import { cleanWriteTargets } from './utils/clean_write_targets';
import {
bucketSizeOption,
cleanOption,
fileOption,
intervalOption,
targetOption,
workerOption,
logLevelOption,
} from './utils/common_options';
import { intervalToMs } from './utils/interval_to_ms';
import { getCommonResources } from './utils/get_common_resources';
import { startHistoricalDataUpload } from './utils/start_historical_data_upload';
import { startLiveDataUpload } from './utils/start_live_data_upload';
yargs(process.argv.slice(2))
.command(
'*',
'Generate data and index into Elasticsearch',
(y) => {
return y
.positional('file', fileOption)
.option('bucketSize', bucketSizeOption)
.option('workers', workerOption)
.option('interval', intervalOption)
.option('clean', cleanOption)
.option('target', targetOption)
.option('logLevel', logLevelOption)
.option('from', {
description: 'The start of the time window',
})
.option('to', {
description: 'The end of the time window',
})
.option('live', {
description: 'Generate and index data continuously',
boolean: true,
})
.conflicts('to', 'live');
},
async (argv) => {
const {
scenario,
intervalInMs,
bucketSizeInMs,
target,
workers,
clean,
logger,
writeTargets,
client,
} = await getCommonResources(argv);
if (clean) {
await cleanWriteTargets({ writeTargets, client, logger });
}
const to = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const from = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: to - intervalToMs('15m');
const live = argv.live;
logger.info(
`Starting data generation:\n${JSON.stringify(
{
intervalInMs,
bucketSizeInMs,
workers,
target,
writeTargets,
from: new Date(from).toISOString(),
to: new Date(to).toISOString(),
live,
},
null,
2
)}`
);
startHistoricalDataUpload({
from,
to,
scenario,
intervalInMs,
bucketSizeInMs,
client,
workers,
writeTargets,
logger,
});
if (live) {
startLiveDataUpload({
bucketSizeInMs,
client,
intervalInMs,
logger,
scenario,
start: to,
workers,
writeTargets,
});
}
}
)
.parse();

View file

@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ElasticsearchOutputWriteTargets } from '../../lib/output/to_elasticsearch_output';
import { Logger } from './logger';
export async function cleanWriteTargets({
writeTargets,
client,
logger,
}: {
writeTargets: ElasticsearchOutputWriteTargets;
client: Client;
logger: Logger;
}) {
const targets = Object.values(writeTargets);
logger.info(`Cleaning indices: ${targets.join(', ')}`);
const response = await client.deleteByQuery({
index: targets,
allow_no_indices: true,
conflicts: 'proceed',
body: {
query: {
match_all: {},
},
},
wait_for_completion: false,
});
const task = response.body.task;
if (task) {
await new Promise<void>((resolve, reject) => {
const pollForTaskCompletion = async () => {
const taskResponse = await client.tasks.get({
task_id: String(task),
});
logger.debug(
`Polled for task:\n${JSON.stringify(taskResponse.body, ['completed', 'error'], 2)}`
);
if (taskResponse.body.completed) {
resolve();
} else if (taskResponse.body.error) {
reject(taskResponse.body.error);
} else {
setTimeout(pollForTaskCompletion, 2500);
}
};
pollForTaskCompletion();
});
}
}

View file

@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
const fileOption = {
describe: 'File that contains the trace scenario',
demandOption: true,
};
const intervalOption = {
describe: 'The interval at which to index data',
default: '10s',
};
const targetOption = {
describe: 'Elasticsearch target, including username/password',
demandOption: true,
};
const bucketSizeOption = {
describe: 'Size of bucket for which to generate data',
default: '15m',
};
const workerOption = {
describe: 'Number of simultaneously connected ES clients',
default: 1,
};
const cleanOption = {
describe: 'Clean APM indices before indexing new data',
default: false,
boolean: true as const,
};
const logLevelOption = {
describe: 'Log level',
default: 'info',
};
export {
fileOption,
intervalOption,
targetOption,
bucketSizeOption,
workerOption,
cleanOption,
logLevelOption,
};

View file

@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { getScenario } from './get_scenario';
import { getWriteTargets } from './get_write_targets';
import { intervalToMs } from './interval_to_ms';
import { createLogger, LogLevel } from './logger';
export async function getCommonResources({
file,
interval,
bucketSize,
workers,
target,
clean,
logLevel,
}: {
file: unknown;
interval: unknown;
bucketSize: unknown;
workers: unknown;
target: unknown;
clean: boolean;
logLevel: unknown;
}) {
let parsedLogLevel = LogLevel.info;
switch (logLevel) {
case 'info':
parsedLogLevel = LogLevel.info;
break;
case 'debug':
parsedLogLevel = LogLevel.debug;
break;
case 'quiet':
parsedLogLevel = LogLevel.quiet;
break;
}
const logger = createLogger(parsedLogLevel);
const intervalInMs = intervalToMs(interval);
if (!intervalInMs) {
throw new Error('Invalid interval');
}
const bucketSizeInMs = intervalToMs(bucketSize);
if (!bucketSizeInMs) {
throw new Error('Invalid bucket size');
}
const client = new Client({
node: String(target),
});
const [scenario, writeTargets] = await Promise.all([
getScenario({ file, logger }),
getWriteTargets({ client }),
]);
return {
scenario,
writeTargets,
logger,
client,
intervalInMs,
bucketSizeInMs,
workers: Number(workers),
target: String(target),
clean,
};
}

View file

@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import { Fields } from '../../lib/entity';
import { Logger } from './logger';
export type Scenario = (options: { from: number; to: number }) => Fields[];
export function getScenario({ file, logger }: { file: unknown; logger: Logger }) {
const location = Path.join(process.cwd(), String(file));
logger.debug(`Loading scenario from ${location}`);
return import(location).then((m) => {
if (m && m.default) {
return m.default;
}
throw new Error(`Could not find scenario at ${location}`);
}) as Promise<Scenario>;
}
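A scenario file, then, only needs a default export matching the `Scenario` signature. A minimal sketch modeled on `examples/01_simple_trace.ts` (service, transaction name, and rate are illustrative):

```ts
import { service, timerange } from '../..';

export default function ({ from, to }: { from: number; to: number }) {
  const instance = service('opbeans-go', 'production', 'go').instance('instance');

  // One successful 1000ms transaction per minute over the requested window.
  return timerange(from, to)
    .interval('1m')
    .rate(1)
    .flatMap((timestamp) =>
      instance
        .transaction('GET /api/product/list')
        .timestamp(timestamp)
        .duration(1000)
        .success()
        .serialize()
    );
}
```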

View file

@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ElasticsearchOutputWriteTargets } from '../../lib/output/to_elasticsearch_output';
export async function getWriteTargets({
client,
}: {
client: Client;
}): Promise<ElasticsearchOutputWriteTargets> {
const [indicesResponse, datastreamsResponse] = await Promise.all([
client.indices.getAlias({
index: 'apm-*',
}),
client.indices.getDataStream({
name: '*apm',
}),
]);
function getDataStreamName(filter: string) {
return datastreamsResponse.body.data_streams.find((stream) => stream.name.includes(filter))
?.name;
}
function getAlias(filter: string) {
return Object.keys(indicesResponse.body)
.map((key) => {
return {
key,
writeIndexAlias: Object.entries(indicesResponse.body[key].aliases).find(
([_, alias]) => alias.is_write_index
)?.[0],
};
})
.find(({ key }) => key.includes(filter))?.writeIndexAlias!;
}
const targets = {
transaction: getDataStreamName('traces-apm') || getAlias('-transaction'),
span: getDataStreamName('traces-apm') || getAlias('-span'),
metric: getDataStreamName('metrics-apm') || getAlias('-metric'),
error: getDataStreamName('logs-apm') || getAlias('-error'),
};
if (!targets.transaction || !targets.span || !targets.metric || !targets.error) {
throw new Error('Write targets could not be determined');
}
return targets;
}
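The resolved shape is the same either way; only the names differ. Illustrative values for the two layouts:

```ts
// Cluster with APM data streams:
// { transaction: 'traces-apm-default', span: 'traces-apm-default',
//   metric: 'metrics-apm.internal-default', error: 'logs-apm.error-default' }
//
// Cluster with classic APM indices (write aliases):
// { transaction: 'apm-7.16.0-transaction', span: 'apm-7.16.0-span',
//   metric: 'apm-7.16.0-metric', error: 'apm-7.16.0-error' }
```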

View file

@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export function intervalToMs(interval: unknown) {
const [, valueAsString, unit] = String(interval).split(/(.*)(s|m|h|d|w)/);
const value = Number(valueAsString);
switch (unit) {
case 's':
return value * 1000;
case 'm':
return value * 1000 * 60;
case 'h':
return value * 1000 * 60 * 60;
case 'd':
return value * 1000 * 60 * 60 * 24;
case 'w':
return value * 1000 * 60 * 60 * 24 * 7;
}
throw new Error('Could not parse interval');
}
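For example:

```ts
intervalToMs('10s'); // 10000
intervalToMs('15m'); // 900000
intervalToMs('1h'); // 3600000
```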

View file

@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export enum LogLevel {
debug = 0,
info = 1,
quiet = 2,
}
export function createLogger(logLevel: LogLevel) {
return {
debug: (...args: any[]) => {
if (logLevel <= LogLevel.debug) {
// eslint-disable-next-line no-console
console.debug(...args);
}
},
info: (...args: any[]) => {
if (logLevel <= LogLevel.info) {
// eslint-disable-next-line no-console
console.log(...args);
}
},
};
}
export type Logger = ReturnType<typeof createLogger>;
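Usage is straightforward; messages below the configured level are dropped:

```ts
const logger = createLogger(LogLevel.info);
logger.info('printed'); // info level is enabled
logger.debug('dropped'); // only printed when created with LogLevel.debug
```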

View file

@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ElasticsearchOutputWriteTargets } from '../../lib/output/to_elasticsearch_output';
import { Scenario } from './get_scenario';
import { Logger } from './logger';
import { uploadEvents } from './upload_events';
export async function startHistoricalDataUpload({
from,
to,
scenario,
intervalInMs,
bucketSizeInMs,
client,
workers,
writeTargets,
logger,
}: {
from: number;
to: number;
scenario: Scenario;
intervalInMs: number;
bucketSizeInMs: number;
client: Client;
workers: number;
writeTargets: ElasticsearchOutputWriteTargets;
logger: Logger;
}) {
let requestedUntil: number = from;
function uploadNextBatch() {
const bucketFrom = requestedUntil;
const bucketTo = Math.min(to, bucketFrom + bucketSizeInMs);
const events = scenario({ from: bucketFrom, to: bucketTo });
logger.info(
`Uploading: ${new Date(bucketFrom).toISOString()} to ${new Date(bucketTo).toISOString()}`
);
uploadEvents({
events,
client,
workers,
writeTargets,
logger,
}).then(() => {
if (bucketTo >= to) {
return;
}
uploadNextBatch();
});
requestedUntil = bucketTo;
}
return uploadNextBatch();
}
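Note that buckets are uploaded strictly one after another, since each `.then` schedules the next batch only once the previous upload has finished; with `--from=now-24h` and the default `--bucketSize` of `15m`, that amounts to 24h / 15m = 96 sequential scenario runs and uploads.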

View file

@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { partition } from 'lodash';
import { Fields } from '../../lib/entity';
import { ElasticsearchOutputWriteTargets } from '../../lib/output/to_elasticsearch_output';
import { Scenario } from './get_scenario';
import { Logger } from './logger';
import { uploadEvents } from './upload_events';
export function startLiveDataUpload({
start,
bucketSizeInMs,
intervalInMs,
workers,
writeTargets,
scenario,
client,
logger,
}: {
start: number;
bucketSizeInMs: number;
intervalInMs: number;
workers: number;
writeTargets: ElasticsearchOutputWriteTargets;
scenario: Scenario;
client: Client;
logger: Logger;
}) {
let queuedEvents: Fields[] = [];
let requestedUntil: number = start;
function uploadNextBatch() {
const end = new Date().getTime();
if (end > requestedUntil) {
const bucketFrom = requestedUntil;
const bucketTo = requestedUntil + bucketSizeInMs;
const nextEvents = scenario({ from: bucketFrom, to: bucketTo });
logger.debug(
`Requesting ${new Date(bucketFrom).toISOString()} to ${new Date(
bucketTo
).toISOString()}, events: ${nextEvents.length}`
);
queuedEvents.push(...nextEvents);
requestedUntil = bucketTo;
}
const [eventsToUpload, eventsToRemainInQueue] = partition(
queuedEvents,
(event) => event['@timestamp']! <= end
);
logger.info(`Uploading until ${new Date(end).toISOString()}, events: ${eventsToUpload.length}`);
queuedEvents = eventsToRemainInQueue;
uploadEvents({
events: eventsToUpload,
client,
workers,
writeTargets,
logger,
});
}
setInterval(uploadNextBatch, intervalInMs);
uploadNextBatch();
}
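To make the timing concrete with the defaults (`--bucketSize=15m`, `--interval=10s`): a tick at 12:00:00 generates events for 12:00:00 through 12:15:00, uploads only those with a `@timestamp` at or before 12:00:00, and queues the rest; the ticks at 12:00:10, 12:00:20, and so on flush queued events as their timestamps come due, and the next bucket is only generated once the wall clock passes 12:15:00.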

View file

@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { chunk } from 'lodash';
import pLimit from 'p-limit';
import { inspect } from 'util';
import { Fields } from '../../lib/entity';
import {
ElasticsearchOutputWriteTargets,
toElasticsearchOutput,
} from '../../lib/output/to_elasticsearch_output';
import { Logger } from './logger';
export function uploadEvents({
events,
client,
workers,
writeTargets,
logger,
}: {
events: Fields[];
client: Client;
workers: number;
writeTargets: ElasticsearchOutputWriteTargets;
logger: Logger;
}) {
const esDocuments = toElasticsearchOutput({ events, writeTargets });
const fn = pLimit(workers);
const batches = chunk(esDocuments, 5000);
logger.debug(`Uploading ${esDocuments.length} in ${batches.length} batches`);
const time = new Date().getTime();
return Promise.all(
batches.map((batch) =>
fn(() => {
return client.bulk({
require_alias: true,
body: batch.flatMap((doc) => {
return [{ index: { _index: doc._index } }, doc._source];
}),
});
})
)
)
.then((results) => {
const errors = results
.flatMap((result) => result.body.items)
.filter((item) => !!item.index?.error)
.map((item) => item.index?.error);
if (errors.length) {
// eslint-disable-next-line no-console
console.error(inspect(errors.slice(0, 10), { depth: null }));
throw new Error('Failed to upload some items');
}
logger.debug(`Uploaded ${events.length} in ${new Date().getTime() - time}ms`);
})
.catch((err) => {
// eslint-disable-next-line no-console
console.error(err);
process.exit(1);
});
}

View file

@@ -18,7 +18,7 @@ describe('simple trace', () => {
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime() - 1
new Date('2021-01-01T00:15:00.000Z').getTime()
);
events = range

View file

@@ -19,7 +19,7 @@ describe('transaction metrics', () => {
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime() - 1
new Date('2021-01-01T00:15:00.000Z').getTime()
);
events = getTransactionMetrics(

View file

@@ -19,7 +19,7 @@ describe('span destination metrics', () => {
const range = timerange(
new Date('2021-01-01T00:00:00.000Z').getTime(),
new Date('2021-01-01T00:15:00.000Z').getTime() - 1
new Date('2021-01-01T00:15:00.000Z').getTime()
);
events = getSpanDestinationMetrics(

View file

@@ -26,7 +26,7 @@ describe('breakdown metrics', () => {
const start = new Date('2021-01-01T00:00:00.000Z').getTime();
const range = timerange(start, start + INTERVALS * 30 * 1000 - 1);
const range = timerange(start, start + INTERVALS * 30 * 1000);
events = getBreakdownMetrics([
...range

View file

@@ -9,6 +9,13 @@
import { Fields } from '../lib/entity';
import { toElasticsearchOutput } from '../lib/output/to_elasticsearch_output';
const writeTargets = {
transaction: 'apm-8.0.0-transaction',
span: 'apm-8.0.0-span',
metric: 'apm-8.0.0-metric',
error: 'apm-8.0.0-error',
};
describe('output to elasticsearch', () => {
let event: Fields;
@@ -21,13 +28,13 @@ describe('output to elasticsearch', () => {
});
it('properly formats @timestamp', () => {
const doc = toElasticsearchOutput([event])[0] as any;
const doc = toElasticsearchOutput({ events: [event], writeTargets })[0] as any;
expect(doc._source['@timestamp']).toEqual('2020-12-31T23:00:00.000Z');
});
it('formats a nested object', () => {
const doc = toElasticsearchOutput([event])[0] as any;
const doc = toElasticsearchOutput({ events: [event], writeTargets })[0] as any;
expect(doc._source.processor).toEqual({
event: 'transaction',

View file

@@ -20,15 +20,20 @@ export async function traceData(context: InheritedFtrProviderContext) {
const es = context.getService('es');
return {
index: (events: any[]) => {
const esEvents = toElasticsearchOutput(
[
const esEvents = toElasticsearchOutput({
events: [
...events,
...getTransactionMetrics(events),
...getSpanDestinationMetrics(events),
...getBreakdownMetrics(events),
],
'7.14.0'
);
writeTargets: {
transaction: 'apm-7.14.0-transaction',
span: 'apm-7.14.0-span',
error: 'apm-7.14.0-error',
metric: 'apm-7.14.0-metric',
},
});
const batches = chunk(esEvents, 1000);
const limiter = pLimit(1);