[Synthtrace] Adding Entities support (#196258)

## known issue
```
- Transforms are not started by synthtrace. 
Because it duplicates data ingested by synthrace on signal indices. And it takes a long time to generate data.

- We are not able to open the Inventory page because of 👆🏻.
```
---

```
node scripts/synthtrace.js traces_logs_entities.ts --clean --live
```
or 
```
node scripts/synthtrace.js traces_logs_entities.ts --clean --from=2024-04-08T08:00:00.000Z --to=2024-04-08T08:15:00.000Z
```

docs produces by the new scenario:
```
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".entities.v1.latest.builtin_services_from_ecs_data",
        "_id": "2846700000000001",
        "_score": 1,
        "_source": {
          "service": {
            "name": "synth-node-trace-logs",
            "environment": "Synthtrace: traces_logs_entities"
          },
          "source_data_stream": {
            "type": [
              "traces",
              "logs"
            ]
          },
          "agent": {
            "name": [
              "nodejs"
            ]
          },
          "entity": {
            "id": "2846700000000001",
            "type": "service",
            "definitionId": "latest",
            "lastSeenTimestamp": "2024-10-15T08:56:20.562Z"
          },
          "event": {
            "ingested": "2024-10-15T08:56:20.562Z"
          }
        }
      },
      {
        "_index": ".entities.v1.latest.builtin_services_from_ecs_data",
        "_id": "2846700000000000",
        "_score": 1,
        "_source": {
          "service": {
            "name": "synth-java-trace",
            "environment": "Synthtrace: traces_logs_entities"
          },
          "source_data_stream": {
            "type": [
              "traces"
            ]
          },
          "agent": {
            "name": [
              "java"
            ]
          },
          "entity": {
            "id": "2846700000000000",
            "type": "service",
            "definitionId": "latest",
            "lastSeenTimestamp": "2024-10-15T08:56:20.562Z"
          },
          "event": {
            "ingested": "2024-10-15T08:56:20.562Z"
          }
        }
      },
      {
        "_index": ".entities.v1.latest.builtin_services_from_ecs_data",
        "_id": "2846700000000002",
        "_score": 1,
        "_source": {
          "service": {
            "name": "synth-go-logs",
            "environment": "Synthtrace: traces_logs_entities"
          },
          "source_data_stream": {
            "type": [
              "logs"
            ]
          },
          "agent": {
            "name": [
              "go"
            ]
          },
          "entity": {
            "id": "2846700000000002",
            "type": "service",
            "definitionId": "latest",
            "lastSeenTimestamp": "2024-10-15T08:56:20.562Z"
          },
          "event": {
            "ingested": "2024-10-15T08:56:20.562Z"
          }
        }
      }
    ]
  }
}
```
This commit is contained in:
Cauê Marcondes 2024-10-15 16:21:32 +01:00 committed by GitHub
parent 1bc487c1bf
commit fe22ac9928
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 475 additions and 486 deletions

View file

@ -35,6 +35,6 @@ export { generateLongId, generateShortId } from './src/lib/utils/generate_id';
export { appendHash, hashKeysOf } from './src/lib/utils/hash';
export type { ESDocumentWithOperation, SynthtraceESAction, SynthtraceGenerator } from './src/types';
export { log, type LogDocument, LONG_FIELD_NAME } from './src/lib/logs';
export { type AssetDocument } from './src/lib/assets';
export { syntheticsMonitor, type SyntheticsMonitorDocument } from './src/lib/synthetics';
export { otel, type OtelDocument } from './src/lib/otel';
export { type EntityFields, entities } from './src/lib/entities';

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Fields } from '../entity';
import { Serializable } from '../serializable';
type AssetType = 'host' | 'pod' | 'container' | 'service' | 'aws_rds';
export interface AssetDocument extends Fields {
'asset.id': string;
'asset.type': AssetType;
'asset.first_seen': string;
'asset.last_seen': string;
'asset.identifying_metadata': string[];
'asset.signalTypes': {
'asset.traces'?: boolean;
'asset.logs'?: boolean;
};
}
export class Asset<F extends AssetDocument> extends Serializable<F> {}

View file

@ -1,12 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { ServiceAssetDocument } from './service_assets';
export type AssetDocument = ServiceAssetDocument;

View file

@ -1,23 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Asset, AssetDocument } from './asset';
export interface ServiceAssetDocument extends AssetDocument {
'service.language.name'?: string;
'service.name': string;
'service.node.name'?: string;
'service.environment'?: string;
}
export class ServiceAsset extends Asset<ServiceAssetDocument> {
constructor(fields: Omit<ServiceAssetDocument, 'asset.type'>) {
super({ 'asset.type': 'service', ...fields });
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { EntityDataStreamType, EntityFields } from '.';
import { Serializable } from '../serializable';
class ContainerEntity extends Serializable<EntityFields> {
constructor(fields: EntityFields) {
super({
...fields,
'entity.type': 'container',
'entity.definitionId': 'latest',
});
}
}
export function containerEntity({
agentName,
dataStreamType,
dataStreamDataset,
containerId,
entityId,
}: {
agentName: string[];
dataStreamType: EntityDataStreamType[];
dataStreamDataset: string;
containerId: string;
entityId: string;
}) {
return new ContainerEntity({
'source_data_stream.type': dataStreamType,
'source_data_stream.dataset': dataStreamDataset,
'agent.name': agentName,
'container.id': containerId,
'entity.id': entityId,
});
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { EntityDataStreamType, EntityFields } from '.';
import { Serializable } from '../serializable';
class HostEntity extends Serializable<EntityFields> {
constructor(fields: EntityFields) {
super({
...fields,
'entity.type': 'host',
'entity.definitionId': 'latest',
});
}
}
export function hostEntity({
agentName,
dataStreamType,
dataStreamDataset,
hostName,
entityId,
}: {
agentName: string[];
dataStreamType: EntityDataStreamType[];
dataStreamDataset: string;
hostName: string;
entityId: string;
}) {
return new HostEntity({
'source_data_stream.type': dataStreamType,
'source_data_stream.dataset': dataStreamDataset,
'agent.name': agentName,
'host.name': hostName,
'entity.id': entityId,
});
}

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Fields } from '../entity';
import { serviceEntity } from './service_entity';
import { hostEntity } from './host_entity';
import { containerEntity } from './container_entity';
export type EntityDataStreamType = 'metrics' | 'logs' | 'traces';
export type EntityFields = Fields &
Partial<{
'agent.name': string[];
'source_data_stream.type': string | string[];
'source_data_stream.dataset': string | string[];
'event.ingested': string;
sourceIndex: string;
'entity.lastSeenTimestamp': string;
'entity.schemaVersion': string;
'entity.definitionVersion': string;
'entity.displayName': string;
'entity.identityFields': string | string[];
'entity.id': string;
'entity.type': string;
'entity.definitionId': string;
[key: string]: any;
}>;
export const entities = { serviceEntity, hostEntity, containerEntity };

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { EntityDataStreamType, EntityFields } from '.';
import { Serializable } from '../serializable';
class ServiceEntity extends Serializable<EntityFields> {
constructor(fields: EntityFields) {
super({
...fields,
'entity.type': 'service',
'entity.definitionId': 'latest',
});
}
}
export function serviceEntity({
agentName,
dataStreamType,
serviceName,
environment,
entityId,
}: {
agentName: string[];
serviceName: string;
dataStreamType: EntityDataStreamType[];
environment?: string;
entityId: string;
}) {
return new ServiceEntity({
'service.name': serviceName,
'service.environment': environment,
'source_data_stream.type': dataStreamType,
'agent.name': agentName,
'entity.id': entityId,
});
}

View file

@ -15,7 +15,7 @@ export { InfraSynthtraceEsClient } from './src/lib/infra/infra_synthtrace_es_cli
export { InfraSynthtraceKibanaClient } from './src/lib/infra/infra_synthtrace_kibana_client';
export { MonitoringSynthtraceEsClient } from './src/lib/monitoring/monitoring_synthtrace_es_client';
export { LogsSynthtraceEsClient } from './src/lib/logs/logs_synthtrace_es_client';
export { AssetsSynthtraceEsClient } from './src/lib/assets/assets_synthtrace_es_client';
export { EntitiesSynthtraceEsClient } from './src/lib/entities/entities_synthtrace_es_client';
export { SyntheticsSynthtraceEsClient } from './src/lib/synthetics/synthetics_synthtrace_es_client';
export { OtelSynthtraceEsClient } from './src/lib/otel/otel_synthtrace_es_client';
export {

View file

@ -14,19 +14,24 @@ import {
LogsSynthtraceEsClient,
SyntheticsSynthtraceEsClient,
OtelSynthtraceEsClient,
EntitiesSynthtraceEsClient,
} from '../..';
import { AssetsSynthtraceEsClient } from '../lib/assets/assets_synthtrace_es_client';
import { Logger } from '../lib/utils/create_logger';
import { ScenarioReturnType } from '../lib/utils/with_client';
import { RunOptions } from './utils/parse_run_cli_flags';
import { EntitiesSynthtraceKibanaClient } from '../lib/apm/client/entities_synthtrace_kibana_client';
interface EsClients {
apmEsClient: ApmSynthtraceEsClient;
logsEsClient: LogsSynthtraceEsClient;
infraEsClient: InfraSynthtraceEsClient;
assetsEsClient: AssetsSynthtraceEsClient;
syntheticsEsClient: SyntheticsSynthtraceEsClient;
otelEsClient: OtelSynthtraceEsClient;
entitiesEsClient: EntitiesSynthtraceEsClient;
}
interface KibanaClients {
entitiesKibanaClient: EntitiesSynthtraceKibanaClient;
}
type Generate<TFields> = (options: {
@ -35,6 +40,6 @@ type Generate<TFields> = (options: {
}) => ScenarioReturnType<TFields> | Array<ScenarioReturnType<TFields>>;
export type Scenario<TFields> = (options: RunOptions & { logger: Logger }) => Promise<{
bootstrap?: (options: EsClients) => Promise<void>;
bootstrap?: (options: EsClients & KibanaClients) => Promise<void>;
generate: Generate<TFields>;
}>;

View file

@ -14,9 +14,10 @@ import { getInfraEsClient } from './get_infra_es_client';
import { getKibanaClient } from './get_kibana_client';
import { getServiceUrls } from './get_service_urls';
import { RunOptions } from './parse_run_cli_flags';
import { getAssetsEsClient } from './get_assets_es_client';
import { getSyntheticsEsClient } from './get_synthetics_es_client';
import { getOtelSynthtraceEsClient } from './get_otel_es_client';
import { getEntitiesEsClient } from './get_entities_es_client';
import { getEntitiesKibanaClient } from './get_entites_kibana_client';
export async function bootstrap(runOptions: RunOptions) {
const logger = createLogger(runOptions.logLevel);
@ -58,12 +59,17 @@ export async function bootstrap(runOptions: RunOptions) {
concurrency: runOptions.concurrency,
});
const assetsEsClient = getAssetsEsClient({
const entitiesEsClient = getEntitiesEsClient({
target: esUrl,
logger,
concurrency: runOptions.concurrency,
});
const entitiesKibanaClient = getEntitiesKibanaClient({
target: kibanaUrl,
logger,
});
const syntheticsEsClient = getSyntheticsEsClient({
target: esUrl,
logger,
@ -79,7 +85,7 @@ export async function bootstrap(runOptions: RunOptions) {
await apmEsClient.clean();
await logsEsClient.clean();
await infraEsClient.clean();
await assetsEsClient.clean();
await entitiesEsClient.clean();
await syntheticsEsClient.clean();
await otelEsClient.clean();
}
@ -89,11 +95,12 @@ export async function bootstrap(runOptions: RunOptions) {
apmEsClient,
logsEsClient,
infraEsClient,
assetsEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
version,
kibanaUrl,
esUrl,
entitiesKibanaClient,
};
}

View file

@ -7,7 +7,14 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { LogDocument } from '@kbn/apm-synthtrace-client';
import { createAssetsAggregatorFactory } from '../../utils/create_assets_aggregator_factory';
import { EntitiesSynthtraceKibanaClient } from '../../lib/apm/client/entities_synthtrace_kibana_client';
import { Logger } from '../../lib/utils/create_logger';
export const createLogsAssetsAggregator = createAssetsAggregatorFactory<LogDocument>();
export function getEntitiesKibanaClient({ target, logger }: { target: string; logger: Logger }) {
const kibanaClient = new EntitiesSynthtraceKibanaClient({
logger,
target,
});
return kibanaClient;
}

View file

@ -8,12 +8,12 @@
*/
import { Client } from '@elastic/elasticsearch';
import { AssetsSynthtraceEsClient } from '../../lib/assets/assets_synthtrace_es_client';
import { EntitiesSynthtraceEsClient } from '../../lib/entities/entities_synthtrace_es_client';
import { Logger } from '../../lib/utils/create_logger';
import { RunOptions } from './parse_run_cli_flags';
import { getEsClientTlsSettings } from './ssl';
export function getAssetsEsClient({
export function getEntitiesEsClient({
target,
logger,
concurrency,
@ -26,7 +26,7 @@ export function getAssetsEsClient({
tls: getEsClientTlsSettings(target),
});
return new AssetsSynthtraceEsClient({
return new EntitiesSynthtraceEsClient({
client,
logger,
concurrency,

View file

@ -26,7 +26,7 @@ export async function startHistoricalDataUpload({
from: Date;
to: Date;
}) {
const { logger, esUrl, version } = await bootstrap(runOptions);
const { logger, esUrl, version, kibanaUrl } = await bootstrap(runOptions);
const cores = cpus().length;
@ -93,6 +93,7 @@ export async function startHistoricalDataUpload({
workerId: workerIndex.toString(),
esUrl,
version,
kibanaUrl,
};
const worker = new Worker(Path.join(__dirname, './worker.js'), {
workerData,

View file

@ -31,13 +31,26 @@ export async function startLiveDataUpload({
apmEsClient,
logsEsClient,
infraEsClient,
assetsEsClient,
syntheticsEsClient,
otelEsClient,
entitiesEsClient,
entitiesKibanaClient,
} = await bootstrap(runOptions);
const scenario = await getScenario({ file, logger });
const { generate } = await scenario({ ...runOptions, logger });
const { generate, bootstrap: scenarioBootsrap } = await scenario({ ...runOptions, logger });
if (scenarioBootsrap) {
await scenarioBootsrap({
apmEsClient,
logsEsClient,
infraEsClient,
otelEsClient,
syntheticsEsClient,
entitiesEsClient,
entitiesKibanaClient,
});
}
const bucketSizeInMs = 1000 * 60;
let requestedUntil = start;
@ -76,7 +89,7 @@ export async function startLiveDataUpload({
logsEsClient,
apmEsClient,
infraEsClient,
assetsEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
},

View file

@ -7,20 +7,21 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { parentPort, workerData } from 'worker_threads';
import pidusage from 'pidusage';
import { castArray } from 'lodash';
import { memoryUsage } from 'process';
import { timerange } from '@kbn/apm-synthtrace-client';
import { castArray } from 'lodash';
import pidusage from 'pidusage';
import { memoryUsage } from 'process';
import { parentPort, workerData } from 'worker_threads';
import { getApmEsClient } from './get_apm_es_client';
import { getEntitiesKibanaClient } from './get_entites_kibana_client';
import { getEntitiesEsClient } from './get_entities_es_client';
import { getInfraEsClient } from './get_infra_es_client';
import { getLogsEsClient } from './get_logs_es_client';
import { getOtelSynthtraceEsClient } from './get_otel_es_client';
import { getScenario } from './get_scenario';
import { getSyntheticsEsClient } from './get_synthetics_es_client';
import { loggerProxy } from './logger_proxy';
import { RunOptions } from './parse_run_cli_flags';
import { getLogsEsClient } from './get_logs_es_client';
import { getInfraEsClient } from './get_infra_es_client';
import { getAssetsEsClient } from './get_assets_es_client';
import { getSyntheticsEsClient } from './get_synthetics_es_client';
import { getOtelSynthtraceEsClient } from './get_otel_es_client';
export interface WorkerData {
bucketFrom: Date;
@ -29,18 +30,24 @@ export interface WorkerData {
workerId: string;
esUrl: string;
version: string;
kibanaUrl: string;
}
const { bucketFrom, bucketTo, runOptions, esUrl, version } = workerData as WorkerData;
const { bucketFrom, bucketTo, runOptions, esUrl, version, kibanaUrl } = workerData as WorkerData;
async function start() {
const logger = loggerProxy;
const assetsEsClient = getAssetsEsClient({
const entitiesEsClient = getEntitiesEsClient({
concurrency: runOptions.concurrency,
target: esUrl,
logger,
});
const entitiesKibanaClient = getEntitiesKibanaClient({
target: kibanaUrl,
logger,
});
const apmEsClient = getApmEsClient({
concurrency: runOptions.concurrency,
target: esUrl,
@ -85,9 +92,10 @@ async function start() {
apmEsClient,
logsEsClient,
infraEsClient,
assetsEsClient,
syntheticsEsClient,
otelEsClient,
entitiesEsClient,
entitiesKibanaClient,
});
}
@ -100,7 +108,7 @@ async function start() {
logsEsClient,
apmEsClient,
infraEsClient,
assetsEsClient,
entitiesEsClient,
syntheticsEsClient,
otelEsClient,
},

View file

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import fetch from 'node-fetch';
import { Logger } from '../../utils/create_logger';
import { kibanaHeaders } from '../../shared/client_headers';
import { getFetchAgent } from '../../../cli/utils/ssl';
interface EntityDefinitionResponse {
definitions: Array<{ type: string; state: { installed: boolean; running: boolean } }>;
}
export class EntitiesSynthtraceKibanaClient {
private readonly logger: Logger;
private target: string;
constructor(options: { logger: Logger; target: string }) {
this.logger = options.logger;
this.target = options.target;
}
async installEntityIndexPatterns() {
const url = `${this.target}/internal/entities/definition?includeState=true`;
const response = await fetch(url, {
method: 'GET',
headers: kibanaHeaders(),
agent: getFetchAgent(url),
});
const entityDefinition: EntityDefinitionResponse = await response.json();
const hasEntityDefinitionsInstalled = entityDefinition.definitions.find(
(definition) => definition.type === 'service'
)?.state.installed;
if (hasEntityDefinitionsInstalled === true) {
this.logger.debug('Entity definitions are already defined');
} else {
this.logger.debug('Installing Entity definitions');
const entityEnablementUrl = `${this.target}/internal/entities/managed/enablement?installOnly=true`;
await fetch(entityEnablementUrl, {
method: 'PUT',
headers: kibanaHeaders(),
agent: getFetchAgent(url),
});
}
}
async uninstallEntityIndexPatterns() {
const url = `${this.target}/internal/entities/managed/enablement`;
await fetch(url, {
method: 'DELETE',
headers: kibanaHeaders(),
agent: getFetchAgent(url),
});
}
}

View file

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { hashKeysOf, LogDocument } from '@kbn/apm-synthtrace-client';
import { ServiceAssetDocument } from '@kbn/apm-synthtrace-client/src/lib/assets/service_assets';
import { identity, noop } from 'lodash';
import { createLogsAssetsAggregator } from './create_logs_assets_aggregator';
const KEY_FIELDS: Array<keyof LogDocument> = ['service.name'];
export function createLogsServiceAssetsAggregator() {
return createLogsAssetsAggregator<ServiceAssetDocument>(
{
filter: (event) => event['input.type'] === 'logs',
getAggregateKey: (event) => {
// see https://github.com/elastic/apm-server/blob/main/x-pack/apm-server/aggregation/txmetrics/aggregator.go
return hashKeysOf(event as LogDocument, KEY_FIELDS as Array<keyof LogDocument>);
},
init: (event, firstSeen, lastSeen) => {
return {
'asset.id': event['service.name']!,
'asset.type': 'service',
'asset.identifying_metadata': ['service.name'],
'asset.first_seen': firstSeen,
'asset.last_seen': lastSeen,
'asset.signalTypes': {
'asset.logs': true,
},
'service.name': event['service.name']!,
};
},
},
noop,
identity
);
}

View file

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { ApmFields } from '@kbn/apm-synthtrace-client';
import { createAssetsAggregatorFactory } from '../../utils/create_assets_aggregator_factory';
export const createTracesAssetsAggregator = createAssetsAggregatorFactory<ApmFields>();

View file

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { ApmFields, hashKeysOf } from '@kbn/apm-synthtrace-client';
import { ServiceAssetDocument } from '@kbn/apm-synthtrace-client/src/lib/assets/service_assets';
import { identity, noop } from 'lodash';
import { createTracesAssetsAggregator } from './create_traces_assets_aggregator';
const KEY_FIELDS: Array<keyof ApmFields> = ['service.name'];
export function createTracesServiceAssetsAggregator() {
return createTracesAssetsAggregator<ServiceAssetDocument>(
{
filter: (event) => event['processor.event'] === 'transaction',
getAggregateKey: (event) => {
// see https://github.com/elastic/apm-server/blob/main/x-pack/apm-server/aggregation/txmetrics/aggregator.go
return hashKeysOf(event as ApmFields, KEY_FIELDS as Array<keyof ApmFields>);
},
init: (event, firstSeen, lastSeen) => {
return {
'asset.id': event['service.name']!,
'asset.type': 'service',
'asset.identifying_metadata': ['service.name'],
'asset.first_seen': firstSeen,
'asset.last_seen': lastSeen,
'asset.signalTypes': {
'asset.traces': true,
},
'service.environment': event['service.environment'],
'service.name': event['service.name']!,
'service.node.name': event['service.node.name'],
'service.language.name': event['service.language.name'],
};
},
},
noop,
identity
);
}

View file

@ -1,116 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Client } from '@elastic/elasticsearch';
import {
ApmFields,
AssetDocument,
ESDocumentWithOperation,
LogDocument,
} from '@kbn/apm-synthtrace-client';
import { merge } from 'lodash';
import { PassThrough, pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
import { fork } from '../utils/stream_utils';
import { createLogsServiceAssetsAggregator } from './aggregators/create_logs_service_assets_aggregator';
import { createTracesServiceAssetsAggregator } from './aggregators/create_traces_service_assets_aggregator';
export type AssetsSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class AssetsSynthtraceEsClient extends SynthtraceEsClient<AssetDocument> {
constructor(options: { client: Client; logger: Logger } & AssetsSynthtraceEsClientOptions) {
super({
...options,
pipeline: assetsPipeline(),
});
this.indices = ['assets'];
}
}
function assetsPipeline() {
return (base: Readable) => {
const aggregators = [
createTracesServiceAssetsAggregator(),
createLogsServiceAssetsAggregator(),
];
return pipeline(
base,
getSerializeTransform(),
fork(new PassThrough({ objectMode: true }), ...aggregators),
getAssetsFilterTransform(),
getMergeAssetsTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
function getAssetsFilterTransform() {
return new Transform({
objectMode: true,
transform(
document: ESDocumentWithOperation<AssetDocument | ApmFields | LogDocument>,
encoding,
callback
) {
if ('asset.id' in document) {
callback(null, document);
} else {
callback();
}
},
});
}
function getMergeAssetsTransform() {
const mergedDocuments: Record<string, AssetDocument> = {};
return new Transform({
objectMode: true,
transform(nextDocument: ESDocumentWithOperation<AssetDocument>, encoding, callback) {
const assetId = nextDocument['asset.id'];
if (!mergedDocuments[assetId]) {
mergedDocuments[assetId] = { ...nextDocument };
} else {
const mergedDocument = mergedDocuments[assetId];
mergedDocument['asset.signalTypes'] = merge(
mergedDocument['asset.signalTypes'],
nextDocument['asset.signalTypes']
);
}
callback();
},
flush(callback) {
Object.values(mergedDocuments).forEach((item) => this.push(item));
callback();
},
});
}
function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<AssetDocument>, encoding, callback) {
if ('asset.type' in document) {
document._index = `assets`;
} else {
throw new Error(`Cannot determine index for event ${JSON.stringify(document)}`);
}
callback(null, document);
},
});
}

View file

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { Client } from '@elastic/elasticsearch';
import { EntityFields, ESDocumentWithOperation } from '@kbn/apm-synthtrace-client';
import { pipeline, Readable, Transform } from 'stream';
import { SynthtraceEsClient, SynthtraceEsClientOptions } from '../shared/base_client';
import { getDedotTransform } from '../shared/get_dedot_transform';
import { getSerializeTransform } from '../shared/get_serialize_transform';
import { Logger } from '../utils/create_logger';
export type EntitiesSynthtraceEsClientOptions = Omit<SynthtraceEsClientOptions, 'pipeline'>;
export class EntitiesSynthtraceEsClient extends SynthtraceEsClient<EntityFields> {
constructor(options: { client: Client; logger: Logger } & EntitiesSynthtraceEsClientOptions) {
super({
...options,
pipeline: entitiesPipeline(),
});
this.indices = ['.entities.v1.latest.builtin*'];
}
}
function entitiesPipeline() {
return (base: Readable) => {
return pipeline(
base,
getSerializeTransform(),
lastSeenTimestampTransform(),
getRoutingTransform(),
getDedotTransform(),
(err: unknown) => {
if (err) {
throw err;
}
}
);
};
}
function lastSeenTimestampTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<EntityFields>, encoding, callback) {
const timestamp = document['@timestamp'];
if (timestamp) {
const isoString = new Date(timestamp).toISOString();
document['entity.lastSeenTimestamp'] = isoString;
document['event.ingested'] = isoString;
delete document['@timestamp'];
}
callback(null, document);
},
});
}
function getRoutingTransform() {
return new Transform({
objectMode: true,
transform(document: ESDocumentWithOperation<EntityFields>, encoding, callback) {
const entityType: string | undefined = document['entity.type'];
if (entityType === undefined) {
throw new Error(`entity.type was not defined: ${JSON.stringify(document)}`);
}
const entityIndexName = `${entityType}s`;
document._action = {
index: {
_index: `.entities.v1.latest.builtin_${entityIndexName}_from_ecs_data`,
_id: document['entity.id'],
},
};
callback(null, document);
},
});
}

View file

@ -48,11 +48,7 @@ export class SynthtraceEsClient<TFields extends Fields> {
}
async clean() {
this.logger.info(
`Cleaning data streams "${this.dataStreams.join(',')}" and indices "${this.indices.join(
','
)}"`
);
this.logger.info(`Cleaning data streams: "${this.dataStreams.join(',')}"`);
const resolvedIndices = this.indices.length
? (
@ -65,6 +61,10 @@ export class SynthtraceEsClient<TFields extends Fields> {
).indices.map((index: { name: string }) => index.name)
: [];
if (resolvedIndices.length) {
this.logger.info(`Cleaning indices: "${resolvedIndices.join(',')}"`);
}
await Promise.all([
...(this.dataStreams.length
? [

View file

@ -1,94 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { appendHash, AssetDocument, Fields } from '@kbn/apm-synthtrace-client';
import { Duplex, PassThrough } from 'stream';
export function createAssetsAggregatorFactory<TFields extends Fields>() {
return function <TAsset extends AssetDocument>(
{
filter,
getAggregateKey,
init,
}: {
filter: (event: TFields) => boolean;
getAggregateKey: (event: TFields) => string;
init: (event: TFields, firstSeen: string, lastSeen: string) => TAsset;
},
reduce: (asset: TAsset, event: TFields) => void,
serialize: (asset: TAsset) => TAsset
) {
const assets: Map<string, TAsset> = new Map();
let toFlush: TAsset[] = [];
let cb: (() => void) | undefined;
function flush(stream: Duplex, includeCurrentAssets: boolean, callback?: () => void) {
const allItems = [...toFlush];
toFlush = [];
if (includeCurrentAssets) {
allItems.push(...assets.values());
assets.clear();
}
while (allItems.length) {
const next = allItems.shift()!;
const serialized = serialize(next);
const shouldWriteNext = stream.push(serialized);
if (!shouldWriteNext) {
toFlush = allItems;
cb = callback;
return;
}
}
const next = cb;
cb = undefined;
next?.();
callback?.();
}
const timeRanges: number[] = [];
return new PassThrough({
objectMode: true,
read() {
flush(this, false, cb);
},
final(callback) {
flush(this, true, callback);
},
write(event: TFields, encoding, callback) {
if (!filter(event)) {
callback();
return;
}
timeRanges.push(event['@timestamp']!);
const firstSeen = new Date(Math.min(...timeRanges)).toISOString();
const lastSeen = new Date(Math.max(...timeRanges)).toISOString();
const key = appendHash(getAggregateKey(event), '');
let asset = assets.get(key);
if (asset) {
// @ts-ignore
asset['asset.last_seen'] = lastSeen;
} else {
asset = init({ ...event }, firstSeen, lastSeen);
assets.set(key, asset);
}
reduce(asset, event);
callback();
},
});
};
}

View file

@ -9,72 +9,54 @@
import {
apm,
ApmFields,
generateLongId,
generateShortId,
infra,
Instance,
log,
Serializable,
entities,
EntityFields,
} from '@kbn/apm-synthtrace-client';
import { random } from 'lodash';
import { Readable } from 'stream';
import { Scenario } from '../cli/scenario';
import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';
import { withClient } from '../lib/utils/with_client';
import { parseLogsScenarioOpts } from './helpers/logs_scenario_opts_parser';
import { IndexTemplateName } from '../lib/logs/custom_logsdb_index_templates';
const ENVIRONMENT = getSynthtraceEnvironment(__filename);
const scenario: Scenario<ApmFields> = async (runOptions) => {
const { logger, scenarioOpts } = runOptions;
const { numServices = 3, numHosts = 10 } = runOptions.scenarioOpts || {};
const { isLogsDb } = parseLogsScenarioOpts(scenarioOpts);
const MESSAGE_LOG_LEVELS = [
{ message: 'A simple log with something random <random> in the middle', level: 'info' },
{ message: 'Yet another debug log', level: 'debug' },
{ message: 'Error with certificate: "ca_trusted_fingerprint"', level: 'error' },
];
const SYNTH_JAVA_TRACE_ENTITY_ID = generateShortId();
const SYNTH_NODE_TRACES_LOGS_ENTITY_ID = generateShortId();
const SYNTH_GO_LOGS_ENTITY_ID = generateShortId();
const scenario: Scenario<Partial<EntityFields>> = async (runOptions) => {
const { logger } = runOptions;
const { isLogsDb } = parseLogsScenarioOpts(runOptions.scenarioOpts);
return {
bootstrap: async ({ logsEsClient }) => {
bootstrap: async ({ entitiesKibanaClient, logsEsClient }) => {
await entitiesKibanaClient.installEntityIndexPatterns();
if (isLogsDb) await logsEsClient.createIndexTemplate(IndexTemplateName.LogsDb);
},
generate: ({
range,
clients: { apmEsClient, assetsEsClient, logsEsClient, infraEsClient },
}) => {
generate: ({ range, clients: { entitiesEsClient, logsEsClient, apmEsClient } }) => {
const transactionName = '240rpm/75% 1000ms';
const entityHistoryTimestamps = range.interval('1m').rate(1);
const successfulTimestamps = range.interval('1m').rate(1);
const failedTimestamps = range.interval('1m').rate(1);
const serviceNames = [...Array(numServices).keys()].map((index) => `apm-only-${index}`);
serviceNames.push('multi-signal-service');
const HOSTS = Array(numHosts)
.fill(0)
.map((_, idx) => infra.host(`my-host-${idx}`));
const hosts = range
.interval('30s')
.rate(1)
.generator((timestamp) =>
HOSTS.flatMap((host) => [
host.cpu().timestamp(timestamp),
host.memory().timestamp(timestamp),
host.network().timestamp(timestamp),
host.load().timestamp(timestamp),
host.filesystem().timestamp(timestamp),
host.diskio().timestamp(timestamp),
])
);
const instances = serviceNames.map((serviceName) =>
apm
.service({ name: serviceName, environment: ENVIRONMENT, agentName: 'nodejs' })
.instance('instance')
);
const instanceSpans = (instance: Instance, index: number) => {
const instanceSpans = (instance: Instance) => {
const successfulTraceEvents = successfulTimestamps.generator((timestamp) =>
instance
.transaction({ transactionName })
.timestamp(timestamp)
.duration(random(100, (index % 4) * 1000, false))
.duration(1000)
.success()
.children(
instance
@ -128,13 +110,25 @@ const scenario: Scenario<ApmFields> = async (runOptions) => {
return [...successfulTraceEvents, ...failedTraceEvents, ...metricsets];
};
const MESSAGE_LOG_LEVELS = [
{ message: 'A simple log with something random <random> in the middle', level: 'info' },
{ message: 'Yet another debug log', level: 'debug' },
{ message: 'Error with certificate: "ca_trusted_fingerprint"', level: 'error' },
];
const SYNTH_JAVA_TRACE = 'synth-java-trace';
const apmOnlyInstance = apm
.service({ name: SYNTH_JAVA_TRACE, agentName: 'java', environment: ENVIRONMENT })
.instance('intance');
const apmOnlyEvents = instanceSpans(apmOnlyInstance);
const synthJavaTraces = entities.serviceEntity({
serviceName: SYNTH_JAVA_TRACE,
agentName: ['java'],
dataStreamType: ['traces'],
environment: ENVIRONMENT,
entityId: SYNTH_JAVA_TRACE_ENTITY_ID,
});
const logsWithTraces = range
const SYNTH_NODE_TRACE_LOGS = 'synth-node-trace-logs';
const apmAndLogsInstance = apm
.service({ name: SYNTH_NODE_TRACE_LOGS, agentName: 'nodejs', environment: ENVIRONMENT })
.instance('intance');
const apmAndLogsApmEvents = instanceSpans(apmAndLogsInstance);
const apmAndLogsLogsEvents = range
.interval('1m')
.rate(1)
.generator((timestamp) => {
@ -153,14 +147,14 @@ const scenario: Scenario<ApmFields> = async (runOptions) => {
.create({ isLogsDb })
.message(message.replace('<random>', generateShortId()))
.logLevel(level)
.service('multi-signal-service')
.service(SYNTH_NODE_TRACE_LOGS)
.defaults({
'trace.id': generateShortId(),
'agent.name': 'nodejs',
'orchestrator.cluster.name': CLUSTER.clusterName,
'orchestrator.cluster.id': CLUSTER.clusterId,
'orchestrator.namespace': CLUSTER.namespace,
'container.name': `${serviceNames[0]}-${generateShortId()}`,
'container.name': `${SYNTH_NODE_TRACE_LOGS}-${generateShortId()}`,
'orchestrator.resource.id': generateShortId(),
'cloud.provider': 'gcp',
'cloud.region': 'eu-central-1',
@ -173,8 +167,16 @@ const scenario: Scenario<ApmFields> = async (runOptions) => {
.timestamp(timestamp);
});
});
const synthNodeTracesLogs = entities.serviceEntity({
serviceName: SYNTH_NODE_TRACE_LOGS,
agentName: ['nodejs'],
dataStreamType: ['traces', 'logs'],
environment: ENVIRONMENT,
entityId: SYNTH_NODE_TRACES_LOGS_ENTITY_ID,
});
const logsOnly = range
const SYNTH_GO_LOGS = 'synth-go-logs';
const logsEvents = range
.interval('1m')
.rate(1)
.generator((timestamp) => {
@ -193,57 +195,67 @@ const scenario: Scenario<ApmFields> = async (runOptions) => {
.create({ isLogsDb })
.message(message.replace('<random>', generateShortId()))
.logLevel(level)
.service('logs-only-services')
.service(SYNTH_GO_LOGS)
.defaults({
'trace.id': generateShortId(),
'agent.name': 'nodejs',
'orchestrator.cluster.name': CLUSTER.clusterName,
'orchestrator.cluster.id': CLUSTER.clusterId,
'orchestrator.namespace': CLUSTER.namespace,
'container.name': `logs-only-${generateShortId()}`,
'container.name': `${SYNTH_GO_LOGS}-${generateShortId()}`,
'orchestrator.resource.id': generateShortId(),
'cloud.provider': 'gcp',
'cloud.region': 'eu-central-1',
'cloud.availability_zone': 'eu-central-1a',
'log.level': 'error',
'cloud.project.id': generateShortId(),
'cloud.instance.id': generateShortId(),
'log.file.path': `/logs/${generateLongId()}/error.txt`,
'log.level': 'error',
})
.timestamp(timestamp);
});
});
const synthGoTraces = entities.serviceEntity({
serviceName: SYNTH_GO_LOGS,
agentName: ['go'],
dataStreamType: ['logs'],
environment: ENVIRONMENT,
entityId: SYNTH_GO_LOGS_ENTITY_ID,
});
function* createGeneratorFromArray(arr: Array<Serializable<any>>) {
yield* arr;
}
const entitiesEvents = entityHistoryTimestamps.generator((timestamp) => {
return [
synthNodeTracesLogs.timestamp(timestamp),
synthJavaTraces.timestamp(timestamp),
synthGoTraces.timestamp(timestamp),
];
});
const logsValuesArray = [...logsWithTraces, ...logsOnly];
const logsGen = createGeneratorFromArray(logsValuesArray);
const logsGenAssets = createGeneratorFromArray(logsValuesArray);
const traces = instances.flatMap((instance, index) => instanceSpans(instance, index));
const tracesGen = createGeneratorFromArray(traces);
const tracesGenAssets = createGeneratorFromArray(traces);
const apmPython = apm
.service({ name: 'synth-python', agentName: 'python', environment: ENVIRONMENT })
.instance('intance');
const apmPythonEvents = instanceSpans(apmPython);
return [
withClient(
assetsEsClient,
logger.perf('generating_assets_events', () =>
Readable.from(Array.from(logsGenAssets).concat(Array.from(tracesGenAssets)))
)
entitiesEsClient,
logger.perf('generating_entities_events', () => entitiesEvents)
),
withClient(
logsEsClient,
logger.perf('generating_logs', () => logsGen)
logger.perf('generating_logs', () =>
Readable.from(Array.from(apmAndLogsLogsEvents).concat(Array.from(logsEvents)))
)
),
withClient(
apmEsClient,
logger.perf('generating_apm_events', () => tracesGen)
),
withClient(
infraEsClient,
logger.perf('generating_infra_hosts', () => hosts)
logger.perf('generating_apm_events', () =>
Readable.from(
Array.from(apmOnlyEvents).concat(
Array.from(apmAndLogsApmEvents).concat(Array.from(apmPythonEvents))
)
)
)
),
];
},

View file

@ -11,7 +11,7 @@ import {
ApmSynthtraceEsClient,
ApmSynthtraceKibanaClient,
LogsSynthtraceEsClient,
AssetsSynthtraceEsClient,
EntitiesSynthtraceEsClient,
createLogger,
LogLevel,
} from '@kbn/apm-synthtrace';
@ -83,9 +83,9 @@ export interface CreateTest {
context: InheritedFtrProviderContext
) => Promise<LogsSynthtraceEsClient>;
synthtraceEsClient: (context: InheritedFtrProviderContext) => Promise<ApmSynthtraceEsClient>;
assetsSynthtraceEsClient: (
entitiesSynthtraceEsClient: (
context: InheritedFtrProviderContext
) => Promise<AssetsSynthtraceEsClient>;
) => Promise<EntitiesSynthtraceEsClient>;
apmSynthtraceEsClient: (context: InheritedFtrProviderContext) => Promise<ApmSynthtraceEsClient>;
synthtraceKibanaClient: (
context: InheritedFtrProviderContext
@ -132,8 +132,8 @@ export function createTestConfig(
logger: createLogger(LogLevel.info),
refreshAfterIndex: true,
}),
assetsSynthtraceEsClient: (context: InheritedFtrProviderContext) =>
new AssetsSynthtraceEsClient({
entitiesSynthtraceEsClient: (context: InheritedFtrProviderContext) =>
new EntitiesSynthtraceEsClient({
client: context.getService('es'),
logger: createLogger(LogLevel.info),
refreshAfterIndex: true,