Introduces StreamAggregator to Synthtrace (#130902)

* Introduces StreamAggregator

This allows us to write 'true' stream processing aggregations.

Implementations of `StreamAggregator` can self-bootstrap new data streams/time series and route data to these new locations.
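
A rough sketch of what an implementation can look like, based on the `StreamAggregator` interface added in this PR (the `ServiceThroughputAggregator` name, its fields, and the relative import paths are illustrative only, not part of this change):

    import { Client } from '@elastic/elasticsearch';
    import { ApmFields } from '../apm_fields';
    import { Fields } from '../../entity';
    import { StreamAggregator } from '../../stream_aggregator';

    // Hypothetical aggregator that counts transactions per service.
    export class ServiceThroughputAggregator implements StreamAggregator<ApmFields> {
      public readonly name = 'service-throughput';
      private counts: Record<string, number> = {};

      getDataStreamName(): string {
        return 'metrics-apm.service_throughput';
      }

      getDimensions(): string[] {
        return ['service.name'];
      }

      getMappings(): Record<string, any> {
        return {
          properties: {
            '@timestamp': { type: 'date' },
            service: { properties: { name: { type: 'keyword', time_series_dimension: true } } },
          },
        };
      }

      getWriteTarget(document: Record<string, any>): string | null {
        return document.metricset?.name === 'service_throughput'
          ? 'metrics-apm.service_throughput-default'
          : null;
      }

      process(event: ApmFields): Fields[] | null {
        const service = event['service.name'];
        if (service) this.counts[service] = (this.counts[service] ?? 0) + 1;
        return null; // documents are only emitted from flush()
      }

      flush(): Fields[] {
        const docs = Object.entries(this.counts).map(([service, count]) => ({
          '@timestamp': Date.now(),
          'metricset.name': 'service_throughput',
          'processor.event': 'metric',
          'service.name': service,
          'transaction.count': count,
        }));
        this.counts = {};
        return docs;
      }

      async bootstrapElasticsearch(esClient: Client): Promise<void> {}
    }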

* if a stream aggregator returns dimensions, set up a time series data stream; otherwise a regular data stream
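
Concretely, the new `ApmSynthtraceEsClient.createDataStream()` (see diff below) derives the index settings from the aggregator's dimensions. A minimal sketch of just that decision (the `indexSettingsFor` helper name and the `StreamAggregator` import path are illustrative, not part of this change):

    import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
    import { StreamAggregator } from '../../stream_aggregator';

    function indexSettingsFor(aggregator: StreamAggregator): IndicesIndexSettings {
      const settings: IndicesIndexSettings = { lifecycle: { name: 'metrics' } };
      const dimensions = aggregator.getDimensions();
      if (dimensions.length > 0) {
        // dimensions declared: back the data stream with a time series (TSDB) index
        settings.mode = 'time_series';
        settings.routing_path = dimensions;
      }
      // no dimensions: a plain data stream with default index settings
      return settings;
    }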

* rename worker.ts to align with new naming rules

* Pick fields from ApmFields

(cherry picked from commit 0147c683d2ccda2953fcbf5ef24a801cdf34a5dd)

* include service.environment and transaction.type as dimensions

(cherry picked from commit 2f0b6044eef768349613fabf8a250cfc0375bc7b)

* rename service.latency to transaction.duration.aggregate

(cherry picked from commit f4d8b17302be9dd56e4c518fcc8919a998b1c40b)

* removed unnecessary intermediate method createFieldsFromState() in favor of flush()

(cherry picked from commit 6e3f5cd6dc898214740d1b483c7dc29839514695)

* ensure we flush the previously held range if the current event exceeds the max window age
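
Concretely, in `ServiceLatencyAggregator.process()` (excerpted from the new file below):

    // flush the held bucket first when the incoming event is a minute or more away from it
    if (this.state[key]) {
      const diff = Math.abs(event['@timestamp'] - this.state[key].timestamp);
      if (diff >= 1000 * 60) {
        const fields = this.createServiceFields(key);
        delete this.state[key];
        addToState(event['@timestamp']);
        return [fields];
      }
    }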

(cherry picked from commit 55a52f1d592a67511782c7522c69836a615c0d93)

* move the processor.name to 'metric' for now

(cherry picked from commit 480bbe4120937c4e2cd597ac61a9ee279df42a89)

* clean aggregator data streams with a wildcard for the namespace

(cherry picked from commit 9fb7905dfbaa9b3cd906411ec721e8794655fc98)

* add apm-ui as codeowners of synthtrace

* metricset is not always set; don't throw an error when determining the write target

* safeguard check for max window age

* safeguard incrementing state
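
The wiring, roughly as done in the CLI and worker changes below (relative import paths are approximate; `apmEsClient`, `version`, and `logger` come from the surrounding setup code, and this runs inside the async CLI handler):

    import { StreamAggregator } from '../lib/stream_aggregator';
    import { ServiceLatencyAggregator } from '../lib/apm/aggregators/service_latency_aggregator';
    import { StreamProcessor } from '../lib/stream_processor';

    const aggregators: StreamAggregator[] = [new ServiceLatencyAggregator()];

    // `--clean` also drops the aggregators' data streams via a namespace wildcard
    await apmEsClient.clean(aggregators.map((a) => a.getDataStreamName() + '-*'));

    // bootstrap component/index templates and the backing data stream for each aggregator
    for (const aggregator of aggregators) await apmEsClient.createDataStream(aggregator);

    // aggregators run alongside the regular per-buffer processors
    const streamProcessor = new StreamProcessor({
      version,
      processors: StreamProcessor.apmProcessors,
      streamAggregators: aggregators,
      logger,
    });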

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Martijn Laarman 2022-05-05 14:53:43 +02:00 committed by GitHub
parent 6dc4f7b3cf
commit e923d92b3c
7 changed files with 311 additions and 18 deletions

.github/CODEOWNERS

@ -128,6 +128,7 @@
/src/apm.js @elastic/kibana-core @vigneshshanmugam
/packages/kbn-apm-config-loader/ @elastic/kibana-core @vigneshshanmugam
/src/core/types/elasticsearch @elastic/apm-ui
/packages/elastic-apm-synthtrace/ @elastic/apm-ui
#CC# /src/plugins/apm_oss/ @elastic/apm-ui
#CC# /x-pack/plugins/observability/ @elastic/apm-ui


@ -0,0 +1,183 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

import { random } from 'lodash';
import { Client } from '@elastic/elasticsearch';
import { ApmFields } from '../apm_fields';
import { Fields } from '../../entity';
import { StreamAggregator } from '../../stream_aggregator';

type LatencyState = {
  count: number;
  min: number;
  max: number;
  sum: number;
  timestamp: number;
} & Pick<ApmFields, 'service.name' | 'service.environment' | 'transaction.type'>;

export type ServiceFields = Fields &
  Pick<
    ApmFields,
    | 'timestamp.us'
    | 'ecs.version'
    | 'metricset.name'
    | 'observer'
    | 'processor.event'
    | 'processor.name'
    | 'service.name'
    | 'service.version'
    | 'service.environment'
    | 'transaction.type'
  > &
  Partial<{
    'transaction.duration.aggregate': {
      min: number;
      max: number;
      sum: number;
      value_count: number;
    };
  }>;

export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
  public readonly name;

  constructor() {
    this.name = 'service-latency';
  }

  getDataStreamName(): string {
    return 'metrics-apm.service';
  }

  getMappings(): Record<string, any> {
    return {
      properties: {
        '@timestamp': {
          type: 'date',
          format: 'date_optional_time||epoch_millis',
        },
        transaction: {
          type: 'object',
          properties: {
            type: { type: 'keyword', time_series_dimension: true },
            duration: {
              type: 'object',
              properties: {
                aggregate: {
                  type: 'aggregate_metric_double',
                  metrics: ['min', 'max', 'sum', 'value_count'],
                  default_metric: 'sum',
                  time_series_metric: 'gauge',
                },
              },
            },
          },
        },
        service: {
          type: 'object',
          properties: {
            name: { type: 'keyword', time_series_dimension: true },
            environment: { type: 'keyword', time_series_dimension: true },
          },
        },
      },
    };
  }

  getDimensions(): string[] {
    return ['service.name', 'service.environment', 'transaction.type'];
  }

  getWriteTarget(document: Record<string, any>): string | null {
    const eventType = document.metricset?.name;
    if (eventType === 'service') return 'metrics-apm.service-default';
    return null;
  }

  private state: Record<string, LatencyState> = {};

  private processedComponent: number = 0;

  process(event: ApmFields): Fields[] | null {
    if (!event['@timestamp']) return null;
    const service = event['service.name']!;
    const environment = event['service.environment'] ?? 'production';
    const transactionType = event['transaction.type'] ?? 'request';
    const key = `${service}-${environment}-${transactionType}`;
    const addToState = (timestamp: number) => {
      if (!this.state[key]) {
        this.state[key] = {
          timestamp,
          count: 0,
          min: 0,
          max: 0,
          sum: 0,
          'service.name': service,
          'service.environment': environment,
          'transaction.type': transactionType,
        };
      }
      const duration = Number(event['transaction.duration.us']);
      if (duration >= 0) {
        const state = this.state[key];
        state.count++;
        state.sum += duration;
        if (duration > state.max) state.max = duration;
        if (duration < state.min) state.min = Math.min(0, duration);
      }
    };
    // ensure we flush current state first if event falls out of the current max window age
    if (this.state[key]) {
      const diff = Math.abs(event['@timestamp'] - this.state[key].timestamp);
      if (diff >= 1000 * 60) {
        const fields = this.createServiceFields(key);
        delete this.state[key];
        addToState(event['@timestamp']);
        return [fields];
      }
    }
    addToState(event['@timestamp']);
    // if cardinality is too high force emit of current state
    if (Object.keys(this.state).length === 1000) {
      return this.flush();
    }
    return null;
  }

  flush(): Fields[] {
    const fields = Object.keys(this.state).map((key) => this.createServiceFields(key));
    this.state = {};
    return fields;
  }

  private createServiceFields(key: string): ServiceFields {
    this.processedComponent = ++this.processedComponent % 1000;
    const component = Date.now() % 100;
    const state = this.state[key];
    return {
      '@timestamp': state.timestamp + random(0, 100) + component + this.processedComponent,
      'metricset.name': 'service',
      'processor.event': 'metric',
      'service.name': state['service.name'],
      'service.environment': state['service.environment'],
      'transaction.type': state['transaction.type'],
      'transaction.duration.aggregate': {
        min: state.min,
        max: state.max,
        sum: state.sum,
        value_count: state.count,
      },
    };
  }

  async bootstrapElasticsearch(esClient: Client): Promise<void> {}
}


@ -7,6 +7,7 @@
*/
import { Client } from '@elastic/elasticsearch';
import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
import { cleanWriteTargets } from '../../utils/clean_write_targets';
import { getApmWriteTargets } from '../utils/get_apm_write_targets';
import { Logger } from '../../utils/create_logger';
@ -15,6 +16,7 @@ import { EntityIterable } from '../../entity_iterable';
import { StreamProcessor } from '../../stream_processor';
import { EntityStreams } from '../../entity_streams';
import { Fields } from '../../entity';
import { StreamAggregator } from '../../stream_aggregator';
export interface StreamToBulkOptions<TFields extends Fields = ApmFields> {
concurrency?: number;
@ -57,7 +59,7 @@ export class ApmSynthtraceEsClient {
return info.version.number;
}
async clean() {
async clean(dataStreams?: string[]) {
return this.getWriteTargets().then(async (writeTargets) => {
const indices = Object.values(writeTargets);
this.logger.info(`Attempting to clean: ${indices}`);
@ -68,7 +70,7 @@ export class ApmSynthtraceEsClient {
logger: this.logger,
});
}
for (const name of indices) {
for (const name of indices.concat(dataStreams ?? [])) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
@ -149,7 +151,6 @@ export class ApmSynthtraceEsClient {
streamProcessor?: StreamProcessor
) {
const dataStream = Array.isArray(events) ? new EntityStreams(events) : events;
const sp =
streamProcessor != null
? streamProcessor
@ -165,7 +166,7 @@ export class ApmSynthtraceEsClient {
await this.logger.perf('enumerate_scenario', async () => {
// @ts-ignore
// We just want to enumerate
for await (item of sp.streamToDocumentAsync(sp.toDocument, dataStream)) {
for await (item of sp.streamToDocumentAsync((e) => sp.toDocument(e), dataStream)) {
if (yielded === 0) {
options.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
@ -185,7 +186,7 @@ export class ApmSynthtraceEsClient {
flushBytes: 500000,
// TODO https://github.com/elastic/elasticsearch-js/issues/1610
// having to map here is awkward, it'd be better to map just before serialization.
datasource: sp.streamToDocumentAsync(sp.toDocument, dataStream),
datasource: sp.streamToDocumentAsync((e) => sp.toDocument(e), dataStream),
onDrop: (doc) => {
this.logger.info(JSON.stringify(doc, null, 2));
},
@ -197,11 +198,12 @@ export class ApmSynthtraceEsClient {
options?.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
}
const index = options?.mapToIndex
? options?.mapToIndex(item)
: !this.forceLegacyIndices
? StreamProcessor.getDataStreamForEvent(item, writeTargets)
: StreamProcessor.getIndexForEvent(item, writeTargets);
let index = options?.mapToIndex ? options?.mapToIndex(item) : null;
if (!index) {
index = !this.forceLegacyIndices
? sp.getDataStreamForEvent(item, writeTargets)
: StreamProcessor.getIndexForEvent(item, writeTargets);
}
return { create: { _index: index } };
},
});
@ -211,4 +213,53 @@ export class ApmSynthtraceEsClient {
await this.refresh();
}
}
async createDataStream(aggregator: StreamAggregator) {
const datastreamName = aggregator.getDataStreamName();
const mappings = aggregator.getMappings();
const dimensions = aggregator.getDimensions();
const indexSettings: IndicesIndexSettings = { lifecycle: { name: 'metrics' } };
if (dimensions.length > 0) {
indexSettings.mode = 'time_series';
indexSettings.routing_path = dimensions;
}
await this.client.cluster.putComponentTemplate({
name: `${datastreamName}-mappings`,
template: {
mappings,
},
_meta: {
description: `Mappings for ${datastreamName}-*`,
},
});
this.logger.info(`Created mapping component template for ${datastreamName}-*`);
await this.client.cluster.putComponentTemplate({
name: `${datastreamName}-settings`,
template: {
settings: {
index: indexSettings,
},
},
_meta: {
description: `Settings for ${datastreamName}-*`,
},
});
this.logger.info(`Created settings component template for ${datastreamName}-*`);
await this.client.indices.putIndexTemplate({
name: `${datastreamName}-index_template`,
index_patterns: [`${datastreamName}-*`],
data_stream: {},
composed_of: [`${datastreamName}-mappings`, `${datastreamName}-settings`],
priority: 500,
});
this.logger.info(`Created index template for ${datastreamName}-*`);
await this.client.indices.createDataStream({ name: datastreamName + '-default' });
await aggregator.bootstrapElasticsearch(this.client);
}
}


@ -0,0 +1,27 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

import { Client } from '@elastic/elasticsearch';
import { ApmFields, Fields } from '..';

export interface StreamAggregator<TFields extends Fields = ApmFields> {
  name: string;
  getWriteTarget(document: Record<string, any>): string | null;
  process(event: TFields): Fields[] | null;
  flush(): Fields[];
  bootstrapElasticsearch(esClient: Client): Promise<void>;
  getDataStreamName(): string;
  getDimensions(): string[];
  getMappings(): Record<string, any>;
}


@ -17,10 +17,12 @@ import { dedot } from './utils/dedot';
import { ApmElasticsearchOutputWriteTargets } from './apm/utils/get_apm_write_targets';
import { Logger } from './utils/create_logger';
import { Fields } from './entity';
import { StreamAggregator } from './stream_aggregator';
export interface StreamProcessorOptions<TFields extends Fields = ApmFields> {
version?: string;
processors: Array<(events: TFields[]) => TFields[]>;
processors?: Array<(events: TFields[]) => TFields[]>;
streamAggregators?: Array<StreamAggregator<TFields>>;
flushInterval?: string;
// defaults to 10k
maxBufferSize?: number;
@ -39,6 +41,8 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
getBreakdownMetrics,
];
public static defaultFlushInterval: number = 10000;
private readonly processors: Array<(events: TFields[]) => TFields[]>;
private readonly streamAggregators: Array<StreamAggregator<TFields>>;
constructor(private readonly options: StreamProcessorOptions<TFields>) {
[this.intervalAmount, this.intervalUnit] = this.options.flushInterval
@ -47,6 +51,8 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
this.name = this.options?.name ?? 'StreamProcessor';
this.version = this.options.version ?? '8.0.0';
this.versionMajor = Number.parseInt(this.version.split('.')[0], 10);
this.processors = options.processors ?? [];
this.streamAggregators = options.streamAggregators ?? [];
}
private readonly intervalAmount: number;
private readonly intervalUnit: any;
@ -73,6 +79,15 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
yield StreamProcessor.enrich(event, this.version, this.versionMajor);
sourceEventsYielded++;
for (const aggregator of this.streamAggregators) {
const aggregatedEvents = aggregator.process(event);
if (aggregatedEvents) {
yield* aggregatedEvents.map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
}
if (sourceEventsYielded % maxBufferSize === 0) {
if (this.options?.processedCallback) {
this.options.processedCallback(maxBufferSize);
@ -96,7 +111,7 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
this.options.logger?.debug(
`${this.name} flush ${localBuffer.length} documents ${order}: ${e} => ${f}`
);
for (const processor of this.options.processors) {
for (const processor of this.processors) {
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
@ -116,13 +131,16 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
this.options.logger?.info(
`${this.name} processing remaining buffer: ${localBuffer.length} items left`
);
for (const processor of this.options.processors) {
for (const processor of this.processors) {
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
this.options.processedCallback?.apply(this, [localBuffer.length]);
}
for (const aggregator of this.streamAggregators) {
yield* aggregator.flush();
}
}
private calculateFlushAfter(eventDate: number | null, order: 'asc' | 'desc') {
@ -186,10 +204,7 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
return newDoc;
}
static getDataStreamForEvent(
d: Record<string, any>,
writeTargets: ApmElasticsearchOutputWriteTargets
) {
getDataStreamForEvent(d: Record<string, any>, writeTargets: ApmElasticsearchOutputWriteTargets) {
if (!d.processor?.event) {
throw Error("'processor.event' is not set on document, can not determine target index");
}
@ -204,6 +219,13 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
}
}
}
for (const aggregator of this.streamAggregators) {
const target = aggregator.getWriteTarget(d);
if (target) {
dataStream = target;
break;
}
}
return dataStream;
}


@ -14,6 +14,8 @@ import { startLiveDataUpload } from './utils/start_live_data_upload';
import { parseRunCliFlags } from './utils/parse_run_cli_flags';
import { getCommonServices } from './utils/get_common_services';
import { ApmSynthtraceKibanaClient } from '../lib/apm/client/apm_synthtrace_kibana_client';
import { StreamAggregator } from '../lib/stream_aggregator';
import { ServiceLatencyAggregator } from '../lib/apm/aggregators/service_latency_aggregator';
function options(y: Argv) {
return y
@ -186,8 +188,9 @@ yargs(process.argv.slice(2))
await apmEsClient.updateComponentTemplates(runOptions.numShards);
}
const aggregators: StreamAggregator[] = [new ServiceLatencyAggregator()];
if (argv.clean) {
await apmEsClient.clean();
await apmEsClient.clean(aggregators.map((a) => a.getDataStreamName() + '-*'));
}
if (runOptions.gcpRepository) {
await apmEsClient.registerGcpRepository(runOptions.gcpRepository);
@ -205,6 +208,8 @@ yargs(process.argv.slice(2))
)}`
);
for (const aggregator of aggregators) await apmEsClient.createDataStream(aggregator);
if (runOptions.maxDocs !== 0)
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version);


@ -15,6 +15,8 @@ import { LogLevel } from '../../lib/utils/create_logger';
import { StreamProcessor } from '../../lib/stream_processor';
import { Scenario } from '../scenario';
import { EntityIterable, Fields } from '../..';
import { StreamAggregator } from '../../lib/stream_aggregator';
import { ServiceLatencyAggregator } from '../../lib/apm/aggregators/service_latency_aggregator';
// logging proxy to main thread, ensures we see real time logging
const l = {
@ -61,9 +63,11 @@ async function setup() {
parentPort?.postMessage({ workerIndex, lastTimestamp: item['@timestamp'] });
}
};
const aggregators: StreamAggregator[] = [new ServiceLatencyAggregator()];
streamProcessor = new StreamProcessor({
version,
processors: StreamProcessor.apmProcessors,
streamAggregators: aggregators,
maxSourceEvents: runOptions.maxDocs,
logger: l,
processedCallback: (processedDocuments) => {