[8.15] [Data Forge] Add artificial delay feature (#187901) (#188333)

# Backport

This will backport the following commits from `main` to `8.15`:
- [[Data Forge] Add artificial delay feature
(#187901)](https://github.com/elastic/kibana/pull/187901)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool documentation](https://github.com/sqren/backport).

## Summary

This PR adds a new setting, `indexing.artificialIndexDelay`, to the indexing configuration to control how much artificial delay to add to the timestamps. This PR also adds a "final" ingest pipeline to each data source, along with injecting a new base `component_template` which includes the `event.ingested` field.

The artificial delay is useful for testing transforms on data that has significant delays. It also allows us to test whether we miss data when syncing the transforms on `event.ingested` (see the sketch after this list).

- Installs a default ingest pipeline that adds `event.ingested` to each document
- Adds a `final_pipeline` to each installed index template
- Injects the base `component_template` into each index template at install time
- Adds an artificial delay for "current" events; historical events are ingested without delay
- Changes the index math to produce monthly indices
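For context on that last point: a transform that syncs on `event.ingested` rather than `@timestamp` can still pick up late-arriving documents. Below is a minimal sketch of such a transform over the forged data; it is not something this PR installs, and the transform id, destination index, and pivot are illustrative assumptions:

```typescript
import { Client } from '@elastic/elasticsearch';

// Hypothetical transform over the forged fake_logs data. The sync block is the
// relevant part: checkpoints advance on event.ingested, so documents indexed
// with an artificial delay still fall inside a checkpoint window.
async function createDelayTolerantTransform(client: Client) {
  await client.transform.putTransform({
    transform_id: 'fake-logs-hourly', // illustrative name
    source: { index: 'kbn-data-forge-fake_logs.fake_logs-*' },
    dest: { index: 'fake-logs-hourly-summary' }, // illustrative destination
    pivot: {
      group_by: {
        '@timestamp': { date_histogram: { field: '@timestamp', fixed_interval: '1h' } },
      },
      aggregations: { events: { value_count: { field: 'event.ingested' } } },
    },
    sync: { time: { field: 'event.ingested', delay: '120s' } },
    frequency: '1m',
  });
}
```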
event.ingested to each\r\ndocument\r\n- Adds final_pipeline to each
install_index_template\r\n- Inject base component_template to each
index_template at install time\r\n- Add artificial delay for \"current\"
events, historical events are\r\ningested without delay.\r\n- Change
index math to produce monthly indices\r\n\r\n### How to
test:\r\n\r\nCopy the following to
`fake_logs.delayed.yaml`:\r\n\r\n```YAML\r\n---\r\nelasticsearch:\r\n
installKibanaUser: false\r\n\r\nkibana:\r\n installAssets: true\r\n
host: \"http://localhost:5601/kibana\"\r\n\r\nindexing:\r\n dataset:
\"fake_logs\"\r\n eventsPerCycle: 100\r\n artificialIndexDelay:
300000\r\n\r\nschedule:\r\n - template: \"good\"\r\n start:
\"now-1h\"\r\n end: false\r\n eventsPerCycle: 100\r\n```\r\nThen run
`node x-pack/scripts/data_forge.js --config\r\nfake_logs.delayed.yaml`.
This should index an hour of data immediately,\r\nthen add a 300s delay
when indexing in \"real time\". The logs will
look\r\nlike:\r\n\r\n```\r\n info Starting index to
http://localhost:9200 with a payload size of 10000 using 5 workers to
index 100 events per cycle\r\n info Installing index templates
(fake_logs)\r\n info Installing components for fake_logs
(fake_logs_8.0.0_base,fake_logs_8.0.0_event,fake_logs_8.0.0_log,fake_logs_8.0.0_host,fake_logs_8.0.0_metricset)\r\n
info Installing index template (fake_logs)\r\n info Indexing \"good\"
events from 2024-07-09T16:23:36.803Z to indefinitely\r\n info Delaying
100 by 300000ms\r\n info Waiting 60000ms\r\n info { took: 2418721239,
latency: 541, indexed: 6000 } Indexing 6000
documents.\r\n...\r\n```\r\nThen after `300s`, it will index another
`100` documents every `60s`.\r\nYou can also inspect the delay per
minute using the following ES|QL in\r\nDiscover:\r\n```\r\nFROM
kbn-data-forge-fake_logs.fake_logs-* | eval diff=DATE_DIFF(\"seconds\",
@timestamp, event.ingested) | STATS delay=AVG(diff) by
timestamp=BUCKET(@timestamp, 1 minute)\r\n```\r\nThis should give you a
chart that looks something like this:\r\n\r\n<img width=\"1413\"
alt=\"image\"\r\nsrc=\"2f48cb85-a410-487e-8f3b-41311ff95186\">\r\n\r\n\r\nThere
should also be a 5 minute gap at the end in Discover:\r\n\r\n<img
width=\"1413\"
alt=\"image\"\r\nsrc=\"660acc87-6958-4ce9-a544-d66d56f805dd\">\r\n\r\n---------\r\n\r\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"2fac5e8462beb1da5223b7097dab2cfd9011e035"}},"sourceBranch":"main","suggestedTargetBranches":["8.15"],"targetPullRequestStates":[{"branch":"8.15","label":"v8.15.0","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"},{"branch":"main","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/187901","number":187901,"mergeCommit":{"message":"[Data
Forge] Add artificial delay feature (#187901)\n\n## Summary\r\n\r\nThis
PR adds a new setting, `indexing.artificialIndexDelay`, to
the\r\nindexing configuration to control how much artificial delay to
add to\r\nthe timestamps. This PR also adds a \"final\" ingest pipeline
to each data\r\nsource along with injecting a new base
`component_template` which\r\nincludes the `event.ingested`
field.\r\n\r\nThe artificial delay is useful for testing transforms on
data that has a\r\nsignificant delays. It also allows us to test if we
miss data when\r\nsyncing on the transforms using
`event.ingested`.\r\n\r\n- Installs default ingest pipeline to add
event.ingested to each\r\ndocument\r\n- Adds final_pipeline to each
install_index_template\r\n- Inject base component_template to each
index_template at install time\r\n- Add artificial delay for \"current\"
events, historical events are\r\ningested without delay.\r\n- Change
index math to produce monthly indices\r\n\r\n### How to
test:\r\n\r\nCopy the following to
`fake_logs.delayed.yaml`:\r\n\r\n```YAML\r\n---\r\nelasticsearch:\r\n
installKibanaUser: false\r\n\r\nkibana:\r\n installAssets: true\r\n
host: \"http://localhost:5601/kibana\"\r\n\r\nindexing:\r\n dataset:
\"fake_logs\"\r\n eventsPerCycle: 100\r\n artificialIndexDelay:
300000\r\n\r\nschedule:\r\n - template: \"good\"\r\n start:
\"now-1h\"\r\n end: false\r\n eventsPerCycle: 100\r\n```\r\nThen run
`node x-pack/scripts/data_forge.js --config\r\nfake_logs.delayed.yaml`.
This should index an hour of data immediately,\r\nthen add a 300s delay
when indexing in \"real time\". The logs will
look\r\nlike:\r\n\r\n```\r\n info Starting index to
http://localhost:9200 with a payload size of 10000 using 5 workers to
index 100 events per cycle\r\n info Installing index templates
(fake_logs)\r\n info Installing components for fake_logs
(fake_logs_8.0.0_base,fake_logs_8.0.0_event,fake_logs_8.0.0_log,fake_logs_8.0.0_host,fake_logs_8.0.0_metricset)\r\n
info Installing index template (fake_logs)\r\n info Indexing \"good\"
events from 2024-07-09T16:23:36.803Z to indefinitely\r\n info Delaying
100 by 300000ms\r\n info Waiting 60000ms\r\n info { took: 2418721239,
latency: 541, indexed: 6000 } Indexing 6000
documents.\r\n...\r\n```\r\nThen after `300s`, it will index another
`100` documents every `60s`.\r\nYou can also inspect the delay per
minute using the following ES|QL in\r\nDiscover:\r\n```\r\nFROM
kbn-data-forge-fake_logs.fake_logs-* | eval diff=DATE_DIFF(\"seconds\",
@timestamp, event.ingested) | STATS delay=AVG(diff) by
timestamp=BUCKET(@timestamp, 1 minute)\r\n```\r\nThis should give you a
chart that looks something like this:\r\n\r\n<img width=\"1413\"
alt=\"image\"\r\nsrc=\"2f48cb85-a410-487e-8f3b-41311ff95186\">\r\n\r\n\r\nThere
should also be a 5 minute gap at the end in Discover:\r\n\r\n<img
width=\"1413\"
alt=\"image\"\r\nsrc=\"660acc87-6958-4ce9-a544-d66d56f805dd\">\r\n\r\n---------\r\n\r\nCo-authored-by:
---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: Chris Cowan <chris@elastic.co>
Committed by Kibana Machine on 2024-07-15 20:44:31 +02:00 (via GitHub)
parent 1fadaaf8eb
commit 4a9ca6f8d7
32 changed files with 332 additions and 17 deletions

View file

@@ -20,6 +20,18 @@
level: custom
type: float
description: "Percentage of CPU usage by system processes"
- name: load.1
level: custom
type: float
description: "Load 1m by system processes"
- name: memory.actual.used.pct
level: custom
type: float
description: "Percentage of actual memory by system processes"
- name: filesystem.used.pct
level: custom
type: float
description: "Percentage of filesytem used by system processes"
- name: network.name
type: keyword
level: custom

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -398,6 +398,21 @@
type: float
description: Percentage of CPU usage by user processes
default_field: false
- name: filesystem.used.pct
level: custom
type: float
description: Percentage of filesytem used by system processes
default_field: false
- name: load.1
level: custom
type: float
description: Load 1m by system processes
default_field: false
- name: memory.actual.used.pct
level: custom
type: float
description: Percentage of actual memory by system processes
default_field: false
- name: network.in.bytes
level: custom
type: long

View file

@@ -46,6 +46,9 @@ ECS_Version,Indexed,Field_Set,Field,Type,Level,Normalization,Example,Description
8.0.0,true,system,system.cpu.system.pct,float,custom,,,Percentage of CPU usage by system processes
8.0.0,true,system,system.cpu.total.norm.pct,float,custom,,,Percentage of CPU usage
8.0.0,true,system,system.cpu.user.pct,float,custom,,,Percentage of CPU usage by user processes
8.0.0,true,system,system.filesystem.used.pct,float,custom,,,Percentage of filesytem used by system processes
8.0.0,true,system,system.load.1,float,custom,,,Load 1m by system processes
8.0.0,true,system,system.memory.actual.used.pct,float,custom,,,Percentage of actual memory by system processes
8.0.0,true,system,system.network.in.bytes,long,custom,,,Number of incoming bytes
8.0.0,true,system,system.network.name,keyword,custom,,,Name of the network interface
8.0.0,true,system,system.network.out.bytes,long,custom,,,Number of outgoing bytes


View file

@@ -554,6 +554,33 @@ system.cpu.user.pct:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes

View file

@@ -638,6 +638,33 @@ system:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes

View file

@@ -554,6 +554,33 @@ system.cpu.user.pct:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes

View file

@@ -638,6 +638,33 @@ system:
normalize: []
short: Percentage of CPU usage by user processes
type: float
system.filesystem.used.pct:
dashed_name: system-filesystem-used-pct
description: Percentage of filesytem used by system processes
flat_name: system.filesystem.used.pct
level: custom
name: filesystem.used.pct
normalize: []
short: Percentage of filesytem used by system processes
type: float
system.load.1:
dashed_name: system-load-1
description: Load 1m by system processes
flat_name: system.load.1
level: custom
name: load.1
normalize: []
short: Load 1m by system processes
type: float
system.memory.actual.used.pct:
dashed_name: system-memory-actual-used-pct
description: Percentage of actual memory by system processes
flat_name: system.memory.actual.used.pct
level: custom
name: memory.actual.used.pct
normalize: []
short: Percentage of actual memory by system processes
type: float
system.network.in.bytes:
dashed_name: system-network-in-bytes
description: Number of incoming bytes

View file

@@ -39,6 +39,39 @@
}
}
},
"filesystem": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
},
"load": {
"properties": {
"1": {
"type": "float"
}
}
},
"memory": {
"properties": {
"actual": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
}
}
},
"network": {
"properties": {
"in": {

View file

@@ -41,6 +41,7 @@
"settings": {
"index": {
"codec": "best_compression",
"final_pipeline": "kbn-data-forge-add-event-ingested",
"mapping": {
"total_fields": {
"limit": 2000

View file

@@ -264,6 +264,39 @@
}
}
},
"filesystem": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
},
"load": {
"properties": {
"1": {
"type": "float"
}
}
},
"memory": {
"properties": {
"actual": {
"properties": {
"used": {
"properties": {
"pct": {
"type": "float"
}
}
}
}
}
}
},
"network": {
"properties": {
"in": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -40,6 +40,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -43,6 +43,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -38,6 +38,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -39,6 +39,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -39,6 +39,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -4,6 +4,7 @@
"template": {
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec" : "best_compression",
"mapping": {
"total_fields": {

View file

@@ -40,6 +40,7 @@
},
"settings": {
"index": {
"final_pipeline": "kbn-data-forge-add-event-ingested",
"codec": "best_compression",
"mapping": {
"total_fields": {

View file

@@ -63,6 +63,7 @@ export function createConfig(partialConfig: PartialConfig = {}) {
reduceWeekendTrafficBy: DEFAULTS.REDUCE_WEEKEND_TRAFFIC_BY,
ephemeralProjectIds: DEFAULTS.EPHEMERAL_PROJECT_IDS,
alignEventsToInterval: DEFAULTS.ALIGN_EVENTS_TO_INTERVAL,
artificialIndexDelay: 0,
...(partialConfig.indexing ?? {}),
},
schedule: partialConfig.schedule ?? [schedule],

View file

@@ -10,7 +10,13 @@ import moment from 'moment';
 import { isNumber, random, range } from 'lodash';
 import { ToolingLog } from '@kbn/tooling-log';
 import { Client } from '@elastic/elasticsearch';
-import { Config, EventsPerCycle, EventsPerCycleTransitionDefRT, ParsedSchedule } from '../types';
+import {
+  Config,
+  Doc,
+  EventsPerCycle,
+  EventsPerCycleTransitionDefRT,
+  ParsedSchedule,
+} from '../types';
 import { generateEvents } from '../data_sources';
 import { createQueue } from './queue';
 import { wait } from './wait';
@@ -69,6 +75,7 @@ export async function createEvents(
   const interval = schedule.interval ?? config.indexing.interval;
   const calculateEventsPerCycle = createEventsPerCycleFn(schedule, eventsPerCycle, logger);
   const totalEvents = calculateEventsPerCycle(currentTimestamp);
+  const endTs = end === false ? moment() : end;
   if (totalEvents > 0) {
     let epc = schedule.randomness
@@ -86,34 +93,34 @@
     // When --align-events-to-interval is set, we will index all the events on the same
     // timestamp. Otherwise they will be distributed across the interval randomly.
+    let events: Doc[];
+    const eventTimestamp = currentTimestamp
+      .clone()
+      .subtract(config.indexing.artificialIndexDelay + interval);
     if (config.indexing.alignEventsToInterval) {
-      range(epc)
+      events = range(epc)
         .map((i) => {
           const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
-          return generateEvent(config, schedule, i, currentTimestamp);
+          return generateEvent(config, schedule, i, eventTimestamp);
         })
-        .flat()
-        .forEach((event) => queue.push(event));
+        .flat();
     } else {
-      range(epc)
+      events = range(epc)
         .map(() =>
-          moment(random(currentTimestamp.valueOf(), currentTimestamp.valueOf() + interval - 1))
+          moment(random(eventTimestamp.valueOf(), eventTimestamp.valueOf() + interval - 1))
         )
         .sort()
         .map((ts, i) => {
          const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
          return generateEvent(config, schedule, i, ts);
        })
-        .flat()
-        .forEach((event) => queue.push(event));
+        .flat();
     }
-    await queue.drain();
+    await queue.push(events);
   } else {
     logger.info({ took: 0, latency: 0, indexed: 0 }, 'Indexing 0 documents.');
   }
-  const endTs = end === false ? moment() : end;
   if (currentTimestamp.isBefore(endTs)) {
     return createEvents(
       config,
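To make the timestamp shift concrete, here is a small sketch of the arithmetic with the values from the test config above (300000 ms delay; the 60000 ms interval is an assumption matching the `Waiting 60000ms` log line):

```typescript
import moment from 'moment';

// Sketch of the eventTimestamp computation from the diff above.
const artificialIndexDelay = 300000; // indexing.artificialIndexDelay, in ms
const interval = 60000; // assumed indexing interval, in ms

const currentTimestamp = moment('2024-07-09T17:23:36.803Z');
// "Current" events are stamped delay + interval in the past:
const eventTimestamp = currentTimestamp.clone().subtract(artificialIndexDelay + interval);

console.log(eventTimestamp.toISOString()); // 2024-07-09T17:17:36.803Z, six minutes earlier
```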

View file

@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { Config } from '../types';
const eventIngestedCommonComponentTemplate = {
_meta: {
documentation: 'https://www.elastic.co/guide/en/ecs/current/ecs-event.html',
ecs_version: '8.0.0',
},
template: {
mappings: {
properties: {
event: {
properties: {
ingested: {
type: 'date',
},
},
},
},
},
},
};
export async function installDefaultComponentTemplate(
_config: Config,
client: Client,
logger: ToolingLog
) {
logger.info('Installing base component template: kbn-data-forge_base');
await client.cluster.putComponentTemplate({
name: `kbn-data-forge_base`,
...eventIngestedCommonComponentTemplate,
});
}
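A quick way to confirm the base component template landed is to read it back; a sketch, not part of the PR:

```typescript
import { Client } from '@elastic/elasticsearch';

// Sketch: fetch the base component template and inspect the event.ingested mapping.
async function verifyBaseComponentTemplate(client: Client) {
  const response = await client.cluster.getComponentTemplate({ name: 'kbn-data-forge_base' });
  const mappings = response.component_templates[0]?.component_template.template?.mappings;
  console.log(JSON.stringify(mappings, null, 2)); // expect event.ingested mapped as date
}
```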

View file

@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { Config } from '../types';
const processors = [
{
set: {
field: 'event.ingested',
value: '{{{_ingest.timestamp}}}',
},
},
];
export async function installDefaultIngestPipeline(
_config: Config,
client: Client,
logger: ToolingLog
) {
logger.info('Installing default ingest pipeline: kbn-data-forge-add-event-ingested');
return client.ingest.putPipeline({
id: 'kbn-data-forge-add-event-ingested',
processors,
version: 1,
});
}
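The pipeline can be dry-run with the ingest Simulate API to confirm it stamps `event.ingested`; a sketch with a made-up sample document:

```typescript
import { Client } from '@elastic/elasticsearch';

// Sketch: simulate the pipeline against a sample document without indexing anything.
async function simulateAddEventIngested(client: Client) {
  const result = await client.ingest.simulate({
    id: 'kbn-data-forge-add-event-ingested',
    docs: [{ _source: { '@timestamp': '2024-07-09T16:23:36.803Z', message: 'a fake log line' } }],
  });
  // The simulated document should come back with event.ingested set to the ingest timestamp.
  console.log(result.docs[0]);
}
```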

View file

@@ -7,6 +7,7 @@
 import { Client } from '@elastic/elasticsearch';
 import { ToolingLog } from '@kbn/tooling-log';
+import { isArray } from 'lodash';
 import { indexTemplates } from '../data_sources';
 import { Config } from '../types';
@@ -26,9 +27,14 @@ export async function installIndexTemplate(
       await client.cluster.putComponentTemplate({ name: component.name, ...component.template });
     }
     logger.info(`Installing index template (${indexTemplateDef.namespace})`);
+    // Clone the template and add the base component name
+    const template = { ...indexTemplateDef.template };
+    if (isArray(template.composed_of)) {
+      template.composed_of.push('kbn-data-forge_base');
+    }
     await client.indices.putIndexTemplate({
       name: indexTemplateDef.namespace,
-      body: indexTemplateDef.template,
+      body: template,
     });
   }
 }

View file

@@ -26,7 +26,7 @@ export const createQueue = (config: Config, client: Client, logger: ToolingLog):
     docs.forEach((doc) => {
       const namespace = `${config.indexing.dataset}.${doc.namespace}`;
       const indexName = `${INDEX_PREFIX}-${namespace}-${moment(doc['@timestamp']).format(
-        'YYYY-MM-DD'
+        'YYYY-MM-01'
       )}`;
       indices.add(indexName);
       body.push({ create: { _index: indexName } });
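The `'01'` in the new format string is a literal (digits are not moment format tokens), so every timestamp in a month resolves to the same index name. A small sketch with an illustrative namespace:

```typescript
import moment from 'moment';

// Sketch: 'YYYY-MM-01' emits the '01' literally, collapsing daily indices into monthly ones.
const timestamp = '2024-07-09T16:23:36.803Z';
const indexName = `kbn-data-forge-fake_logs.fake_logs-${moment(timestamp).format('YYYY-MM-01')}`;
console.log(indexName); // kbn-data-forge-fake_logs.fake_logs-2024-07-01
```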

View file

@@ -13,8 +13,12 @@ import { installAssets } from './lib/install_assets';
 import { indexSchedule } from './lib/index_schedule';
 import { installIndexTemplate } from './lib/install_index_template';
 import { indices } from './lib/indices';
+import { installDefaultIngestPipeline } from './lib/install_default_ingest_pipeline';
+import { installDefaultComponentTemplate } from './lib/install_default_component_template';
 
 export async function run(config: Config, client: Client, logger: ToolingLog) {
+  await installDefaultComponentTemplate(config, client, logger);
+  await installDefaultIngestPipeline(config, client, logger);
   await installIndexTemplate(config, client, logger);
   if (config.elasticsearch.installKibanaUser) {
     await setupKibanaSystemUser(config, client, logger);
@@ -23,6 +27,6 @@ export async function run(config: Config, client: Client, logger: ToolingLog) {
   await indexSchedule(config, client, logger);
   const indicesCreated = [...indices];
   indices.clear();
-  await client.indices.refresh({ index: indicesCreated });
+  await client.indices.refresh({ index: indicesCreated, ignore_unavailable: true });
   return indicesCreated;
 }

View file

@@ -120,6 +120,7 @@ export const ConfigRT = rt.type({
reduceWeekendTrafficBy: rt.number,
ephemeralProjectIds: rt.number,
alignEventsToInterval: rt.boolean,
artificialIndexDelay: rt.number,
}),
schedule: rt.array(ScheduleRT),
});

View file

@@ -69,8 +69,12 @@ export async function waitForDocumentInIndex<T>({
 }): Promise<SearchResponse<T, Record<string, AggregationsAggregate>>> {
   return await retry<SearchResponse<T, Record<string, AggregationsAggregate>>>({
     test: async () => {
-      const response = await esClient.search<T>({ index: indexName, rest_total_hits_as_int: true });
-      if (!response.hits.total || response.hits.total < docCountTarget) {
+      const response = await esClient.search<T>({
+        index: indexName,
+        rest_total_hits_as_int: true,
+        ignore_unavailable: true,
+      });
+      if (!response.hits.total || (response.hits.total as number) < docCountTarget) {
         throw new Error(
           `Number of hits does not match expectation (total: ${response.hits.total}, target: ${docCountTarget})`
         );