[eem] add default lookback (#189395)

Closes https://github.com/elastic/kibana/issues/187348

This changes adds an optional `history.settings.lookbackPeriod` property
that will default to `1h` if none is provided. The main point is to
prevent accidental processing of the entire dataset when creating a
definition.

I took the opportunity to do some refactoring:
- `durationSchema` was transforming a literal duration (eg `1h`) into a
`moment.Duration` with overriden `toJSON` property. since we don't use
any of the `moment` functionalities in consuming code the schema now
returns the raw string after regex validation
- split the `generateHistoryTransform` in `generateHistoryTransform` and
`generateBackfillHistoryTransform`
This commit is contained in:
Kevin Lacabane 2024-07-31 14:13:39 +02:00 committed by GitHub
parent 22de72d022
commit 1e23b6dc3d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 166 additions and 69 deletions

View file

@ -5,9 +5,7 @@
* 2.0.
*/
import { SafeParseSuccess } from 'zod';
import { durationSchema, metadataSchema, semVerSchema } from './common';
import moment from 'moment';
import { durationSchema, metadataSchema, semVerSchema, historySettingsSchema } from './common';
describe('schemas', () => {
describe('metadataSchema', () => {
@ -60,38 +58,46 @@ describe('schemas', () => {
expect(result).toMatchSnapshot();
});
});
describe('durationSchema', () => {
it('should work with 1m', () => {
const result = durationSchema.safeParse('1m');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('1m');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(60);
expect(result.data).toBe('1m');
});
it('should work with 10s', () => {
const result = durationSchema.safeParse('10s');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('10s');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(10);
expect(result.data).toBe('10s');
});
it('should work with 999h', () => {
const result = durationSchema.safeParse('999h');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('999h');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(999 * 60 * 60);
expect(result.data).toBe('999h');
});
it('should work with 90d', () => {
const result = durationSchema.safeParse('90d');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('90d');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(
90 * 24 * 60 * 60
);
expect(result.data).toBe('90d');
});
it('should not work with 1ms', () => {
const result = durationSchema.safeParse('1ms');
expect(result.success).toBeFalsy();
});
it('should not work with invalid values', () => {
let result = durationSchema.safeParse('PT1H');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('1H');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('1f');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('foo');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse(' 1h ');
expect(result.success).toBeFalsy();
});
});
describe('semVerSchema', () => {
it('should validate with 999.999.999', () => {
const result = semVerSchema.safeParse('999.999.999');
@ -103,4 +109,30 @@ describe('schemas', () => {
expect(result).toMatchSnapshot();
});
});
describe('historySettingsSchema', () => {
it('should return default values when not defined', () => {
let result = historySettingsSchema.safeParse(undefined);
expect(result.success).toBeTruthy();
expect(result.data).toEqual({ lookbackPeriod: '1h' });
result = historySettingsSchema.safeParse({ syncDelay: '1m' });
expect(result.success).toBeTruthy();
expect(result.data).toEqual({ syncDelay: '1m', lookbackPeriod: '1h' });
});
it('should return user defined values when defined', () => {
const result = historySettingsSchema.safeParse({
lookbackPeriod: '30m',
syncField: 'event.ingested',
syncDelay: '5m',
});
expect(result.success).toBeTruthy();
expect(result.data).toEqual({
lookbackPeriod: '30m',
syncField: 'event.ingested',
syncDelay: '5m',
});
});
});
});

View file

@ -44,20 +44,21 @@ export const docCountMetricSchema = z.object({
filter: filterSchema,
});
export const durationSchema = z
.string()
.regex(/^\d+[m|d|s|h]$/)
.transform((val: string) => {
const parts = val.match(/(\d+)([m|s|h|d])/);
if (parts === null) {
throw new Error('Unable to parse duration');
}
const value = parseInt(parts[1], 10);
const unit = parts[2] as 'm' | 's' | 'h' | 'd';
const duration = moment.duration(value, unit);
duration.toJSON = () => val;
return duration;
});
export const durationSchema = z.string().regex(/^\d+[m|d|s|h]$/);
export const durationSchemaWithMinimum = (minimumMinutes: number) =>
durationSchema.refine(
(val: string) => {
const parts = val.match(/(\d+)([m|s|h|d])/);
if (parts === null) {
throw new Error('Unable to parse duration');
}
const value = parseInt(parts[1], 10);
const unit = parts[2] as 'm' | 's' | 'h' | 'd';
return moment.duration(value, unit).asMinutes() >= minimumMinutes;
},
{ message: `can not be less than ${minimumMinutes}m` }
);
export const percentileMetricSchema = z.object({
name: metricNameSchema,
@ -131,3 +132,22 @@ export const semVerSchema = z.string().refine((maybeSemVer) => semVerRegex.test(
message:
'The string does use the Semantic Versioning (Semver) format of {major}.{minor}.{patch} (e.g., 1.0.0), ensure each part contains only digits.',
});
export const historySettingsSchema = z
.optional(
z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(durationSchema),
lookbackPeriod: z.optional(durationSchema).default('1h'),
frequency: z.optional(durationSchema),
backfillSyncDelay: z.optional(durationSchema),
backfillLookbackPeriod: z.optional(durationSchema),
backfillFrequency: z.optional(durationSchema),
})
)
.transform((settings) => {
return {
...settings,
lookbackPeriod: settings?.lookbackPeriod || durationSchema.parse('1h'),
};
});

View file

@ -14,6 +14,8 @@ import {
durationSchema,
identityFieldsSchema,
semVerSchema,
historySettingsSchema,
durationSchemaWithMinimum,
} from './common';
export const entityDefinitionSchema = z.object({
@ -32,27 +34,16 @@ export const entityDefinitionSchema = z.object({
managed: z.optional(z.boolean()).default(false),
history: z.object({
timestampField: z.string(),
interval: durationSchema.refine((val) => val.asMinutes() >= 1, {
message: 'The history.interval can not be less than 1m',
}),
settings: z.optional(
z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(z.string()),
frequency: z.optional(z.string()),
backfillSyncDelay: z.optional(z.string()),
backfillLookbackPeriod: z.optional(durationSchema),
backfillFrequency: z.optional(z.string()),
})
),
interval: durationSchemaWithMinimum(1),
settings: historySettingsSchema,
}),
latest: z.optional(
z.object({
settings: z.optional(
z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(z.string()),
frequency: z.optional(z.string()),
syncDelay: z.optional(durationSchema),
frequency: z.optional(durationSchema),
})
),
})

View file

@ -27,12 +27,12 @@ export const builtInServicesFromLogsEntityDefinition: EntityDefinition =
'This definition extracts service entities from common data streams by looking for the ECS field service.name',
type: 'service',
managed: true,
filter: '@timestamp >= now-10m',
indexPatterns: ['logs-*', 'filebeat*', 'metrics-apm.service_transaction.1m*'],
history: {
timestampField: '@timestamp',
interval: '1m',
settings: {
lookbackPeriod: '10m',
frequency: '2m',
syncDelay: '2m',
},

View file

@ -9,7 +9,10 @@ import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { EntityDefinition } from '@kbn/entities-schema';
import { retryTransientEsErrors } from './helpers/retry';
import { generateLatestTransform } from './transform/generate_latest_transform';
import { generateHistoryTransform } from './transform/generate_history_transform';
import {
generateBackfillHistoryTransform,
generateHistoryTransform,
} from './transform/generate_history_transform';
export async function createAndInstallHistoryTransform(
esClient: ElasticsearchClient,
@ -33,7 +36,7 @@ export async function createAndInstallHistoryBackfillTransform(
logger: Logger
) {
try {
const historyTransform = generateHistoryTransform(definition, true);
const historyTransform = generateBackfillHistoryTransform(definition);
await retryTransientEsErrors(() => esClient.transform.putTransform(historyTransform), {
logger,
});

View file

@ -8,5 +8,5 @@
import { EntityDefinition } from '@kbn/entities-schema';
export function isBackfillEnabled(definition: EntityDefinition) {
return definition.history.settings?.backfillSyncDelay != null;
return definition.history.settings.backfillSyncDelay != null;
}

View file

@ -270,6 +270,19 @@ Object {
"index": Array [
"kbn-data-forge-fake_stack.*",
],
"query": Object {
"bool": Object {
"filter": Array [
Object {
"range": Object {
"@timestamp": Object {
"gte": "now-1h",
},
},
},
],
},
},
},
"sync": Object {
"time": Object {

View file

@ -7,7 +7,10 @@
import { entityDefinition } from '../helpers/fixtures/entity_definition';
import { entityDefinitionWithBackfill } from '../helpers/fixtures/entity_definition_with_backfill';
import { generateHistoryTransform } from './generate_history_transform';
import {
generateBackfillHistoryTransform,
generateHistoryTransform,
} from './generate_history_transform';
describe('generateHistoryTransform(definition)', () => {
it('should generate a valid history transform', () => {
@ -15,7 +18,7 @@ describe('generateHistoryTransform(definition)', () => {
expect(transform).toMatchSnapshot();
});
it('should generate a valid history backfill transform', () => {
const transform = generateHistoryTransform(entityDefinitionWithBackfill, true);
const transform = generateBackfillHistoryTransform(entityDefinitionWithBackfill);
expect(transform).toMatchSnapshot();
});
});

View file

@ -26,12 +26,37 @@ import {
import { isBackfillEnabled } from '../helpers/is_backfill_enabled';
export function generateHistoryTransform(
definition: EntityDefinition,
backfill = false
definition: EntityDefinition
): TransformPutTransformRequest {
if (backfill && !isBackfillEnabled(definition)) {
const filter: QueryDslQueryContainer[] = [];
if (definition.filter) {
filter.push(getElasticsearchQueryOrThrow(definition.filter));
}
filter.push({
range: {
[definition.history.timestampField]: {
gte: `now-${definition.history.settings.lookbackPeriod}`,
},
},
});
return generateTransformPutRequest({
definition,
filter,
transformId: generateHistoryTransformId(definition),
frequency: definition.history.settings.frequency,
syncDelay: definition.history.settings.syncDelay,
});
}
export function generateBackfillHistoryTransform(
definition: EntityDefinition
): TransformPutTransformRequest {
if (!isBackfillEnabled(definition)) {
throw new Error(
'This function was called with backfill=true without history.settings.backfillSyncDelay'
'generateBackfillHistoryTransform called without history.settings.backfillSyncDelay set'
);
}
@ -41,28 +66,38 @@ export function generateHistoryTransform(
filter.push(getElasticsearchQueryOrThrow(definition.filter));
}
if (backfill && definition.history.settings?.backfillLookbackPeriod) {
if (definition.history.settings.backfillLookbackPeriod) {
filter.push({
range: {
[definition.history.timestampField]: {
gte: `now-${definition.history.settings?.backfillLookbackPeriod.toJSON()}`,
gte: `now-${definition.history.settings.backfillLookbackPeriod}`,
},
},
});
}
const syncDelay = backfill
? definition.history.settings?.backfillSyncDelay
: definition.history.settings?.syncDelay;
const transformId = backfill
? generateHistoryBackfillTransformId(definition)
: generateHistoryTransformId(definition);
const frequency = backfill
? definition.history.settings?.backfillFrequency
: definition.history.settings?.frequency;
return generateTransformPutRequest({
definition,
filter,
transformId: generateHistoryBackfillTransformId(definition),
frequency: definition.history.settings.backfillFrequency,
syncDelay: definition.history.settings.backfillSyncDelay,
});
}
const generateTransformPutRequest = ({
definition,
filter,
transformId,
frequency,
syncDelay,
}: {
definition: EntityDefinition;
transformId: string;
filter: QueryDslQueryContainer[];
frequency?: string;
syncDelay?: string;
}) => {
return {
transform_id: transformId,
_meta: {
@ -87,7 +122,7 @@ export function generateHistoryTransform(
frequency: frequency || ENTITY_DEFAULT_HISTORY_FREQUENCY,
sync: {
time: {
field: definition.history.settings?.syncField ?? definition.history.timestampField,
field: definition.history.settings.syncField || definition.history.timestampField,
delay: syncDelay || ENTITY_DEFAULT_HISTORY_SYNC_DELAY,
},
},
@ -109,7 +144,7 @@ export function generateHistoryTransform(
['@timestamp']: {
date_histogram: {
field: definition.history.timestampField,
fixed_interval: definition.history.interval.toJSON(),
fixed_interval: definition.history.interval,
},
},
},
@ -124,4 +159,4 @@ export function generateHistoryTransform(
},
},
};
}
};

View file

@ -38,7 +38,7 @@ export function generateLatestMetadataAggregations(definition: EntityDefinition)
filter: {
range: {
'event.ingested': {
gte: `now-${definition.history.interval.toJSON()}`,
gte: `now-${definition.history.interval}`,
},
},
},