mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Data-Forge] Optimizing the event generation code (#218688)
## Summary

This PR refactors the `createEvents` function to improve the readability of the execution flow, along with some optimizations. Part of the refactor removes the recursion to reduce the chances of "blowing the stack". I also added a back-off feature to the queue, so when it is saturated the process will wait progressively longer until things improve. I also changed how the continuous "wait" works: if indexing falls too far behind, it will index as fast as it can to catch up; conversely, it will wait longer if it gets too far ahead of the current time.

To test the back-off, run the following command. If you keep increasing `--events-per-cycle`, eventually it will saturate the queue and start backing off.

```
node x-pack/scripts/data_forge.js --dataset fake_hosts --lookback now-5m --events-per-cycle 10000 --index-interval 10000
```

This also includes an optimization for the `fake_hosts` dataset. Before this change, `randomBetween` would create an array of all the possible numbers and then randomly sample it 🤮 (lazy programming). The new implementation just calculates a random value between the min and max, mods that number by the `step` argument, then subtracts the remainder to bring the number down to the closest step. For example, `randomBetween(0, 10, 5)` will return either `0`, `5`, or `10`.

I also refactored how the network values are generated so the counters for `system.network.[in|out].bytes` match the `host.network.[ingress|egress].bytes` gauges; if you were to compare them in a Lens visualization, the values would match (as they should).

And last but not least... I added a new option, `--slash-logs`, that will index the data directly to `logs` without installing index templates, pipelines, or components. This is useful for testing the new Streams feature. Use [the commands from this Gist](https://gist.github.com/simianhacker/4116a6def82f1270722fef1366e3d422) to enable and set up Streams, then test with:

```
node x-pack/scripts/data_forge.js --dataset fake_stack --lookback now-5m --events-per-cycle 10 --index-interval 10000 --slash-logs
```
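To make the `randomBetween` change concrete, here is a minimal sketch of the new step-rounding behavior (the function body is taken from the diff below; the example calls are illustrative):

```ts
// Pick a random value in range, then snap it down to the nearest
// multiple of `step` by subtracting the remainder.
const randomBetween = (min = 0, max = 1, step = 1) => {
  const value = Math.random() * (max - min + 1) + min;
  return value - (value % step);
};

randomBetween(0, 10, 5); // always 0, 5, or 10
randomBetween(100, 1000, 1); // a whole number; no array allocation
```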
This commit is contained in:
parent 678b53a1a7
commit 54ab27581b

22 changed files with 332 additions and 178 deletions

```diff
@@ -37,4 +37,5 @@ export const DEFAULTS = {
   EPHEMERAL_PROJECT_IDS: 0,
   ALIGN_EVENTS_TO_INTERVAL: true,
   CARDINALITY: 1,
+  SLASH_LOGS: false,
 };
```

```diff
@@ -5,8 +5,6 @@
  * 2.0.
  */
 
-import { faker } from '@faker-js/faker';
-import { sample, range, memoize } from 'lodash';
 import { GeneratorFunction } from '../../types';
 import { replaceMetricsWithShapes } from '../../lib/replace_metrics_with_shapes';
 
```

```diff
@@ -14,18 +12,30 @@ export { indexTemplate } from './ecs';
 
 const createGroupIndex = (index: number) => Math.floor(index / 1000) * 1000;
 
-const randomBetween = (start = 0, end = 1, step = 0.1) => sample(range(start, end, step));
+const randomBetween = (min = 0, max = 1, step = 1) => {
+  const value = Math.random() * (max - min + 1) + min;
+  return value - (value % step);
+};
 
-let networkDataCount = 0;
-const generateNetworkData = memoize((_timestamp: string) => {
-  networkDataCount += Math.floor(10000 * Math.random());
-  return networkDataCount;
-});
+const networkDataCount: Record<string, number> = {};
+const generateNetworkData = (host: string, name: string, value: number) => {
+  const key = `${host}:${name}`;
+  if (networkDataCount[key] == null) {
+    networkDataCount[key] = 0;
+  }
+  if (networkDataCount[key] + value > Number.MAX_SAFE_INTEGER) {
+    networkDataCount[key] = 0;
+  }
+  networkDataCount[key] += value;
+  return networkDataCount[key];
+};
 
 export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
   const groupIndex = createGroupIndex(index);
   const interval = schedule.interval ?? config.indexing.interval;
   const scenario = config.indexing.scenario || 'fake_hosts';
+  const rxBytes = randomBetween(100, 1000, 1);
+  const txBytes = randomBetween(100, 1000, 1);
   const docs = [
     {
       namespace: 'fake_hosts',
```

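The per-host, per-interface counters above behave like real monotonic byte counters. A hypothetical call sequence against the `generateNetworkData` defined in this hunk (host and interface names are illustrative):

```ts
// Each `${host}:${name}` key accumulates independently, so the counter's
// per-interval delta equals the gauge value indexed for the same event.
generateNetworkData('host-0', 'eth0-rx', 500); // -> 500
generateNetworkData('host-0', 'eth0-rx', 250); // -> 750
generateNetworkData('host-1', 'eth0-rx', 100); // -> 100 (separate key)
```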
```diff
@@ -52,29 +62,29 @@ export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
           cores: 4,
           total: {
             norm: {
-              pct: randomBetween(),
+              pct: randomBetween(0, 1, 0.01),
             },
           },
           user: {
-            pct: randomBetween(1, 4),
+            pct: randomBetween(1, 4, 0.01),
           },
           system: {
-            pct: randomBetween(1, 4),
+            pct: randomBetween(1, 4, 0.01),
           },
         },
         load: {
-          1: randomBetween(1, 4),
+          1: randomBetween(1, 4, 0.01),
         },
         memory: {
           actual: {
             used: {
-              pct: randomBetween(1, 4),
+              pct: randomBetween(1, 4, 0.01),
             },
           },
         },
         filesystem: {
           used: {
-            pct: randomBetween(1, 4),
+            pct: randomBetween(1, 4, 0.01),
           },
         },
       },
```

```diff
@@ -95,10 +105,10 @@ export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
       network: {
         name: `network-${index}`,
         ingress: {
-          bytes: parseInt(faker.string.numeric(3), 10),
+          bytes: rxBytes,
         },
         egress: {
-          bytes: parseInt(faker.string.numeric(3), 10),
+          bytes: txBytes,
         },
       },
     },
```

```diff
@@ -115,15 +125,15 @@ export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
       network: {
         name: 'eth0',
         in: {
-          bytes: generateNetworkData(timestamp.toISOString()),
+          bytes: generateNetworkData(`host-${index}`, 'eth0-rx', rxBytes),
         },
         out: {
-          bytes: generateNetworkData(timestamp.toISOString()),
+          bytes: generateNetworkData(`host-${index}`, 'eth0-tx', txBytes),
         },
       },
       core: {
         system: {
-          ticks: randomBetween(1_000_000, 1_500_100),
+          ticks: randomBetween(1_000_000, 1_500_100, 1),
         },
       },
     },
```

```diff
@@ -143,6 +153,12 @@ export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
       mac: ['00-00-5E-00-53-23', '00-00-5E-00-53-24'],
       network: {
         name: `network-${index}`,
+        ingress: {
+          bytes: rxBytes,
+        },
+        egress: {
+          bytes: txBytes,
+        },
       },
     },
     event: {
```

```diff
@@ -158,15 +174,15 @@ export const generateEvent: GeneratorFunction = (config, schedule, index, timestamp) => {
       network: {
         name: 'eth1',
         in: {
-          bytes: generateNetworkData(timestamp.toISOString()),
+          bytes: generateNetworkData(`host-${index}`, 'eth1-rx', rxBytes),
         },
         out: {
-          bytes: generateNetworkData(timestamp.toISOString()),
+          bytes: generateNetworkData(`host-${index}`, 'eth1-tx', txBytes),
         },
       },
       core: {
         system: {
-          ticks: randomBetween(1_000_000, 1_500_100),
+          ticks: randomBetween(1_000_000, 1_500_100, 1),
         },
       },
     },
```

```diff
@@ -48,6 +48,7 @@ export function cliOptionsToPartialConfig(options: CliOptions) {
       reduceWeekendTrafficBy: options.reduceWeekendTrafficBy,
       ephemeralProjectIds: options.ephemeralProjectIds,
       alignEventsToInterval: options.alignEventsToInterval === true,
+      slashLogs: options.slashLogs,
     },
     schedule: [schedule],
   };
```

```diff
@@ -64,6 +64,7 @@ export function createConfig(partialConfig: PartialConfig = {}) {
       ephemeralProjectIds: DEFAULTS.EPHEMERAL_PROJECT_IDS,
       alignEventsToInterval: DEFAULTS.ALIGN_EVENTS_TO_INTERVAL,
       artificialIndexDelay: 0,
+      slashLogs: DEFAULTS.SLASH_LOGS,
       ...(partialConfig.indexing ?? {}),
     },
     schedule: partialConfig.schedule ?? [schedule],
```

```diff
@@ -1,149 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-import type { Moment } from 'moment';
-import moment from 'moment';
-import { isNumber, random, range } from 'lodash';
-import { ToolingLog } from '@kbn/tooling-log';
-import { Client } from '@elastic/elasticsearch';
-import {
-  Config,
-  Doc,
-  EventsPerCycle,
-  EventsPerCycleTransitionDefRT,
-  ParsedSchedule,
-} from '../types';
-import { generateEvents } from '../data_sources';
-import { createQueue } from './queue';
-import { wait } from './wait';
-import { isWeekendTraffic } from './is_weekend';
-import { createExponentialFunction, createLinearFunction, createSineFunction } from './data_shapes';
-
-function createEventsPerCycleFn(
-  schedule: ParsedSchedule,
-  eventsPerCycle: EventsPerCycle,
-  logger: ToolingLog
-): (timestamp: Moment) => number {
-  if (EventsPerCycleTransitionDefRT.is(eventsPerCycle) && isNumber(schedule.end)) {
-    const startPoint = { x: schedule.start, y: eventsPerCycle.start };
-    const endPoint = { x: schedule.end, y: eventsPerCycle.end };
-    if (eventsPerCycle.method === 'exp') {
-      return createExponentialFunction(startPoint, endPoint);
-    }
-    if (eventsPerCycle.method === 'sine') {
-      return createSineFunction(startPoint, endPoint, eventsPerCycle.options?.period ?? 60);
-    }
-    return createLinearFunction(startPoint, endPoint);
-  } else if (EventsPerCycleTransitionDefRT.is(eventsPerCycle) && schedule.end === false) {
-    logger.warning('EventsPerCycle must be a number if the end value of schedule is false.');
-  }
-
-  return (_timestamp: Moment) =>
-    EventsPerCycleTransitionDefRT.is(eventsPerCycle) ? eventsPerCycle.end : eventsPerCycle;
-}
-
-export async function createEvents(
-  config: Config,
-  client: Client,
-  schedule: ParsedSchedule,
-  end: Moment | false,
-  currentTimestamp: Moment,
-  logger: ToolingLog,
-  continueIndexing = false
-): Promise<void> {
-  const queue = createQueue(config, client, logger);
-
-  if (
-    !queue.paused &&
-    schedule.delayInMinutes &&
-    schedule.delayEveryMinutes &&
-    currentTimestamp.minute() % schedule.delayEveryMinutes === 0
-  ) {
-    logger.info('Pausing queue');
-    queue.pause();
-    setTimeout(() => {
-      logger.info('Resuming queue');
-      queue.resume();
-    }, schedule.delayInMinutes * 60 * 1000);
-  }
-
-  const eventsPerCycle = schedule.eventsPerCycle ?? config.indexing.eventsPerCycle;
-  const interval = schedule.interval ?? config.indexing.interval;
-  const calculateEventsPerCycle = createEventsPerCycleFn(schedule, eventsPerCycle, logger);
-  const totalEvents = calculateEventsPerCycle(currentTimestamp);
-  const endTs = end === false ? moment() : end;
-
-  if (totalEvents > 0) {
-    let epc = schedule.randomness
-      ? random(
-          Math.round(totalEvents - totalEvents * schedule.randomness),
-          Math.round(totalEvents + totalEvents * schedule.randomness)
-        )
-      : totalEvents;
-    if (config.indexing.reduceWeekendTrafficBy && isWeekendTraffic(currentTimestamp)) {
-      logger.info(
-        `Reducing traffic from ${epc} to ${epc * (1 - config.indexing.reduceWeekendTrafficBy)}`
-      );
-      epc = epc * (1 - config.indexing.reduceWeekendTrafficBy);
-    }
-
-    // When --align-events-to-interval is set, we will index all the events on the same
-    // timestamp. Otherwise they will be distributed across the interval randomly.
-    let events: Doc[];
-    const eventTimestamp = currentTimestamp
-      .clone()
-      .subtract(config.indexing.artificialIndexDelay + interval);
-    if (config.indexing.alignEventsToInterval) {
-      events = range(epc)
-        .map((i) => {
-          const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
-          return generateEvent(config, schedule, i, eventTimestamp);
-        })
-        .flat();
-    } else {
-      events = range(epc)
-        .map(() =>
-          moment(random(eventTimestamp.valueOf(), eventTimestamp.valueOf() + interval - 1))
-        )
-        .sort()
-        .map((ts, i) => {
-          const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
-          return generateEvent(config, schedule, i, ts);
-        })
-        .flat();
-    }
-    await queue.push(events);
-    await queue.drain();
-  } else {
-    logger.info({ took: 0, latency: 0, indexed: 0 }, 'Indexing 0 documents.');
-  }
-
-  if (currentTimestamp.isBefore(endTs)) {
-    return createEvents(
-      config,
-      client,
-      schedule,
-      end,
-      currentTimestamp.add(interval, 'ms'),
-      logger,
-      continueIndexing
-    );
-  }
-  if (currentTimestamp.isSameOrAfter(endTs) && continueIndexing) {
-    await wait(interval, logger);
-    return createEvents(
-      config,
-      client,
-      schedule,
-      end,
-      currentTimestamp.add(interval, 'ms'),
-      logger,
-      continueIndexing
-    );
-  }
-  logger.info(`Indexing complete for ${schedule.template} events.`);
-}
```

```diff
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import moment from 'moment';
+import { random } from 'lodash';
+import { ToolingLog } from '@kbn/tooling-log';
+import { Config, ParsedSchedule } from '../../../types';
+import { isWeekendTraffic } from '../../is_weekend';
+
+export function computeEventsPerCycle(
+  config: Config,
+  schedule: ParsedSchedule,
+  totalEvents: number,
+  now: number,
+  logger: ToolingLog
+) {
+  const numberOfEvents = schedule.randomness
+    ? random(
+        Math.round(totalEvents - totalEvents * schedule.randomness),
+        Math.round(totalEvents + totalEvents * schedule.randomness)
+      )
+    : totalEvents;
+
+  if (config.indexing.reduceWeekendTrafficBy && isWeekendTraffic(moment(now))) {
+    logger.info(
+      `Reducing traffic from ${numberOfEvents} to ${
+        numberOfEvents * (1 - config.indexing.reduceWeekendTrafficBy)
+      }`
+    );
+    return numberOfEvents * (1 - config.indexing.reduceWeekendTrafficBy);
+  }
+  return numberOfEvents;
+}
```

```diff
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { isNumber } from 'lodash';
+import { ToolingLog } from '@kbn/tooling-log';
+import { Config, EventsPerCycleTransitionDefRT, ParsedSchedule } from '../../../types';
+import {
+  createExponentialFunction,
+  createLinearFunction,
+  createSineFunction,
+} from '../../data_shapes';
+
+export function computeTotalEvents(
+  config: Config,
+  schedule: ParsedSchedule,
+  logger: ToolingLog,
+  startTimestamp: number
+): number {
+  const eventsPerCycle = schedule.eventsPerCycle ?? config.indexing.eventsPerCycle;
+  if (EventsPerCycleTransitionDefRT.is(eventsPerCycle) && isNumber(schedule.end)) {
+    const startPoint = { x: schedule.start, y: eventsPerCycle.start };
+    const endPoint = { x: schedule.end, y: eventsPerCycle.end };
+    if (eventsPerCycle.method === 'exp') {
+      return createExponentialFunction(startPoint, endPoint)(startTimestamp);
+    }
+    if (eventsPerCycle.method === 'sine') {
+      return createSineFunction(
+        startPoint,
+        endPoint,
+        eventsPerCycle.options?.period ?? 60
+      )(startTimestamp);
+    }
+    return createLinearFunction(startPoint, endPoint)(startTimestamp);
+  } else if (EventsPerCycleTransitionDefRT.is(eventsPerCycle) && schedule.end === false) {
+    logger.warning('EventsPerCycle must be a number if the end value of schedule is false.');
+  }
+
+  return EventsPerCycleTransitionDefRT.is(eventsPerCycle) ? eventsPerCycle.end : eventsPerCycle;
+}
```

```diff
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import moment from 'moment';
+import { QueueObject } from 'async';
+import { Config, Doc, ParsedSchedule } from '../../../types';
+import { generateEvents } from '../../../data_sources';
+
+export function generateAndQueueEvents(
+  config: Config,
+  schedule: ParsedSchedule,
+  queue: QueueObject<Doc>,
+  timestamps: number[]
+) {
+  const generateEvent = generateEvents[config.indexing.dataset] || generateEvents.fake_logs;
+  for (let i = 0; i < timestamps.length; i++) {
+    queue.push(generateEvent(config, schedule, i, moment(timestamps[i])));
+  }
+}
```

```diff
@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { random } from 'lodash';
+import { ToolingLog } from '@kbn/tooling-log';
+import { Config, ParsedSchedule } from '../../../types';
+import { computeEventsPerCycle } from './compute_events_per_cycle';
+import { getInterval } from './get_interval';
+
+export function generateTimestamps(
+  config: Config,
+  schedule: ParsedSchedule,
+  logger: ToolingLog,
+  totalEvents: number,
+  now: number
+) {
+  const interval = getInterval(config, schedule);
+  const epc = computeEventsPerCycle(config, schedule, totalEvents, now, logger);
+  const timestamps = [];
+  for (let i = 0; i < epc; i++) {
+    const eventTimestamp = now - (config.indexing.artificialIndexDelay + interval);
+    if (config.indexing.alignEventsToInterval) {
+      timestamps.push(eventTimestamp);
+    } else {
+      timestamps.push(random(eventTimestamp, eventTimestamp + interval - 1));
+    }
+  }
+  return timestamps;
+}
```

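`generateTimestamps` keeps the old semantics: with `--align-events-to-interval` every event in a cycle shares one timestamp, otherwise events are spread randomly across the interval. A minimal standalone sketch of the two strategies (function name is illustrative, and `artificialIndexDelay` is ignored here):

```ts
import { random } from 'lodash';

function sketchTimestamps(now: number, intervalMs: number, epc: number, aligned: boolean) {
  const eventTimestamp = now - intervalMs;
  return Array.from({ length: epc }, () =>
    // Aligned: all events land on the same timestamp.
    // Distributed: each event gets a random offset within the interval.
    aligned ? eventTimestamp : random(eventTimestamp, eventTimestamp + intervalMs - 1)
  );
}
```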
```diff
@@ -0,0 +1,11 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+import { Config, ParsedSchedule } from '../../../types';
+
+export function getInterval(config: Config, schedule: ParsedSchedule) {
+  return schedule.interval ?? config.indexing.interval;
+}
```

```diff
@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { QueueObject } from 'async';
+import { ToolingLog } from '@kbn/tooling-log';
+import { Doc, ParsedSchedule } from '../../../types';
+
+export function tryPausingQueue(
+  queue: QueueObject<Doc>,
+  logger: ToolingLog,
+  schedule: ParsedSchedule,
+  unixTs: number
+) {
+  const timestamp = new Date(unixTs);
+  if (
+    !queue.paused &&
+    schedule.delayInMinutes &&
+    schedule.delayEveryMinutes &&
+    timestamp.getMinutes() % schedule.delayEveryMinutes === 0
+  ) {
+    const delayInMinutes = schedule.delayInMinutes || 0;
+    logger.info('Pausing queue');
+    queue.pause();
+    setTimeout(() => {
+      logger.info('Resuming queue');
+      queue.resume();
+    }, delayInMinutes * 60 * 1000);
+  }
+}
```

```diff
@@ -0,0 +1,90 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { Moment } from 'moment';
+import { ToolingLog } from '@kbn/tooling-log';
+import { Client } from '@elastic/elasticsearch';
+import { Config, ParsedSchedule } from '../../types';
+import { createQueue } from '../queue';
+import { wait } from '../wait';
+import { computeTotalEvents } from './helpers/compute_total_events';
+import { generateTimestamps } from './helpers/generate_timestamps';
+import { getInterval } from './helpers/get_interval';
+import { generateAndQueueEvents } from './helpers/generate_and_queue_events';
+import { tryPausingQueue } from './helpers/try_pausing_queue';
+
+const INITIAL_BACK_OFF_INTERVAL = 1000;
+const MAX_BACK_OFF_INTERVAL = 60_000;
+
+export async function createEvents(
+  config: Config,
+  client: Client,
+  schedule: ParsedSchedule,
+  endTimestamp: Moment | false,
+  startTimestamp: Moment,
+  logger: ToolingLog,
+  continueIndexing = false
+): Promise<void> {
+  let canPush = true;
+  let backOff = INITIAL_BACK_OFF_INTERVAL;
+  const queue = createQueue(config, client, logger);
+  queue.saturated(() => {
+    canPush = false;
+  });
+  queue.unsaturated(() => {
+    canPush = true;
+    backOff = INITIAL_BACK_OFF_INTERVAL;
+  });
+
+  const interval = getInterval(config, schedule);
+  const endTs = endTimestamp === false ? Date.now() : endTimestamp.valueOf();
+  let now = startTimestamp.valueOf();
+
+  while (true) {
+    // This is a non-blocking pause of the queue. We will still generate events,
+    // but this simulates a network disruption similar to what we see when
+    // Filebeat loses network connectivity.
+    tryPausingQueue(queue, logger, schedule, now);
+
+    const totalEvents = computeTotalEvents(config, schedule, logger, now);
+
+    // If we have events, pre-generate the timestamps and queue the events.
+    if (totalEvents > 0 && canPush) {
+      const timestamps = generateTimestamps(config, schedule, logger, totalEvents, now);
+      generateAndQueueEvents(config, schedule, queue, timestamps);
+    } else if (!canPush) {
+      // If the queue is saturated, we need to back off.
+      logger.info(`Queue saturated, backing off for ${backOff}`);
+      await wait(backOff);
+      // Back off longer the next time.
+      backOff = Math.min(backOff + INITIAL_BACK_OFF_INTERVAL * 0.5, MAX_BACK_OFF_INTERVAL);
+      continue;
+    } else {
+      logger.info({ took: 0, latency: 0, indexed: 0 }, 'Indexing 0 documents.');
+    }
+
+    // Once we reach the end of the schedule, we need to check whether we should
+    // keep indexing or stop completely.
+    if (now >= endTs) {
+      // Stop indexing?
+      if (!continueIndexing) {
+        break;
+      }
+      // Figure out how far ahead or behind the current indexing is and adjust
+      // the wait before the next cycle fires. This should keep the indexing
+      // from falling behind; it will also catch up as fast as it can.
+      const behindBy = Date.now() - now;
+      await wait(Math.max(0, interval - behindBy));
+      now += interval;
+      continue;
+    }
+
+    // Advance the timestamp and keep indexing.
+    now += interval;
+  }
+  logger.info(`Indexing complete for ${schedule.template} events.`);
+}
```

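The loop above replaces the old recursive scheduling. A minimal sketch of its two pacing rules, extracted for clarity (these helper names are illustrative, not part of the PR):

```ts
const INITIAL_BACK_OFF_INTERVAL = 1000;
const MAX_BACK_OFF_INTERVAL = 60_000;

// Rule 1: while the queue is saturated, wait progressively longer,
// growing by 500ms per cycle and capping at one minute.
function nextBackOff(current: number): number {
  return Math.min(current + INITIAL_BACK_OFF_INTERVAL * 0.5, MAX_BACK_OFF_INTERVAL);
}

// Rule 2: in continuous mode, shrink the wait toward zero when the
// simulated clock lags wall-clock time, so indexing catches back up;
// when it runs ahead, it waits out the remainder of the interval.
function nextWait(simulatedNow: number, intervalMs: number): number {
  const behindBy = Date.now() - simulatedNow;
  return Math.max(0, intervalMs - behindBy);
}
```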
```diff
@@ -12,7 +12,7 @@ export function createExponentialFunction(start: Point, end: Point) {
   const totalPoints = end.x - start.x;
   const ratio = end.y / start.y;
   const exponent = Math.log(ratio) / (totalPoints - 1);
-  return (timestamp: Moment) => {
+  return (timestamp: Moment | number) => {
     const x = timestamp.valueOf() - start.x;
     return start.y * Math.exp(exponent * x);
   };
```

```diff
@@ -11,7 +11,7 @@ import { Point } from '../../types';
 export function createLinearFunction(start: Point, end: Point) {
   const slope = (end.y - start.y) / (end.x - start.x);
   const intercept = start.y - slope * start.x;
-  return (timestamp: Moment) => {
+  return (timestamp: Moment | number) => {
     return slope * timestamp.valueOf() + intercept;
   };
 }
```

```diff
@@ -17,7 +17,7 @@ export function createSineFunction(start: Point, end: Point, period = 60) {
   const midline = start.y;
   const amplitude = (end.y - start.y) / 2;
   const offset = caluclateOffset(amplitude, start.y);
-  return (timestamp: Moment) => {
+  return (timestamp: Moment | number) => {
     const x = (timestamp.valueOf() - start.x) / 1000;
     const y = midline + amplitude * Math.sin(((2 * Math.PI) / period) * x) + offset;
     return y;
```

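Widening the callback parameter to `Moment | number` in these three data-shape functions works because both types expose `valueOf()` returning epoch milliseconds (a number's `valueOf()` is the number itself), so the new timestamp-based `createEvents` loop can reuse them without wrapping every timestamp in moment. A tiny illustration (the helper name is hypothetical):

```ts
import moment, { Moment } from 'moment';

// Both branches resolve to epoch milliseconds via valueOf().
const toMillis = (timestamp: Moment | number) => timestamp.valueOf();

toMillis(moment('2025-01-01T00:00:00Z')); // 1735689600000
toMillis(1735689600000); // 1735689600000
```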
```diff
@@ -30,10 +30,13 @@ const eventIngestedCommonComponentTemplate = {
 };
 
 export async function installDefaultComponentTemplate(
-  _config: Config,
+  config: Config,
   client: Client,
   logger: ToolingLog
 ) {
+  if (config.indexing.slashLogs) {
+    return Promise.resolve();
+  }
   logger.info('Installing base component template: kbn-data-forge_base');
   await client.cluster.putComponentTemplate({
     name: `kbn-data-forge@mappings`,
```

```diff
@@ -16,6 +16,9 @@ export async function installIndexTemplate(
   client: Client,
   logger: ToolingLog
 ): Promise<void> {
+  if (config.indexing.slashLogs) {
+    return Promise.resolve();
+  }
   const { dataset } = config.indexing;
   const templates = indexTemplates[dataset];
   const templateNames = templates.map((templateDef) => templateDef.name).join(',');
```

```diff
@@ -94,6 +94,11 @@ export function parseCliOptions(): CliOptions {
       'The number of ephemeral projects to create. This is only enabled for the "fake_stack" dataset. It will create project IDs that will last 5 to 12 hours.',
       parseCliInt,
       DEFAULTS.EPHEMERAL_PROJECT_IDS
-    );
+    )
+    .option(
+      '--slash-logs',
+      'This will index everything through Streams slash logs endpoint',
+      DEFAULTS.SLASH_LOGS
+    );
 
   program.parse(process.argv);
```

```diff
@@ -18,6 +18,9 @@ type CargoQueue = ReturnType<typeof cargoQueue<Doc, Error>>;
 let queue: CargoQueue;
 
 function calculateIndexName(config: Config, doc: Doc) {
+  if (config.indexing.slashLogs) {
+    return 'logs';
+  }
   if (doc.data_stream?.dataset) {
     const { dataset } = doc.data_stream;
     const type = doc.data_stream.type ?? 'logs';
```

```diff
@@ -5,9 +5,7 @@
  * 2.0.
  */
 
-import { ToolingLog } from '@kbn/tooling-log';
-export async function wait(delay: number, logger: ToolingLog) {
-  logger.info(`Waiting ${delay}ms`);
+export async function wait(delay: number) {
   await new Promise((resolve) => {
     setTimeout(resolve, delay);
   });
```

```diff
@@ -127,6 +127,7 @@ export const ConfigRT = rt.type({
     ephemeralProjectIds: rt.number,
     alignEventsToInterval: rt.boolean,
     artificialIndexDelay: rt.number,
+    slashLogs: rt.boolean,
   }),
   schedule: rt.array(ScheduleRT),
 });
```

```diff
@@ -187,4 +188,5 @@ export interface CliOptions {
   ephemeralProjectIds: number;
   alignEventsToInterval: boolean;
   scheduleEnd?: string;
+  slashLogs: boolean;
 }
```

```diff
@@ -48,6 +48,7 @@ export class SLoDataService {
       ephemeralProjectIds: DEFAULTS.EPHEMERAL_PROJECT_IDS,
       alignEventsToInterval: DEFAULTS.ALIGN_EVENTS_TO_INTERVAL,
       scheduleEnd: 'now+10m',
+      slashLogs: false,
     }).then((res) => {
       // eslint-disable-next-line no-console
       console.log(res);
```