Synthtrace: make streamprocessors optional (#132889)

This commit is contained in:
Martijn Laarman 2022-05-25 14:25:12 +02:00 committed by GitHub
parent 2143c6bba5
commit b5ab073189
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 12 deletions

View file

@ -137,11 +137,12 @@ Note:
### Setup options
| Option | Type | Default | Description |
|------------------------|-----------|:-----------|---------------------------------------------------------------------------------------------------------|
| `--numShards` | [number] | | Updates the component templates to update the number of primary shards, requires cloudId to be provided |
| `--clean` | [boolean] | `false` | Clean APM data before indexing new data |
| `--workers` | [number] | | Amount of Node.js worker threads |
| `--logLevel` | [enum] | `info` | Log level |
| `--gcpRepository` | [string] | | Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format |
| Option | Type | Default | Description |
|-------------------|-----------|:-----------|---------------------------------------------------------------------------------------------------------|
| `--numShards` | [number] | | Updates the component templates to update the number of primary shards, requires cloudId to be provided |
| `--clean` | [boolean] | `false` | Clean APM data before indexing new data |
| `--workers` | [number] | | Amount of Node.js worker threads |
| `--logLevel` | [enum] | `info` | Log level |
| `--gcpRepository` | [string] | | Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format |
| `-p` | [string] | | Specify multiple sets of streamaggregators to be included in the StreamProcessor |

View file

@ -258,7 +258,19 @@ export class ApmSynthtraceEsClient {
});
this.logger.info(`Created index template for ${datastreamName}-*`);
await this.client.indices.createDataStream({ name: datastreamName + '-default' });
const dataStreamWithNamespace = datastreamName + '-default';
const getDataStreamResponse = await this.client.indices.getDataStream(
{
name: dataStreamWithNamespace,
},
{ ignore: [404] }
);
if (getDataStreamResponse.data_streams && getDataStreamResponse.data_streams.length === 0) {
await this.client.indices.createDataStream({ name: dataStreamWithNamespace });
this.logger.info(`Created data stream: ${dataStreamWithNamespace}.`);
} else {
this.logger.info(`Data stream: ${dataStreamWithNamespace} already exists.`);
}
await aggregator.bootstrapElasticsearch(this.client);
}

View file

@ -109,7 +109,13 @@ function options(y: Argv) {
'Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format',
string: true,
})
.option('streamProcessors', {
describe:
'Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format',
string: true,
array: true,
alias: 'p',
})
.conflicts('target', 'cloudId')
.conflicts('kibana', 'cloudId')
.conflicts('local', 'target')
@ -187,8 +193,22 @@ yargs(process.argv.slice(2))
if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await apmEsClient.updateComponentTemplates(runOptions.numShards);
}
const aggregators: StreamAggregator[] = [new ServiceLatencyAggregator()];
const aggregators: StreamAggregator[] = [];
const registry = new Map<string, () => StreamAggregator[]>([
['service', () => [new ServiceLatencyAggregator()]],
]);
if (runOptions.streamProcessors && runOptions.streamProcessors.length > 0) {
for (const processorName of runOptions.streamProcessors) {
const factory = registry.get(processorName);
if (factory) {
aggregators.push(...factory());
} else {
throw new Error(
`No processor named ${processorName} configured on known processor registry`
);
}
}
}
if (argv.clean) {
await apmEsClient.clean(aggregators.map((a) => a.getDataStreamName() + '-*'));
}

View file

@ -50,7 +50,8 @@ export function parseRunCliFlags(flags: RunCliFlags) {
'scenarioOpts',
'forceLegacyIndices',
'dryRun',
'gcpRepository'
'gcpRepository',
'streamProcessors'
),
logLevel: parsedLogLevel,
file: parsedFile,