[Data Forge] Add artificial delay feature (#187901)

## Summary

This PR adds a new indexing setting, `indexing.artificialIndexDelay`, which
controls how much artificial delay is added to event timestamps. It also adds
a "final" ingest pipeline to each data source and injects a new base
`component_template` that includes the `event.ingested` field.

The artificial delay is useful for testing transforms on data that arrives
with significant delays. It also allows us to test whether we miss data when
syncing transforms on `event.ingested`.
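
To see what the final pipeline does, you can simulate it against a sample document once Data Forge has installed it. A minimal sketch, assuming a local Elasticsearch at `http://localhost:9200` and the `@elastic/elasticsearch` client; the pipeline id matches the one installed by this PR, while the sample document and client setup are made up for illustration:

```typescript
import { Client } from '@elastic/elasticsearch';

const client = new Client({ node: 'http://localhost:9200' });

// Simulate the kbn-data-forge-add-event-ingested pipeline to confirm it
// stamps event.ingested from the ingest node's timestamp.
async function simulateEventIngested() {
  const response = await client.ingest.simulate({
    id: 'kbn-data-forge-add-event-ingested',
    docs: [{ _source: { '@timestamp': new Date().toISOString(), message: 'sample' } }],
  });
  // The simulated doc now contains event.ingested alongside @timestamp.
  console.log(JSON.stringify(response.docs[0], null, 2));
}

simulateEventIngested().catch(console.error);
```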

- Installs a default ingest pipeline that adds `event.ingested` to each
document
- Adds `final_pipeline` to each installed index template
- Injects the base `component_template` into each `index_template` at install time
- Adds an artificial delay for "current" events; historical events are
ingested without delay (see the sketch below)
- Changes the index math to produce monthly indices
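
The delay works by backdating generated timestamps rather than pausing ingestion of historical data. A simplified sketch of the timestamp shift applied in `createEvents` (see the corresponding diff hunk below); the names and units mirror the change, but the standalone function wrapper is made up for illustration:

```typescript
import moment from 'moment';

// Events generated "now" are backdated by the configured delay plus one
// interval, so they always land behind wall-clock time by that amount.
function delayedEventTimestamp(
  currentTimestamp: moment.Moment,
  artificialIndexDelay: number, // ms, e.g. 300000 (5 minutes)
  interval: number // ms, e.g. 60000 (1 minute)
): moment.Moment {
  return currentTimestamp.clone().subtract(artificialIndexDelay + interval, 'ms');
}
```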

### How to test:

Copy the following to `fake_logs.delayed.yaml`:

```YAML
---
elasticsearch:
  installKibanaUser: false

kibana:
  installAssets: true
  host: "http://localhost:5601/kibana"

indexing:
  dataset: "fake_logs"
  eventsPerCycle: 100
  artificialIndexDelay: 300000

schedule:
  - template: "good"
    start: "now-1h"
    end: false
    eventsPerCycle: 100
```
Then run `node x-pack/scripts/data_forge.js --config
fake_logs.delayed.yaml`. This should index an hour of historical data
immediately, then add a `300000` ms (300 second) delay when indexing in "real
time"; the schedule's `end: false` keeps it indexing indefinitely. The logs
will look like:

```
 info Starting index to http://localhost:9200 with a payload size of 10000 using 5 workers to index 100 events per cycle
 info Installing index templates (fake_logs)
 info Installing components for fake_logs (fake_logs_8.0.0_base,fake_logs_8.0.0_event,fake_logs_8.0.0_log,fake_logs_8.0.0_host,fake_logs_8.0.0_metricset)
 info Installing index template (fake_logs)
 info Indexing "good" events from 2024-07-09T16:23:36.803Z to indefinitely
 info Delaying 100 by 300000ms
 info Waiting 60000ms
 info { took: 2418721239, latency: 541, indexed: 6000 } Indexing 6000 documents.
...
```
Then after `300s`, it will index another `100` documents every `60s`.
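
Those documents land in monthly indices. A small sketch of the index naming used by the bulk queue (see the `createQueue` diff hunk below); `INDEX_PREFIX`, the namespace value, and the date format are taken from the diff, while the helper function is made up for illustration:

```typescript
import moment from 'moment';

const INDEX_PREFIX = 'kbn-data-forge';
const namespace = 'fake_logs.fake_logs'; // `${config.indexing.dataset}.${doc.namespace}`

// Formatting with a literal "01" day collapses every document from the
// same month into a single index.
function indexNameFor(timestamp: string): string {
  return `${INDEX_PREFIX}-${namespace}-${moment(timestamp).format('YYYY-MM-01')}`;
}

// Both resolve to kbn-data-forge-fake_logs.fake_logs-2024-07-01:
console.log(indexNameFor('2024-07-09T16:23:36.803Z'));
console.log(indexNameFor('2024-07-20T10:00:00.000Z'));
```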
You can also inspect the delay per minute using the following ES|QL in
Discover:
```
FROM kbn-data-forge-fake_logs.fake_logs-*
| EVAL diff = DATE_DIFF("seconds", @timestamp, event.ingested)
| STATS delay = AVG(diff) BY timestamp = BUCKET(@timestamp, 1 minute)
```
This should give you a chart with the average delay hovering around the
configured `300` seconds, something like this:

<img width="1413" alt="image"
src="2f48cb85-a410-487e-8f3b-41311ff95186">


There should also be a 5-minute gap at the end in Discover:

<img width="1413" alt="image"
src="660acc87-6958-4ce9-a544-d66d56f805dd">

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>

Commit 2fac5e8462 (parent 9291a4f484) by Chris Cowan, committed 2024-07-15 10:49:34 -06:00. 32 files changed, 332 insertions, 17 deletions.

@@ -20,6 +20,18 @@
level: custom
type: float
description: "Percentage of CPU usage by system processes"
- name: load.1
level: custom
type: float
description: "Load 1m by system processes"
- name: memory.actual.used.pct
level: custom
type: float
description: "Percentage of actual memory by system processes"
- name: filesystem.used.pct
level: custom
type: float
description: "Percentage of filesytem used by system processes"
- name: network.name
type: keyword
level: custom


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -398,6 +398,21 @@
type: float
description: Percentage of CPU usage by user processes
default_field: false
- name: filesystem.used.pct
level: custom
type: float
description: Percentage of filesytem used by system processes
default_field: false
- name: load.1
level: custom
type: float
description: Load 1m by system processes
default_field: false
- name: memory.actual.used.pct
level: custom
type: float
description: Percentage of actual memory by system processes
default_field: false
- name: network.in.bytes
level: custom
type: long


@@ -46,6 +46,9 @@ ECS_Version,Indexed,Field_Set,Field,Type,Level,Normalization,Example,Description
8.0.0,true,system,system.cpu.system.pct,float,custom,,,Percentage of CPU usage by system processes
8.0.0,true,system,system.cpu.total.norm.pct,float,custom,,,Percentage of CPU usage
8.0.0,true,system,system.cpu.user.pct,float,custom,,,Percentage of CPU usage by user processes
8.0.0,true,system,system.filesystem.used.pct,float,custom,,,Percentage of filesytem used by system processes
8.0.0,true,system,system.load.1,float,custom,,,Load 1m by system processes
8.0.0,true,system,system.memory.actual.used.pct,float,custom,,,Percentage of actual memory by system processes
8.0.0,true,system,system.network.in.bytes,long,custom,,,Number of incoming bytes
8.0.0,true,system,system.network.name,keyword,custom,,,Name of the network interface
8.0.0,true,system,system.network.out.bytes,long,custom,,,Number of outgoing bytes



@@ -554,6 +554,33 @@ system.cpu.user.pct:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes


@@ -638,6 +638,33 @@ system:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes


@@ -554,6 +554,33 @@ system.cpu.user.pct:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes


@@ -638,6 +638,33 @@ system:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes


@@ -39,6 +39,39 @@
}
}
},
"filesystem": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
},
"load": {
"properties": {
"1": {
"type": "float"
}
}
},
"memory": {
"properties": {
"actual": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
}
}
},
"network": {
"properties": {
"in": {


@@ -41,6 +41,7 @@
"settings": {
"index": {
"codec": "best_compression",
"final_pipeline": "kbn-data-forge-add-event-ingested",
"mapping": {
"total_fields": {
"limit": 2000


@@ -264,6 +264,39 @@
}
}
},
"filesystem": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
},
"load": {
"properties": {
"1": {
"type": "float"
}
}
},
"memory": {
"properties": {
"actual": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
}
}
},
"network": {
"properties": {
"in": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -40,6 +40,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -43,6 +43,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -38,6 +38,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -39,6 +39,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -39,6 +39,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {


@@ -40,6 +40,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {


@@ -63,6 +63,7 @@ export function createConfig(partialConfig: PartialConfig = {}) {
reduceWeekendTrafficBy: DEFAULTS.REDUCE_WEEKEND_TRAFFIC_BY,
ephemeralProjectIds: DEFAULTS.EPHEMERAL_PROJECT_IDS,
alignEventsToInterval: DEFAULTS.ALIGN_EVENTS_TO_INTERVAL,
artificialIndexDelay: 0,
...(partialConfig.indexing ?? {}),
},
schedule: partialConfig.schedule ?? [schedule],


@@ -10,7 +10,13 @@ import moment from 'moment';
import { isNumber, random, range } from 'lodash';
import { ToolingLog } from '@kbn/tooling-log';
import { Client } from '@elastic/elasticsearch';
import { Config, EventsPerCycle, EventsPerCycleTransitionDefRT, ParsedSchedule } from '../types';
import {
Config,
Doc,
EventsPerCycle,
EventsPerCycleTransitionDefRT,
ParsedSchedule,
} from '../types';
import { generateEvents } from '../data_sources';
import { createQueue } from './queue';
import { wait } from './wait';
@@ -69,6 +75,7 @@ export async function createEvents(
const interval = schedule.interval ?? config.indexing.interval;
const calculateEventsPerCycle = createEventsPerCycleFn(schedule, eventsPerCycle, logger);
const totalEvents = calculateEventsPerCycle(currentTimestamp);
const endTs = end === false ? moment() : end;
if (totalEvents > 0) {
let epc = schedule.randomness
@@ -86,34 +93,34 @@
// When --align-events-to-interval is set, we will index all the events on the same
// timestamp. Otherwise they will be distributed across the interval randomly.
let events: Doc[];
const eventTimestamp = currentTimestamp
.clone()
.subtract(config.indexing.artificialIndexDelay + interval);
if (config.indexing.alignEventsToInterval) {
range(epc)
events = range(epc)
.map((i) => {
const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
return generateEvent(config, schedule, i, currentTimestamp);
return generateEvent(config, schedule, i, eventTimestamp);
})
.flat()
.forEach((event) => queue.push(event));
.flat();
} else {
range(epc)
events = range(epc)
.map(() =>
moment(random(currentTimestamp.valueOf(), currentTimestamp.valueOf() + interval - 1))
moment(random(eventTimestamp.valueOf(), eventTimestamp.valueOf() + interval - 1))
)
.sort()
.map((ts, i) => {
const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
return generateEvent(config, schedule, i, ts);
})
.flat()
.forEach((event) => queue.push(event));
.flat();
}
await queue.drain();
await queue.push(events);
} else {
logger.info({ took: 0, latency: 0, indexed: 0 }, 'Indexing 0 documents.');
}
const endTs = end === false ? moment() : end;
if (currentTimestamp.isBefore(endTs)) {
return createEvents(
config,


@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { Config } from '../types';
const eventIngestedCommonComponentTemplate = {
_meta: {
documentation: 'https://www.elastic.co/guide/en/ecs/current/ecs-event.html',
ecs_version: '8.0.0',
},
template: {
mappings: {
properties: {
event: {
properties: {
ingested: {
type: 'date',
},
},
},
},
},
},
};
export async function installDefaultComponentTemplate(
_config: Config,
client: Client,
logger: ToolingLog
) {
logger.info('Installing base component template: kbn-data-forge_base');
await client.cluster.putComponentTemplate({
name: `kbn-data-forge_base`,
...eventIngestedCommonComponentTemplate,
});
}


@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { Config } from '../types';
const processors = [
{
set: {
field: 'event.ingested',
value: '{{{_ingest.timestamp}}}',
},
},
];
export async function installDefaultIngestPipeline(
_config: Config,
client: Client,
logger: ToolingLog
) {
logger.info('Installing default ingest pipeline: kbn-data-forge-add-event-ingested');
return client.ingest.putPipeline({
id: 'kbn-data-forge-add-event-ingested',
processors,
version: 1,
});
}


@@ -7,6 +7,7 @@
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { isArray } from 'lodash';
import { indexTemplates } from '../data_sources';
import { Config } from '../types';
@@ -26,9 +27,14 @@ export async function installIndexTemplate(
await client.cluster.putComponentTemplate({ name: component.name, ...component.template });
}
logger.info(`Installing index template (${indexTemplateDef.namespace})`);
// Clone the template and add the base component name
const template = { ...indexTemplateDef.template };
if (isArray(template.composed_of)) {
template.composed_of.push('kbn-data-forge_base');
}
await client.indices.putIndexTemplate({
name: indexTemplateDef.namespace,
body: indexTemplateDef.template,
body: template,
});
}
}


@@ -26,7 +26,7 @@ export const createQueue = (config: Config, client: Client, logger: ToolingLog):
docs.forEach((doc) => {
const namespace = `${config.indexing.dataset}.${doc.namespace}`;
const indexName = `${INDEX_PREFIX}-${namespace}-${moment(doc['@timestamp']).format(
'YYYY-MM-DD'
'YYYY-MM-01'
)}`;
indices.add(indexName);
body.push({ create: { _index: indexName } });


@@ -13,8 +13,12 @@ import { installAssets } from './lib/install_assets';
import { indexSchedule } from './lib/index_schedule';
import { installIndexTemplate } from './lib/install_index_template';
import { indices } from './lib/indices';
import { installDefaultIngestPipeline } from './lib/install_default_ingest_pipeline';
import { installDefaultComponentTemplate } from './lib/install_default_component_template';
export async function run(config: Config, client: Client, logger: ToolingLog) {
await installDefaultComponentTemplate(config, client, logger);
await installDefaultIngestPipeline(config, client, logger);
await installIndexTemplate(config, client, logger);
if (config.elasticsearch.installKibanaUser) {
await setupKibanaSystemUser(config, client, logger);
@@ -23,6 +27,6 @@ export async function run(config: Config, client: Client, logger: ToolingLog) {
await indexSchedule(config, client, logger);
const indicesCreated = [...indices];
indices.clear();
await client.indices.refresh({ index: indicesCreated });
await client.indices.refresh({ index: indicesCreated, ignore_unavailable: true });
return indicesCreated;
}


@@ -120,6 +120,7 @@ export const ConfigRT = rt.type({
reduceWeekendTrafficBy: rt.number,
ephemeralProjectIds: rt.number,
alignEventsToInterval: rt.boolean,
artificialIndexDelay: rt.number,
}),
schedule: rt.array(ScheduleRT),
});


@@ -69,8 +69,12 @@ export async function waitForDocumentInIndex<T>({
}): Promise<SearchResponse<T, Record<string, AggregationsAggregate>>> {
return await retry<SearchResponse<T, Record<string, AggregationsAggregate>>>({
test: async () => {
const response = await esClient.search<T>({ index: indexName, rest_total_hits_as_int: true });
if (!response.hits.total || response.hits.total < docCountTarget) {
const response = await esClient.search<T>({
index: indexName,
rest_total_hits_as_int: true,
ignore_unavailable: true,
});
if (!response.hits.total || (response.hits.total as number) < docCountTarget) {
throw new Error(
`Number of hits does not match expectation (total: ${response.hits.total}, target: ${docCountTarget})`
);