[synthtrace] Add support for Infra, Asset and Monitoring data (#160479)

Closes #154081

In order to make it easier to test the Assets API signal source mode, we
wanted an easy way to generate test data for the signals we consume.
Since Synthtrace already produces test data for APM services and is
integrated with FTR, we felt it was a good path to try to extend it with
assets and infra data (which will benefit other teams as well in the
future).
This PR adds some barebones entities, and re-introduces the Monitoring
entities that had been removed in the past (hopefully this time in a way
that is unobtrusive).

In order to accommodate these new entities, this PR also extracts a base
client from the existing APM client, in order to share common code
between the new entity clients.
This commit is contained in:
Milton Hultgren 2023-06-29 15:26:15 +02:00 committed by GitHub
parent 89b97a52ec
commit c30251c46a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 920 additions and 168 deletions

View file

@ -19,15 +19,17 @@ export type {
OSInfo,
} from './src/lib/apm/mobile_device';
export { httpExitSpan } from './src/lib/apm/span';
export { type AssetDocument } from './src/lib/assets';
export { DistributedTrace } from './src/lib/dsl/distributed_trace_client';
export { serviceMap } from './src/lib/dsl/service_map';
export type { Fields } from './src/lib/entity';
export { infra, type InfraDocument } from './src/lib/infra';
export { parseInterval } from './src/lib/interval';
export { monitoring, type MonitoringDocument } from './src/lib/monitoring';
export type { Serializable } from './src/lib/serializable';
export { timerange } from './src/lib/timerange';
export type { Timerange } from './src/lib/timerange';
export { dedot } from './src/lib/utils/dedot';
export { generateLongId, generateShortId } from './src/lib/utils/generate_id';
export { appendHash, hashKeysOf } from './src/lib/utils/hash';
export { dedot } from './src/lib/utils/dedot';
export type { ESDocumentWithOperation, SynthtraceESAction, SynthtraceGenerator } from './src/types';
export { parseInterval } from './src/lib/interval';

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { ServiceAsset } from '../assets';
import { Entity } from '../entity';
import { ApmFields } from './apm_fields';
import { Instance } from './instance';
@ -18,6 +19,15 @@ export class Service extends Entity<ApmFields> {
'host.name': instanceName,
});
}
asset() {
return new ServiceAsset({
'asset.kind': 'service',
'asset.id': this.fields['service.name']!,
'asset.name': this.fields['service.name'],
'asset.ean': `service:${this.fields['service.name']}`,
});
}
}
export function service(name: string, environment: string, agentName: string): Service;

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// eslint-disable-next-line max-classes-per-file
import { Fields } from '../entity';
import { Serializable } from '../serializable';
// Can I pull in types from asset-manager here?
type AssetKind = 'host' | 'pod' | 'container' | 'service';
export interface AssetKindDocument<T extends AssetKind> extends Fields {
'asset.kind': T;
'asset.ean': string;
'asset.id': string;
'asset.name'?: string;
'asset.parents'?: string[];
'asset.children'?: string[];
'asset.references'?: string[];
}
// What is the best way to tie up relationships?
// With these setters we can go both ways but the entities might be able to produce
// pre-linked assets as well
class Asset<T extends AssetKind> extends Serializable<AssetKindDocument<T>> {
parents(parents: string[]) {
this.fields['asset.parents'] = parents;
}
children(children: string[]) {
this.fields['asset.children'] = children;
}
references(references: string[]) {
this.fields['asset.references'] = references;
}
}
export class HostAsset extends Asset<'host'> {}
export class PodAsset extends Asset<'pod'> {}
export class ContainerAsset extends Asset<'container'> {}
export class ServiceAsset extends Asset<'service'> {}
export type AssetDocument =
| AssetKindDocument<'host'>
| AssetKindDocument<'pod'>
| AssetKindDocument<'container'>
| AssetKindDocument<'service'>;

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable max-classes-per-file */
import { ContainerAsset } from '../assets';
import { Entity, Fields } from '../entity';
import { Serializable } from '../serializable';
interface ContainerDocument extends Fields {
'container.id': string;
'kubernetes.pod.uid': string;
'kubernetes.node.name': string;
}
export class Container extends Entity<ContainerDocument> {
metrics() {
return new ContainerMetrics({
...this.fields,
'kubernetes.container.cpu.usage.limit.pct': 46,
});
}
asset() {
return new ContainerAsset({
'asset.kind': 'container',
'asset.id': this.fields['container.id'],
'asset.name': this.fields['container.id'],
'asset.ean': `container:${this.fields['container.id']}`,
});
}
}
export interface ContainerMetricsDocument extends ContainerDocument {
'kubernetes.container.cpu.usage.limit.pct': number;
}
class ContainerMetrics extends Serializable<ContainerMetricsDocument> {}
export function container(id: string, uid: string, nodeName: string) {
return new Container({
'container.id': id,
'kubernetes.pod.uid': uid,
'kubernetes.node.name': nodeName,
});
}

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable max-classes-per-file */
import { HostAsset } from '../assets';
import { Entity, Fields } from '../entity';
import { Serializable } from '../serializable';
import { pod } from './pod';
interface HostDocument extends Fields {
'host.hostname': string;
}
class Host extends Entity<HostDocument> {
metrics() {
return new HostMetrics({
...this.fields,
'system.cpu.total.norm.pct': 46,
});
}
asset() {
return new HostAsset({
'asset.kind': 'host',
'asset.id': this.fields['host.hostname'],
'asset.name': this.fields['host.hostname'],
'asset.ean': `host:${this.fields['host.hostname']}`,
});
}
pod(uid: string) {
return pod(uid, this.fields['host.hostname']);
}
}
export interface HostMetricsDocument extends HostDocument {
'system.cpu.total.norm.pct': number;
}
class HostMetrics extends Serializable<HostMetricsDocument> {}
export function host(name: string): Host {
return new Host({
'host.hostname': name,
});
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { container, ContainerMetricsDocument } from './container';
import { host, HostMetricsDocument } from './host';
import { pod, PodMetricsDocument } from './pod';
export type InfraDocument = HostMetricsDocument | PodMetricsDocument | ContainerMetricsDocument;
export const infra = {
host,
pod,
container,
};

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
/* eslint-disable max-classes-per-file */
import { PodAsset } from '../assets';
import { Entity, Fields } from '../entity';
import { Serializable } from '../serializable';
import { container } from './container';
interface PodDocument extends Fields {
'kubernetes.pod.uid': string;
'kubernetes.node.name': string;
}
export class Pod extends Entity<PodDocument> {
metrics() {
return new PodMetrics({
...this.fields,
'kubernetes.pod.cpu.usage.limit.pct': 46,
});
}
asset() {
return new PodAsset({
'asset.kind': 'pod',
'asset.id': this.fields['kubernetes.pod.uid'],
'asset.name': this.fields['kubernetes.pod.uid'],
'asset.ean': `pod:${this.fields['kubernetes.pod.uid']}`,
});
}
container(id: string) {
return container(id, this.fields['kubernetes.pod.uid'], this.fields['kubernetes.node.name']);
}
}
export interface PodMetricsDocument extends PodDocument {
'kubernetes.pod.cpu.usage.limit.pct': number;
}
class PodMetrics extends Serializable<PodMetricsDocument> {}
export function pod(uid: string, nodeName: string) {
return new Pod({
'kubernetes.pod.uid': uid,
'kubernetes.node.name': nodeName,
});
}

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Entity, Fields } from '../entity';
import { generateShortId } from '../utils/generate_id';
import { clusterStats } from './cluster_stats';
import { kibana } from './kibana';
interface ClusterDocument extends Fields {
cluster_name: string;
cluster_uuid: string;
}
class Cluster extends Entity<ClusterDocument> {
stats() {
return clusterStats(this.fields.cluster_name, this.fields.cluster_uuid);
}
kibana(name: string, index: string = '.kibana') {
return kibana(name, generateShortId(), this.fields.cluster_uuid, index);
}
}
export function cluster(name: string) {
return new Cluster({
cluster_name: name,
cluster_uuid: generateShortId(),
});
}

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Fields } from '../entity';
import { Serializable } from '../serializable';
export interface ClusterStatsDocument extends Fields {
cluster_name: string;
cluster_uuid: string;
type: 'cluster_stats';
'license.status'?: string;
'cluster_stats.timestamp'?: string;
'cluster_stats.indices.count'?: number;
}
export class ClusterStats extends Serializable<ClusterStatsDocument> {
constructor(fields: ClusterStatsDocument) {
super(fields);
this.fields.type = 'cluster_stats';
this.fields['license.status'] = 'active';
}
timestamp(timestamp: number) {
super.timestamp(timestamp);
this.fields['cluster_stats.timestamp'] = new Date(timestamp).toISOString();
return this;
}
indices(count: number) {
this.fields['cluster_stats.indices.count'] = count;
return this;
}
}
export function clusterStats(name: string, uuid: string) {
return new ClusterStats({
cluster_name: name,
cluster_uuid: uuid,
type: 'cluster_stats',
});
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { cluster } from './cluster';
import { ClusterStatsDocument } from './cluster_stats';
import { kibana } from './kibana';
import { KibanaStatsDocument } from './kibana_stats';
export type MonitoringDocument = ClusterStatsDocument | KibanaStatsDocument;
export const monitoring = {
cluster,
kibana,
};

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Entity, Fields } from '../entity';
import { kibanaStats } from './kibana_stats';
interface KibanaDocument extends Fields {
cluster_uuid: string;
'kibana_stats.kibana.name': string;
'kibana_stats.kibana.uuid': string;
'kibana_stats.kibana.index': string;
}
export class Kibana extends Entity<KibanaDocument> {
stats() {
return kibanaStats(
this.fields.cluster_uuid,
this.fields['kibana_stats.kibana.name'],
this.fields['kibana_stats.kibana.uuid'],
this.fields['kibana_stats.kibana.index']
);
}
}
export function kibana(name: string, uuid: string, clusterUuid: string, index: string = '.kibana') {
return new Kibana({
cluster_uuid: clusterUuid,
'kibana_stats.kibana.name': name,
'kibana_stats.kibana.uuid': uuid,
'kibana_stats.kibana.index': index,
});
}

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Fields } from '../entity';
import { Serializable } from '../serializable';
export interface KibanaStatsDocument extends Fields {
type: 'kibana_stats';
cluster_uuid: string;
'kibana_stats.kibana.name': string;
'kibana_stats.kibana.uuid': string;
'kibana_stats.kibana.index': string;
'kibana_stats.timestamp'?: string;
'kibana_stats.response_times.max'?: number;
'kibana_stats.kibana.status'?: string;
'kibana_stats.requests.disconnects'?: number;
'kibana_stats.requests.total'?: number;
}
export class KibanaStats extends Serializable<KibanaStatsDocument> {
timestamp(timestamp: number) {
super.timestamp(timestamp);
this.fields['kibana_stats.timestamp'] = new Date(timestamp).toISOString();
return this;
}
status(status: string) {
this.fields['kibana_stats.kibana.status'] = status;
}
responseTimes(max: number) {
this.fields['kibana_stats.response_times.max'] = max;
}
requests(disconnects: number, total: number) {
this.fields['kibana_stats.requests.disconnects'] = disconnects;
this.fields['kibana_stats.requests.total'] = total;
return this;
}
}
export function kibanaStats(name: string, uuid: string, clusterUuid: string, index: string) {
return new KibanaStats({
type: 'kibana_stats',
cluster_uuid: clusterUuid,
'kibana_stats.kibana.name': name,
'kibana_stats.kibana.uuid': uuid,
'kibana_stats.kibana.index': index,
});
}

View file

@ -10,3 +10,9 @@ export { createLogger, LogLevel } from './src/lib/utils/create_logger';
export { ApmSynthtraceEsClient } from './src/lib/apm/client/apm_synthtrace_es_client';
export { ApmSynthtraceKibanaClient } from './src/lib/apm/client/apm_synthtrace_kibana_client';
export { InfraSynthtraceEsClient } from './src/lib/infra/infra_synthtrace_es_client';
export { AssetsSynthtraceEsClient } from './src/lib/assets/assets_synthtrace_es_client';
export { MonitoringSynthtraceEsClient } from './src/lib/monitoring/monitoring_synthtrace_es_client';

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { PassThrough, pipeline, Readable } from 'stream';
import { getDedotTransform } from '../../../shared/get_dedot_transform';
import { getSerializeTransform } from '../../../shared/get_serialize_transform';
import { Logger } from '../../../utils/create_logger';
import { fork } from '../../../utils/stream_utils';
import { createBreakdownMetricsAggregator } from '../../aggregators/create_breakdown_metrics_aggregator';
import { createServiceMetricsAggregator } from '../../aggregators/create_service_metrics_aggregator';
import { createServiceSummaryMetricsAggregator } from '../../aggregators/create_service_summary_metrics_aggregator';
import { createSpanMetricsAggregator } from '../../aggregators/create_span_metrics_aggregator';
import { createTransactionMetricsAggregator } from '../../aggregators/create_transaction_metrics_aggregator';
import { getApmServerMetadataTransform } from './get_apm_server_metadata_transform';
import { getIntakeDefaultsTransform } from './get_intake_defaults_transform';
import { getRoutingTransform } from './get_routing_transform';
export function apmPipeline(logger: Logger, version: string, includeSerialization: boolean = true) {
return (base: Readable) => {
const aggregators = [
createTransactionMetricsAggregator('1m'),
createTransactionMetricsAggregator('10m'),
createTransactionMetricsAggregator('60m'),
createServiceMetricsAggregator('1m'),
createServiceMetricsAggregator('10m'),
createServiceMetricsAggregator('60m'),
createServiceSummaryMetricsAggregator('1m'),
createServiceSummaryMetricsAggregator('10m'),
createServiceSummaryMetricsAggregator('60m'),
createSpanMetricsAggregator('1m'),
createSpanMetricsAggregator('10m'),
createSpanMetricsAggregator('60m'),
];
const serializationTransform = includeSerialization ? [getSerializeTransform()] : [];
return pipeline(
// @ts-expect-error Some weird stuff here with the type definition for pipeline. We have tests!
base,
...serializationTransform,
getIntakeDefaultsTransform(),
fork(new PassThrough({ objectMode: true }), ...aggregators),
createBreakdownMetricsAggregator('30s'),
getApmServerMetadataTransform(version),
getRoutingTransform(),
getDedotTransform(),
(err) => {
if (err) {
logger.error(err);
}
}
);
};
}

View file

@ -7,38 +7,11 @@
*/
import { Client, estypes } from '@elastic/elasticsearch';
import {
ApmFields,
ESDocumentWithOperation,
SynthtraceESAction,
SynthtraceGenerator,
} from '@kbn/apm-synthtrace-client';
import { castArray } from 'lodash';
import { PassThrough, pipeline, Readable, Transform } from 'stream';
import { isGeneratorObject } from 'util/types';
import { ApmFields } from '@kbn/apm-synthtrace-client';
import { ValuesType } from 'utility-types';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../../../shared/base_client';
import { Logger } from '../../../utils/create_logger';
import { fork, sequential } from '../../../utils/stream_utils';
import { createBreakdownMetricsAggregator } from '../../aggregators/create_breakdown_metrics_aggregator';
import { createServiceMetricsAggregator } from '../../aggregators/create_service_metrics_aggregator';
import { createServiceSummaryMetricsAggregator } from '../../aggregators/create_service_summary_metrics_aggregator';
import { createSpanMetricsAggregator } from '../../aggregators/create_span_metrics_aggregator';
import { createTransactionMetricsAggregator } from '../../aggregators/create_transaction_metrics_aggregator';
import { getApmServerMetadataTransform } from './get_apm_server_metadata_transform';
import { getDedotTransform } from './get_dedot_transform';
import { getIntakeDefaultsTransform } from './get_intake_defaults_transform';
import { getRoutingTransform } from './get_routing_transform';
import { getSerializeTransform } from './get_serialize_transform';
export interface ApmSynthtraceEsClientOptions {
version: string;
concurrency?: number;
refreshAfterIndex?: boolean;
}
type MaybeArray<T> = T | T[];
const DATA_STREAMS = ['traces-apm*', 'metrics-apm*', 'logs-apm*'];
import { apmPipeline } from './apm_pipeline';
export enum ComponentTemplateName {
LogsApp = 'logs-apm.app@custom',
@ -50,33 +23,20 @@ export enum ComponentTemplateName {
TracesApmSampled = 'traces-apm.sampled@custom',
}
export class ApmSynthtraceEsClient {
private readonly client: Client;
private readonly logger: Logger;
export interface ApmSynthtraceEsClientOptions extends Omit<SynthtraceEsClientOptions, 'pipeline'> {
version: string;
}
private readonly concurrency: number;
private readonly refreshAfterIndex: boolean;
private readonly version: string;
private pipelineCallback: (base: Readable) => NodeJS.WritableStream = this.getDefaultPipeline();
export class ApmSynthtraceEsClient extends SynthtraceEsClient<ApmFields> {
private version: string;
constructor(options: { client: Client; logger: Logger } & ApmSynthtraceEsClientOptions) {
this.client = options.client;
this.logger = options.logger;
this.concurrency = options.concurrency ?? 1;
this.refreshAfterIndex = options.refreshAfterIndex ?? false;
this.version = options.version;
}
async clean() {
this.logger.info(`Cleaning APM data streams ${DATA_STREAMS.join(',')}`);
await this.client.indices.deleteDataStream({
name: DATA_STREAMS.join(','),
expand_wildcards: ['open', 'hidden'],
super({
...options,
pipeline: apmPipeline(options.logger, options.version),
});
this.dataStreams = ['traces-apm*', 'metrics-apm*', 'logs-apm*'];
this.version = options.version;
}
async updateComponentTemplate(
@ -105,117 +65,7 @@ export class ApmSynthtraceEsClient {
this.logger.info(`Updated component template: ${name}`);
}
async refresh(dataStreams: string[] = DATA_STREAMS) {
this.logger.info(`Refreshing ${dataStreams.join(',')}`);
return this.client.indices.refresh({
index: dataStreams,
allow_no_indices: true,
ignore_unavailable: true,
expand_wildcards: ['open', 'hidden'],
});
}
getDefaultPipeline(includeSerialization: boolean = true) {
return (base: Readable) => {
const aggregators = [
createTransactionMetricsAggregator('1m'),
createTransactionMetricsAggregator('10m'),
createTransactionMetricsAggregator('60m'),
createServiceMetricsAggregator('1m'),
createServiceMetricsAggregator('10m'),
createServiceMetricsAggregator('60m'),
createServiceSummaryMetricsAggregator('1m'),
createServiceSummaryMetricsAggregator('10m'),
createServiceSummaryMetricsAggregator('60m'),
createSpanMetricsAggregator('1m'),
createSpanMetricsAggregator('10m'),
createSpanMetricsAggregator('60m'),
];
const serializationTransform = includeSerialization ? [getSerializeTransform()] : [];
return pipeline(
// @ts-expect-error Some weird stuff here with the type definition for pipeline. We have tests!
base,
...serializationTransform,
getIntakeDefaultsTransform(),
fork(new PassThrough({ objectMode: true }), ...aggregators),
createBreakdownMetricsAggregator('30s'),
getApmServerMetadataTransform(this.version),
getRoutingTransform(),
getDedotTransform(),
(err) => {
if (err) {
this.logger.error(err);
}
}
);
};
}
pipeline(cb: (base: Readable) => NodeJS.WritableStream) {
this.pipelineCallback = cb;
}
getVersion() {
return this.version;
}
async index(streamOrGenerator: MaybeArray<Readable | SynthtraceGenerator<ApmFields>>) {
this.logger.debug(`Bulk indexing ${castArray(streamOrGenerator).length} stream(s)`);
const allStreams = castArray(streamOrGenerator).map((obj) => {
const base = isGeneratorObject(obj) ? Readable.from(obj) : obj;
return this.pipelineCallback(base);
}) as Transform[];
let count: number = 0;
const stream = sequential(...allStreams);
await this.client.helpers.bulk({
concurrency: this.concurrency,
refresh: false,
refreshOnCompletion: false,
flushBytes: 250000,
datasource: stream,
filter_path: 'errors,items.*.error,items.*.status',
onDocument: (doc: ESDocumentWithOperation<ApmFields>) => {
let action: SynthtraceESAction;
count++;
if (count % 100000 === 0) {
this.logger.info(`Indexed ${count} documents`);
} else if (count % 1000 === 0) {
this.logger.debug(`Indexed ${count} documents`);
}
if (doc._action) {
action = doc._action!;
delete doc._action;
} else if (doc._index) {
action = { create: { _index: doc._index } };
delete doc._index;
} else {
this.logger.debug(doc);
throw new Error(
`Could not determine operation: _index and _action not defined in document`
);
}
return action;
},
onDrop: (doc) => {
this.logger.error(`Dropped document: ${JSON.stringify(doc, null, 2)}`);
},
});
this.logger.info(`Produced ${count} events`);
if (this.refreshAfterIndex) {
await this.refresh();
}
return apmPipeline(this.logger, this.version, includeSerialization);
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { AssetDocument, ESDocumentWithOperation } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
export type AssetsSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class AssetsSynthtraceEsClient extends SynthtraceEsClient<AssetDocument> {
constructor(options: { client: Client; logger: Logger } & AssetsSynthtraceEsClientOptions) {
super({
...options,
pipeline: assetsPipeline(),
});
this.dataStreams = ['assets-*'];
}
}
function assetsPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<AssetDocument>, encoding, callback) {
if ('asset.kind' in document) {
document._index = `assets-${document['asset.kind']}-default`;
} else {
throw new Error('Cannot determine index for event');
}
callback(null, document);
},
});
}

View file

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ESDocumentWithOperation, InfraDocument } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
export type InfraSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class InfraSynthtraceEsClient extends SynthtraceEsClient<InfraDocument> {
constructor(options: { client: Client; logger: Logger } & InfraSynthtraceEsClientOptions) {
super({
...options,
pipeline: infraPipeline(),
});
this.dataStreams = ['metrics-*', 'logs-*'];
}
}
function infraPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<InfraDocument>, encoding, callback) {
if ('host.hostname' in document) {
document._index = 'metrics-system.cpu-default';
} else if ('container.id' in document) {
document._index = 'metrics-kubernetes.container-default';
} else if ('kubernetes.pod.uid' in document) {
document._index = 'metrics-kubernetes.pod-default';
} else {
throw new Error('Cannot determine index for event');
}
callback(null, document);
},
});
}

View file

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ESDocumentWithOperation, MonitoringDocument } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
export type MonitoringSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class MonitoringSynthtraceEsClient extends SynthtraceEsClient<MonitoringDocument> {
constructor(options: { client: Client; logger: Logger } & MonitoringSynthtraceEsClientOptions) {
super({
...options,
pipeline: monitoringPipeline(),
});
this.dataStreams = ['.monitoring-*', 'metrics-*'];
}
}
function monitoringPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<MonitoringDocument>, encoding, callback) {
if ('type' in document) {
if (document.type === 'cluster_stats') {
document._index = '.monitoring-es-7';
} else if (document.type === 'kibana_stats') {
document._index = '.monitoring-kibana-7';
} else {
throw new Error('Cannot determine index for event');
}
} else {
throw new Error('Cannot determine index for event');
}
callback(null, document);
},
});
}

View file

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import {
ESDocumentWithOperation,
Fields,
SynthtraceESAction,
SynthtraceGenerator,
} from '@kbn/apm-synthtrace-client';
import { castArray } from 'lodash';
import { Readable, Transform } from 'stream';
import { isGeneratorObject } from 'util/types';
import { Logger } from '../utils/create_logger';
import { sequential } from '../utils/stream_utils';
export interface SynthtraceEsClientOptions {
concurrency?: number;
refreshAfterIndex?: boolean;
pipeline: (base: Readable) => NodeJS.WritableStream;
}
type MaybeArray<T> = T | T[];
export class SynthtraceEsClient<TFields extends Fields> {
protected readonly client: Client;
protected readonly logger: Logger;
private readonly concurrency: number;
private readonly refreshAfterIndex: boolean;
private pipelineCallback: (base: Readable) => NodeJS.WritableStream;
protected dataStreams: string[] = [];
constructor(options: { client: Client; logger: Logger } & SynthtraceEsClientOptions) {
this.client = options.client;
this.logger = options.logger;
this.concurrency = options.concurrency ?? 1;
this.refreshAfterIndex = options.refreshAfterIndex ?? false;
this.pipelineCallback = options.pipeline;
}
async clean() {
this.logger.info(`Cleaning data streams ${this.dataStreams.join(',')}`);
await this.client.indices.deleteDataStream({
name: this.dataStreams.join(','),
expand_wildcards: ['open', 'hidden'],
});
}
async refresh() {
this.logger.info(`Refreshing ${this.dataStreams.join(',')}`);
return this.client.indices.refresh({
index: this.dataStreams,
allow_no_indices: true,
ignore_unavailable: true,
expand_wildcards: ['open', 'hidden'],
});
}
pipeline(cb: (base: Readable) => NodeJS.WritableStream) {
this.pipelineCallback = cb;
}
async index(streamOrGenerator: MaybeArray<Readable | SynthtraceGenerator<TFields>>) {
this.logger.debug(`Bulk indexing ${castArray(streamOrGenerator).length} stream(s)`);
const allStreams = castArray(streamOrGenerator).map((obj) => {
const base = isGeneratorObject(obj) ? Readable.from(obj) : obj;
return this.pipelineCallback(base);
}) as Transform[];
let count: number = 0;
const stream = sequential(...allStreams);
await this.client.helpers.bulk({
concurrency: this.concurrency,
refresh: false,
refreshOnCompletion: false,
flushBytes: 250000,
datasource: stream,
filter_path: 'errors,items.*.error,items.*.status',
onDocument: (doc: ESDocumentWithOperation<TFields>) => {
let action: SynthtraceESAction;
count++;
if (count % 100000 === 0) {
this.logger.info(`Indexed ${count} documents`);
} else if (count % 1000 === 0) {
this.logger.debug(`Indexed ${count} documents`);
}
if (doc._action) {
action = doc._action!;
delete doc._action;
} else if (doc._index) {
action = { create: { _index: doc._index } };
delete doc._index;
} else {
this.logger.debug(doc);
throw new Error(
`Could not determine operation: _index and _action not defined in document`
);
}
return action;
},
onDrop: (doc) => {
this.logger.error(`Dropped document: ${JSON.stringify(doc, null, 2)}`);
},
});
this.logger.info(`Produced ${count} events`);
if (this.refreshAfterIndex) {
await this.refresh();
}
}
}

View file

@ -5,14 +5,84 @@
* 2.0.
*/
import { APM_TEST_PASSWORD } from '@kbn/apm-plugin/server/test_helpers/create_apm_users/authentication';
import {
ApmSynthtraceEsClient,
ApmSynthtraceKibanaClient,
AssetsSynthtraceEsClient,
createLogger,
InfraSynthtraceEsClient,
LogLevel,
} from '@kbn/apm-synthtrace';
import { FtrConfigProviderContext } from '@kbn/test';
import url from 'url';
import { FtrProviderContext as InheritedFtrProviderContext } from '../../ftr_provider_context';
import { InheritedServices } from './types';
export default async function ({ readConfigFile }: FtrConfigProviderContext) {
interface AssetManagerConfig {
services: InheritedServices & {
assetsSynthtraceEsClient: (
context: InheritedFtrProviderContext
) => Promise<AssetsSynthtraceEsClient>;
infraSynthtraceEsClient: (
context: InheritedFtrProviderContext
) => Promise<InfraSynthtraceEsClient>;
apmSynthtraceEsClient: (context: InheritedFtrProviderContext) => Promise<ApmSynthtraceEsClient>;
};
}
export default async function createTestConfig({
readConfigFile,
}: FtrConfigProviderContext): Promise<AssetManagerConfig> {
const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts'));
const services = baseIntegrationTestsConfig.get('services');
return {
...baseIntegrationTestsConfig.getAll(),
testFiles: [require.resolve('.')],
services: {
...services,
assetsSynthtraceEsClient: (context: InheritedFtrProviderContext) => {
return new AssetsSynthtraceEsClient({
client: context.getService('es'),
logger: createLogger(LogLevel.info),
refreshAfterIndex: true,
});
},
infraSynthtraceEsClient: (context: InheritedFtrProviderContext) => {
return new InfraSynthtraceEsClient({
client: context.getService('es'),
logger: createLogger(LogLevel.info),
refreshAfterIndex: true,
});
},
apmSynthtraceEsClient: async (context: InheritedFtrProviderContext) => {
const servers = baseIntegrationTestsConfig.get('servers');
const kibanaServer = servers.kibana as url.UrlObject;
const kibanaServerUrl = url.format(kibanaServer);
const kibanaServerUrlWithAuth = url
.format({
...url.parse(kibanaServerUrl),
auth: `elastic:${APM_TEST_PASSWORD}`,
})
.slice(0, -1);
const kibanaClient = new ApmSynthtraceKibanaClient({
target: kibanaServerUrlWithAuth,
logger: createLogger(LogLevel.debug),
});
const kibanaVersion = await kibanaClient.fetchLatestApmPackageVersion();
await kibanaClient.installApmPackage(kibanaVersion);
return new ApmSynthtraceEsClient({
client: context.getService('es'),
logger: createLogger(LogLevel.info),
version: kibanaVersion,
refreshAfterIndex: true,
});
},
},
kbnTestServer: {
...baseIntegrationTestsConfig.get('kbnTestServer'),
serverArgs: [
@ -22,3 +92,7 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
},
};
}
export type CreateTestConfig = Awaited<ReturnType<typeof createTestConfig>>;
export type AssetManagerServices = CreateTestConfig['services'];

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { GenericFtrProviderContext } from '@kbn/test';
import { FtrProviderContext as InheritedFtrProviderContext } from '../../ftr_provider_context';
import { AssetManagerServices } from './config';
export type InheritedServices = InheritedFtrProviderContext extends GenericFtrProviderContext<
infer TServices,
{}
>
? TServices
: {};
export type FtrProviderContext = GenericFtrProviderContext<AssetManagerServices, {}>;