mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[EEM] Add option to enable backfill transform (#188379)
## Summary This PR adds 2 new optional settings for the history section of the entity definition to enable a backfill transform: - `history.settings.backfillSyncDelay` – A duration format, i.e. `15m`, that enables the backfill transform and sets the sync delay to whatever duration the user has configured - `history.settings.backfilLookbackPeriod` – Controls how far back the transform will start processing documents. The idea behind this transform is that it will run with a longer delay than the default transform. If there are events that show up after the default transform's checkpoint has moved on, the backfill transform will make a second sweep to backfill any data the default transform had missed. ### Testing Save the following config to `fake_logs.delayed.yaml` ```YAML --- elasticsearch: installKibanaUser: false kibana: installAssets: true host: "http://localhost:5601/kibana" indexing: dataset: "fake_logs" eventsPerCycle: 100 artificialIndexDelay: 300000 schedule: - template: "good" start: "now-1d" end: false eventsPerCycle: 100 ``` run `node x-pack/scripts/data_forge.js --config fake_logs.delayed.yaml` then run the following in Kibana's "Dev Tools": ```JSON POST kbn:/internal/api/entities/definition { "id": "fake-logs-services-no-backfill", "name": "Services for Fake Logs", "type": "service", "version": "0.0.1", "indexPatterns": ["kbn-data-forge-fake_logs.*"], "history": { "timestampField": "@timestamp", "interval": "1m" }, "identityFields": ["labels.groupId", "labels.eventId"], "displayNameTemplate": "{{labels.groupId}}:{{labels.eventId}}", "metadata": [ "host.name" ], "metrics": [ { "name": "latency", "equation": "A", "metrics": [ { "name": "A", "aggregation": "avg", "field": "event.duration" } ] }, { "name": "logRate", "equation": "A", "metrics": [ { "name": "A", "aggregation": "doc_count", "filter": "log.level: *" } ] }, { "name": "errorRate", "equation": "A", "metrics": [ { "name": "A", "aggregation": "doc_count", "filter": "log.level: \"error\"" } ] } ] } POST kbn:/internal/api/entities/definition { "id": "fake-logs-services-with-backfill", "name": "Services for Fake Logs", "type": "service", "version": "0.0.1", "indexPatterns": ["kbn-data-forge-fake_logs.*"], "history": { "timestampField": "@timestamp", "interval": "1m", "settings": { "backfillSyncDelay": "10m", "backfillLookback": "24h" } }, "identityFields": ["labels.groupId", "labels.eventId"], "displayNameTemplate": "{{labels.groupId}}:{{labels.eventId}}", "metadata": [ "host.name" ], "metrics": [ { "name": "latency", "equation": "A", "metrics": [ { "name": "A", "aggregation": "avg", "field": "event.duration" } ] }, { "name": "logRate", "equation": "A", "metrics": [ { "name": "A", "aggregation": "doc_count", "filter": "log.level: *" } ] }, { "name": "errorRate", "equation": "A", "metrics": [ { "name": "A", "aggregation": "doc_count", "filter": "log.level: \"error\"" } ] } ] } ``` The first transform should end up giving you history every 5 minutes, the second will backfill and give you history every minute up until ~10 minutes. If you where to create a dashboard with the document counts for the last hour, it would look like this: 
This commit is contained in:
parent
f918fdc4da
commit
6e09aef3be
19 changed files with 615 additions and 10 deletions
132
x-pack/packages/kbn-entities-schema/src/schema/__snapshots__/common.test.ts.snap
generated
Normal file
132
x-pack/packages/kbn-entities-schema/src/schema/__snapshots__/common.test.ts.snap
generated
Normal file
|
@ -0,0 +1,132 @@
|
|||
// Jest Snapshot v1, https://goo.gl/fbAQLP
|
||||
|
||||
exports[`schemas metadataSchema should error on empty string 1`] = `
|
||||
Object {
|
||||
"error": [ZodError: [
|
||||
{
|
||||
"path": [
|
||||
"source"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "source should not be empty"
|
||||
},
|
||||
{
|
||||
"path": [
|
||||
"destination"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "destination should not be empty"
|
||||
}
|
||||
]],
|
||||
"success": false,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should error on empty string for destination 1`] = `
|
||||
Object {
|
||||
"error": [ZodError: [
|
||||
{
|
||||
"path": [
|
||||
"destination"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "destination should not be empty"
|
||||
}
|
||||
]],
|
||||
"success": false,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should error on empty string for source 1`] = `
|
||||
Object {
|
||||
"error": [ZodError: [
|
||||
{
|
||||
"path": [
|
||||
"source"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "source should not be empty"
|
||||
},
|
||||
{
|
||||
"path": [
|
||||
"destination"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "destination should not be empty"
|
||||
}
|
||||
]],
|
||||
"success": false,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should error when limit is too low 1`] = `
|
||||
Object {
|
||||
"error": [ZodError: [
|
||||
{
|
||||
"path": [
|
||||
"limit"
|
||||
],
|
||||
"code": "custom",
|
||||
"message": "limit should be greater than 1"
|
||||
}
|
||||
]],
|
||||
"success": false,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should parse successfully with a source and desitination 1`] = `
|
||||
Object {
|
||||
"data": Object {
|
||||
"destination": "hostName",
|
||||
"limit": 1000,
|
||||
"source": "host.name",
|
||||
},
|
||||
"success": true,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should parse successfully with an valid string 1`] = `
|
||||
Object {
|
||||
"data": Object {
|
||||
"destination": "host.name",
|
||||
"limit": 1000,
|
||||
"source": "host.name",
|
||||
},
|
||||
"success": true,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should parse successfully with just a source 1`] = `
|
||||
Object {
|
||||
"data": Object {
|
||||
"destination": "host.name",
|
||||
"limit": 1000,
|
||||
"source": "host.name",
|
||||
},
|
||||
"success": true,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas metadataSchema should parse successfully with valid object 1`] = `
|
||||
Object {
|
||||
"data": Object {
|
||||
"destination": "hostName",
|
||||
"limit": 1000,
|
||||
"source": "host.name",
|
||||
},
|
||||
"success": true,
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`schemas semVerSchema should not validate with 0.9 1`] = `
|
||||
Object {
|
||||
"error": [ZodError: [
|
||||
{
|
||||
"code": "custom",
|
||||
"message": "The string does use the Semantic Versioning (Semver) format of {major}.{minor}.{patch} (e.g., 1.0.0), ensure each part contains only digits.",
|
||||
"path": []
|
||||
}
|
||||
]],
|
||||
"success": false,
|
||||
}
|
||||
`;
|
106
x-pack/packages/kbn-entities-schema/src/schema/common.test.ts
Normal file
106
x-pack/packages/kbn-entities-schema/src/schema/common.test.ts
Normal file
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SafeParseSuccess } from 'zod';
|
||||
import { durationSchema, metadataSchema, semVerSchema } from './common';
|
||||
import moment from 'moment';
|
||||
|
||||
describe('schemas', () => {
|
||||
describe('metadataSchema', () => {
|
||||
it('should error on empty string', () => {
|
||||
const result = metadataSchema.safeParse('');
|
||||
expect(result.success).toBeFalsy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should error on empty string for source', () => {
|
||||
const result = metadataSchema.safeParse({ source: '' });
|
||||
expect(result.success).toBeFalsy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should error on empty string for destination', () => {
|
||||
const result = metadataSchema.safeParse({ source: 'host.name', destination: '', limit: 10 });
|
||||
expect(result.success).toBeFalsy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should error when limit is too low', () => {
|
||||
const result = metadataSchema.safeParse({
|
||||
source: 'host.name',
|
||||
destination: 'host.name',
|
||||
limit: 0,
|
||||
});
|
||||
expect(result.success).toBeFalsy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should parse successfully with an valid string', () => {
|
||||
const result = metadataSchema.safeParse('host.name');
|
||||
expect(result.success).toBeTruthy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should parse successfully with just a source', () => {
|
||||
const result = metadataSchema.safeParse({ source: 'host.name' });
|
||||
expect(result.success).toBeTruthy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should parse successfully with a source and desitination', () => {
|
||||
const result = metadataSchema.safeParse({ source: 'host.name', destination: 'hostName' });
|
||||
expect(result.success).toBeTruthy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
it('should parse successfully with valid object', () => {
|
||||
const result = metadataSchema.safeParse({
|
||||
source: 'host.name',
|
||||
destination: 'hostName',
|
||||
size: 1,
|
||||
});
|
||||
expect(result.success).toBeTruthy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
});
|
||||
describe('durationSchema', () => {
|
||||
it('should work with 1m', () => {
|
||||
const result = durationSchema.safeParse('1m');
|
||||
expect(result.success).toBeTruthy();
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('1m');
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(60);
|
||||
});
|
||||
it('should work with 10s', () => {
|
||||
const result = durationSchema.safeParse('10s');
|
||||
expect(result.success).toBeTruthy();
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('10s');
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(10);
|
||||
});
|
||||
it('should work with 999h', () => {
|
||||
const result = durationSchema.safeParse('999h');
|
||||
expect(result.success).toBeTruthy();
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('999h');
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(999 * 60 * 60);
|
||||
});
|
||||
it('should work with 90d', () => {
|
||||
const result = durationSchema.safeParse('90d');
|
||||
expect(result.success).toBeTruthy();
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('90d');
|
||||
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(
|
||||
90 * 24 * 60 * 60
|
||||
);
|
||||
});
|
||||
it('should not work with 1ms', () => {
|
||||
const result = durationSchema.safeParse('1ms');
|
||||
expect(result.success).toBeFalsy();
|
||||
});
|
||||
});
|
||||
describe('semVerSchema', () => {
|
||||
it('should validate with 999.999.999', () => {
|
||||
const result = semVerSchema.safeParse('999.999.999');
|
||||
expect(result.success).toBeTruthy();
|
||||
});
|
||||
it('should not validate with 0.9', () => {
|
||||
const result = semVerSchema.safeParse('0.9');
|
||||
expect(result.success).toBeFalsy();
|
||||
expect(result).toMatchSnapshot();
|
||||
});
|
||||
});
|
||||
});
|
|
@ -45,7 +45,7 @@ export const docCountMetricSchema = z.object({
|
|||
|
||||
export const durationSchema = z
|
||||
.string()
|
||||
.regex(/\d+[m|d|s|h]/)
|
||||
.regex(/^\d+[m|d|s|h]$/)
|
||||
.transform((val: string) => {
|
||||
const parts = val.match(/(\d+)([m|s|h|d])/);
|
||||
if (parts === null) {
|
||||
|
@ -93,7 +93,30 @@ export const metadataSchema = z
|
|||
destination: metadata.destination ?? metadata.source,
|
||||
limit: metadata.limit ?? 1000,
|
||||
}))
|
||||
.or(z.string().transform((value) => ({ source: value, destination: value, limit: 1000 })));
|
||||
.or(z.string().transform((value) => ({ source: value, destination: value, limit: 1000 })))
|
||||
.superRefine((value, ctx) => {
|
||||
if (value.limit < 1) {
|
||||
ctx.addIssue({
|
||||
path: ['limit'],
|
||||
code: z.ZodIssueCode.custom,
|
||||
message: 'limit should be greater than 1',
|
||||
});
|
||||
}
|
||||
if (value.source.length === 0) {
|
||||
ctx.addIssue({
|
||||
path: ['source'],
|
||||
code: z.ZodIssueCode.custom,
|
||||
message: 'source should not be empty',
|
||||
});
|
||||
}
|
||||
if (value.destination.length === 0) {
|
||||
ctx.addIssue({
|
||||
path: ['destination'],
|
||||
code: z.ZodIssueCode.custom,
|
||||
message: 'destination should not be empty',
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
export const identityFieldsSchema = z
|
||||
.object({
|
||||
|
|
|
@ -40,6 +40,9 @@ export const entityDefinitionSchema = z.object({
|
|||
syncField: z.optional(z.string()),
|
||||
syncDelay: z.optional(z.string()),
|
||||
frequency: z.optional(z.string()),
|
||||
backfillSyncDelay: z.optional(z.string()),
|
||||
backfillLookbackPeriod: z.optional(durationSchema),
|
||||
backfillFrequency: z.optional(z.string()),
|
||||
})
|
||||
),
|
||||
}),
|
||||
|
|
|
@ -22,6 +22,8 @@ export const ENTITY_HISTORY_BASE_COMPONENT_TEMPLATE_V1 =
|
|||
`${ENTITY_BASE_PREFIX}_${ENTITY_SCHEMA_VERSION_V1}_${ENTITY_HISTORY}_base` as const;
|
||||
export const ENTITY_HISTORY_PREFIX_V1 =
|
||||
`${ENTITY_BASE_PREFIX}-${ENTITY_SCHEMA_VERSION_V1}-${ENTITY_HISTORY}` as const;
|
||||
export const ENTITY_HISTORY_BACKFILL_PREFIX_V1 =
|
||||
`${ENTITY_BASE_PREFIX}-${ENTITY_SCHEMA_VERSION_V1}-${ENTITY_HISTORY}-backfill` as const;
|
||||
export const ENTITY_HISTORY_INDEX_PREFIX_V1 =
|
||||
`${ENTITY_INDEX_PREFIX}.${ENTITY_SCHEMA_VERSION_V1}.${ENTITY_HISTORY}` as const;
|
||||
|
||||
|
|
|
@ -27,6 +27,24 @@ export async function createAndInstallHistoryTransform(
|
|||
}
|
||||
}
|
||||
|
||||
export async function createAndInstallHistoryBackfillTransform(
|
||||
esClient: ElasticsearchClient,
|
||||
definition: EntityDefinition,
|
||||
logger: Logger
|
||||
) {
|
||||
try {
|
||||
const historyTransform = generateHistoryTransform(definition, true);
|
||||
await retryTransientEsErrors(() => esClient.transform.putTransform(historyTransform), {
|
||||
logger,
|
||||
});
|
||||
} catch (e) {
|
||||
logger.error(
|
||||
`Cannot create entity history backfill transform for [${definition.id}] entity definition`
|
||||
);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
export async function createAndInstallLatestTransform(
|
||||
esClient: ElasticsearchClient,
|
||||
definition: EntityDefinition,
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { entityDefinitionSchema } from '@kbn/entities-schema';
|
||||
export const entityDefinitionWithBackfill = entityDefinitionSchema.parse({
|
||||
id: 'admin-console-services',
|
||||
version: '999.999.999',
|
||||
name: 'Services for Admin Console',
|
||||
type: 'service',
|
||||
indexPatterns: ['kbn-data-forge-fake_stack.*'],
|
||||
history: {
|
||||
timestampField: '@timestamp',
|
||||
interval: '1m',
|
||||
settings: {
|
||||
backfillSyncDelay: '15m',
|
||||
backfillLookbackPeriod: '72h',
|
||||
backfillFrequency: '5m',
|
||||
},
|
||||
},
|
||||
identityFields: ['log.logger', { field: 'event.category', optional: true }],
|
||||
displayNameTemplate: '{{log.logger}}{{#event.category}}:{{.}}{{/event.category}}',
|
||||
metadata: ['tags', 'host.name', 'host.os.name', { source: '_index', destination: 'sourceIndex' }],
|
||||
metrics: [
|
||||
{
|
||||
name: 'logRate',
|
||||
equation: 'A',
|
||||
metrics: [
|
||||
{
|
||||
name: 'A',
|
||||
aggregation: 'doc_count',
|
||||
filter: 'log.level: *',
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'errorRate',
|
||||
equation: 'A',
|
||||
metrics: [
|
||||
{
|
||||
name: 'A',
|
||||
aggregation: 'doc_count',
|
||||
filter: 'log.level: "ERROR"',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
import { EntityDefinition } from '@kbn/entities-schema';
|
||||
import {
|
||||
ENTITY_HISTORY_BACKFILL_PREFIX_V1,
|
||||
ENTITY_HISTORY_INDEX_PREFIX_V1,
|
||||
ENTITY_HISTORY_PREFIX_V1,
|
||||
ENTITY_LATEST_INDEX_PREFIX_V1,
|
||||
|
@ -18,6 +19,11 @@ function generateHistoryId(definition: EntityDefinition) {
|
|||
return `${ENTITY_HISTORY_PREFIX_V1}-${definition.id}`;
|
||||
}
|
||||
|
||||
// History Backfill
|
||||
export function generateHistoryBackfillTransformId(definition: EntityDefinition) {
|
||||
return `${ENTITY_HISTORY_BACKFILL_PREFIX_V1}-${definition.id}`;
|
||||
}
|
||||
|
||||
export const generateHistoryTransformId = generateHistoryId;
|
||||
export const generateHistoryIngestPipelineId = generateHistoryId;
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { EntityDefinition } from '@kbn/entities-schema';
|
||||
|
||||
export function isBackfillEnabled(definition: EntityDefinition) {
|
||||
return definition.history.settings?.backfillSyncDelay != null;
|
||||
}
|
|
@ -26,7 +26,7 @@ function createMetadataPainlessScript(definition: EntityDefinition) {
|
|||
}
|
||||
|
||||
return definition.metadata.reduce((acc, def) => {
|
||||
const destination = def.destination || def.source;
|
||||
const destination = def.destination;
|
||||
const optionalFieldPath = destination.replaceAll('.', '?.');
|
||||
const next = `
|
||||
if (ctx.entity?.metadata?.${optionalFieldPath} != null) {
|
||||
|
|
|
@ -18,6 +18,7 @@ import {
|
|||
createAndInstallLatestIngestPipeline,
|
||||
} from './create_and_install_ingest_pipeline';
|
||||
import {
|
||||
createAndInstallHistoryBackfillTransform,
|
||||
createAndInstallHistoryTransform,
|
||||
createAndInstallLatestTransform,
|
||||
} from './create_and_install_transform';
|
||||
|
@ -28,10 +29,12 @@ import { findEntityDefinitions } from './find_entity_definition';
|
|||
import { saveEntityDefinition } from './save_entity_definition';
|
||||
import { startTransform } from './start_transform';
|
||||
import {
|
||||
stopAndDeleteHistoryBackfillTransform,
|
||||
stopAndDeleteHistoryTransform,
|
||||
stopAndDeleteLatestTransform,
|
||||
} from './stop_and_delete_transform';
|
||||
import { uninstallEntityDefinition } from './uninstall_entity_definition';
|
||||
import { isBackfillEnabled } from './helpers/is_backfill_enabled';
|
||||
import { deleteTemplate, upsertTemplate } from '../manage_index_templates';
|
||||
import { getEntitiesLatestIndexTemplateConfig } from '../../templates/entities_latest_template';
|
||||
import { getEntitiesHistoryIndexTemplateConfig } from '../../templates/entities_history_template';
|
||||
|
@ -56,6 +59,7 @@ export async function installEntityDefinition({
|
|||
},
|
||||
transforms: {
|
||||
history: false,
|
||||
backfill: false,
|
||||
latest: false,
|
||||
},
|
||||
definition: false,
|
||||
|
@ -98,6 +102,10 @@ export async function installEntityDefinition({
|
|||
logger.debug(`Installing transforms for definition ${definition.id}`);
|
||||
await createAndInstallHistoryTransform(esClient, entityDefinition, logger);
|
||||
installState.transforms.history = true;
|
||||
if (isBackfillEnabled(entityDefinition)) {
|
||||
await createAndInstallHistoryBackfillTransform(esClient, entityDefinition, logger);
|
||||
installState.transforms.backfill = true;
|
||||
}
|
||||
await createAndInstallLatestTransform(esClient, entityDefinition, logger);
|
||||
installState.transforms.latest = true;
|
||||
|
||||
|
@ -120,6 +128,10 @@ export async function installEntityDefinition({
|
|||
await stopAndDeleteHistoryTransform(esClient, definition, logger);
|
||||
}
|
||||
|
||||
if (installState.transforms.backfill) {
|
||||
await stopAndDeleteHistoryBackfillTransform(esClient, definition, logger);
|
||||
}
|
||||
|
||||
if (installState.transforms.latest) {
|
||||
await stopAndDeleteLatestTransform(esClient, definition, logger);
|
||||
}
|
||||
|
|
|
@ -8,10 +8,12 @@
|
|||
import { ElasticsearchClient, Logger } from '@kbn/core/server';
|
||||
import { EntityDefinition } from '@kbn/entities-schema';
|
||||
import {
|
||||
generateHistoryBackfillTransformId,
|
||||
generateHistoryTransformId,
|
||||
generateLatestTransformId,
|
||||
} from './helpers/generate_component_id';
|
||||
import { retryTransientEsErrors } from './helpers/retry';
|
||||
import { isBackfillEnabled } from './helpers/is_backfill_enabled';
|
||||
|
||||
export async function startTransform(
|
||||
esClient: ElasticsearchClient,
|
||||
|
@ -26,6 +28,17 @@ export async function startTransform(
|
|||
esClient.transform.startTransform({ transform_id: historyTransformId }, { ignore: [409] }),
|
||||
{ logger }
|
||||
);
|
||||
if (isBackfillEnabled(definition)) {
|
||||
const historyBackfillTransformId = generateHistoryBackfillTransformId(definition);
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
esClient.transform.startTransform(
|
||||
{ transform_id: historyBackfillTransformId },
|
||||
{ ignore: [409] }
|
||||
),
|
||||
{ logger }
|
||||
);
|
||||
}
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
esClient.transform.startTransform({ transform_id: latestTransformId }, { ignore: [409] }),
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
import { ElasticsearchClient, Logger } from '@kbn/core/server';
|
||||
import { EntityDefinition } from '@kbn/entities-schema';
|
||||
import {
|
||||
generateHistoryBackfillTransformId,
|
||||
generateHistoryTransformId,
|
||||
generateLatestTransformId,
|
||||
} from './helpers/generate_component_id';
|
||||
|
@ -42,6 +43,35 @@ export async function stopAndDeleteHistoryTransform(
|
|||
}
|
||||
}
|
||||
|
||||
export async function stopAndDeleteHistoryBackfillTransform(
|
||||
esClient: ElasticsearchClient,
|
||||
definition: EntityDefinition,
|
||||
logger: Logger
|
||||
) {
|
||||
try {
|
||||
const historyBackfillTransformId = generateHistoryBackfillTransformId(definition);
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
esClient.transform.stopTransform(
|
||||
{ transform_id: historyBackfillTransformId, wait_for_completion: true, force: true },
|
||||
{ ignore: [409, 404] }
|
||||
),
|
||||
{ logger }
|
||||
);
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
esClient.transform.deleteTransform(
|
||||
{ transform_id: historyBackfillTransformId, force: true },
|
||||
{ ignore: [404] }
|
||||
),
|
||||
{ logger }
|
||||
);
|
||||
} catch (e) {
|
||||
logger.error(`Cannot stop or delete history backfill transform [${definition.id}]`);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
export async function stopAndDeleteLatestTransform(
|
||||
esClient: ElasticsearchClient,
|
||||
definition: EntityDefinition,
|
||||
|
|
|
@ -1,6 +1,153 @@
|
|||
// Jest Snapshot v1, https://goo.gl/fbAQLP
|
||||
|
||||
exports[`generateHistoryTransform(definition) should generate a valid latest transform 1`] = `
|
||||
exports[`generateHistoryTransform(definition) should generate a valid history backfill transform 1`] = `
|
||||
Object {
|
||||
"_meta": Object {
|
||||
"definitionVersion": "999.999.999",
|
||||
"managed": false,
|
||||
},
|
||||
"defer_validation": true,
|
||||
"dest": Object {
|
||||
"index": ".entities.v1.history.noop",
|
||||
"pipeline": "entities-v1-history-admin-console-services",
|
||||
},
|
||||
"frequency": "5m",
|
||||
"pivot": Object {
|
||||
"aggs": Object {
|
||||
"_errorRate_A": Object {
|
||||
"filter": Object {
|
||||
"bool": Object {
|
||||
"minimum_should_match": 1,
|
||||
"should": Array [
|
||||
Object {
|
||||
"match_phrase": Object {
|
||||
"log.level": "ERROR",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
"_logRate_A": Object {
|
||||
"filter": Object {
|
||||
"bool": Object {
|
||||
"minimum_should_match": 1,
|
||||
"should": Array [
|
||||
Object {
|
||||
"exists": Object {
|
||||
"field": "log.level",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
"entity.lastSeenTimestamp": Object {
|
||||
"max": Object {
|
||||
"field": "@timestamp",
|
||||
},
|
||||
},
|
||||
"entity.metadata.host.name": Object {
|
||||
"terms": Object {
|
||||
"field": "host.name",
|
||||
"size": 1000,
|
||||
},
|
||||
},
|
||||
"entity.metadata.host.os.name": Object {
|
||||
"terms": Object {
|
||||
"field": "host.os.name",
|
||||
"size": 1000,
|
||||
},
|
||||
},
|
||||
"entity.metadata.sourceIndex": Object {
|
||||
"terms": Object {
|
||||
"field": "_index",
|
||||
"size": 1000,
|
||||
},
|
||||
},
|
||||
"entity.metadata.tags": Object {
|
||||
"terms": Object {
|
||||
"field": "tags",
|
||||
"size": 1000,
|
||||
},
|
||||
},
|
||||
"entity.metrics.errorRate": Object {
|
||||
"bucket_script": Object {
|
||||
"buckets_path": Object {
|
||||
"A": "_errorRate_A>_count",
|
||||
},
|
||||
"script": Object {
|
||||
"lang": "painless",
|
||||
"source": "params.A",
|
||||
},
|
||||
},
|
||||
},
|
||||
"entity.metrics.logRate": Object {
|
||||
"bucket_script": Object {
|
||||
"buckets_path": Object {
|
||||
"A": "_logRate_A>_count",
|
||||
},
|
||||
"script": Object {
|
||||
"lang": "painless",
|
||||
"source": "params.A",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"group_by": Object {
|
||||
"@timestamp": Object {
|
||||
"date_histogram": Object {
|
||||
"field": "@timestamp",
|
||||
"fixed_interval": "1m",
|
||||
},
|
||||
},
|
||||
"entity.identity.event.category": Object {
|
||||
"terms": Object {
|
||||
"field": "event.category",
|
||||
"missing_bucket": true,
|
||||
},
|
||||
},
|
||||
"entity.identity.log.logger": Object {
|
||||
"terms": Object {
|
||||
"field": "log.logger",
|
||||
"missing_bucket": false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"settings": Object {
|
||||
"deduce_mappings": false,
|
||||
"unattended": true,
|
||||
},
|
||||
"source": Object {
|
||||
"index": Array [
|
||||
"kbn-data-forge-fake_stack.*",
|
||||
],
|
||||
"query": Object {
|
||||
"bool": Object {
|
||||
"filter": Array [
|
||||
Object {
|
||||
"range": Object {
|
||||
"@timestamp": Object {
|
||||
"gte": "now-72h",
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
"sync": Object {
|
||||
"time": Object {
|
||||
"delay": "15m",
|
||||
"field": "@timestamp",
|
||||
},
|
||||
},
|
||||
"transform_id": "entities-v1-history-backfill-admin-console-services",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`generateHistoryTransform(definition) should generate a valid history transform 1`] = `
|
||||
Object {
|
||||
"_meta": Object {
|
||||
"definitionVersion": "999.999.999",
|
||||
|
|
|
@ -6,11 +6,16 @@
|
|||
*/
|
||||
|
||||
import { entityDefinition } from '../helpers/fixtures/entity_definition';
|
||||
import { entityDefinitionWithBackfill } from '../helpers/fixtures/entity_definition_with_backfill';
|
||||
import { generateHistoryTransform } from './generate_history_transform';
|
||||
|
||||
describe('generateHistoryTransform(definition)', () => {
|
||||
it('should generate a valid latest transform', () => {
|
||||
it('should generate a valid history transform', () => {
|
||||
const transform = generateHistoryTransform(entityDefinition);
|
||||
expect(transform).toMatchSnapshot();
|
||||
});
|
||||
it('should generate a valid history backfill transform', () => {
|
||||
const transform = generateHistoryTransform(entityDefinitionWithBackfill, true);
|
||||
expect(transform).toMatchSnapshot();
|
||||
});
|
||||
});
|
||||
|
|
|
@ -21,19 +21,50 @@ import {
|
|||
generateHistoryTransformId,
|
||||
generateHistoryIngestPipelineId,
|
||||
generateHistoryIndexName,
|
||||
generateHistoryBackfillTransformId,
|
||||
} from '../helpers/generate_component_id';
|
||||
import { isBackfillEnabled } from '../helpers/is_backfill_enabled';
|
||||
|
||||
export function generateHistoryTransform(
|
||||
definition: EntityDefinition
|
||||
definition: EntityDefinition,
|
||||
backfill = false
|
||||
): TransformPutTransformRequest {
|
||||
if (backfill && !isBackfillEnabled(definition)) {
|
||||
throw new Error(
|
||||
'This function was called with backfill=true without history.settings.backfillSyncDelay'
|
||||
);
|
||||
}
|
||||
|
||||
const filter: QueryDslQueryContainer[] = [];
|
||||
|
||||
if (definition.filter) {
|
||||
filter.push(getElasticsearchQueryOrThrow(definition.filter));
|
||||
}
|
||||
|
||||
if (backfill && definition.history.settings?.backfillLookbackPeriod) {
|
||||
filter.push({
|
||||
range: {
|
||||
[definition.history.timestampField]: {
|
||||
gte: `now-${definition.history.settings?.backfillLookbackPeriod.toJSON()}`,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const syncDelay = backfill
|
||||
? definition.history.settings?.backfillSyncDelay
|
||||
: definition.history.settings?.syncDelay;
|
||||
|
||||
const transformId = backfill
|
||||
? generateHistoryBackfillTransformId(definition)
|
||||
: generateHistoryTransformId(definition);
|
||||
|
||||
const frequency = backfill
|
||||
? definition.history.settings?.backfillFrequency
|
||||
: definition.history.settings?.frequency;
|
||||
|
||||
return {
|
||||
transform_id: generateHistoryTransformId(definition),
|
||||
transform_id: transformId,
|
||||
_meta: {
|
||||
definitionVersion: definition.version,
|
||||
managed: definition.managed,
|
||||
|
@ -53,11 +84,11 @@ export function generateHistoryTransform(
|
|||
index: `${generateHistoryIndexName({ id: 'noop' } as EntityDefinition)}`,
|
||||
pipeline: generateHistoryIngestPipelineId(definition),
|
||||
},
|
||||
frequency: definition.history.settings?.frequency ?? ENTITY_DEFAULT_HISTORY_FREQUENCY,
|
||||
frequency: frequency || ENTITY_DEFAULT_HISTORY_FREQUENCY,
|
||||
sync: {
|
||||
time: {
|
||||
field: definition.history.settings?.syncField ?? definition.history.timestampField,
|
||||
delay: definition.history.settings?.syncDelay ?? ENTITY_DEFAULT_HISTORY_SYNC_DELAY,
|
||||
delay: syncDelay || ENTITY_DEFAULT_HISTORY_SYNC_DELAY,
|
||||
},
|
||||
},
|
||||
settings: {
|
||||
|
|
|
@ -34,7 +34,7 @@ export function generateLatestMetadataAggregations(definition: EntityDefinition)
|
|||
return definition.metadata.reduce(
|
||||
(aggs, metadata) => ({
|
||||
...aggs,
|
||||
[`entity.metadata.${metadata.destination ?? metadata.source}`]: {
|
||||
[`entity.metadata.${metadata.destination}`]: {
|
||||
filter: {
|
||||
range: {
|
||||
'event.ingested': {
|
||||
|
|
|
@ -18,9 +18,11 @@ import { deleteIndices } from './delete_index';
|
|||
import { deleteHistoryIngestPipeline, deleteLatestIngestPipeline } from './delete_ingest_pipeline';
|
||||
import { findEntityDefinitions } from './find_entity_definition';
|
||||
import {
|
||||
stopAndDeleteHistoryBackfillTransform,
|
||||
stopAndDeleteHistoryTransform,
|
||||
stopAndDeleteLatestTransform,
|
||||
} from './stop_and_delete_transform';
|
||||
import { isBackfillEnabled } from './helpers/is_backfill_enabled';
|
||||
import { deleteTemplate } from '../manage_index_templates';
|
||||
|
||||
export async function uninstallEntityDefinition({
|
||||
|
@ -37,6 +39,9 @@ export async function uninstallEntityDefinition({
|
|||
deleteData?: boolean;
|
||||
}) {
|
||||
await stopAndDeleteHistoryTransform(esClient, definition, logger);
|
||||
if (isBackfillEnabled(definition)) {
|
||||
await stopAndDeleteHistoryBackfillTransform(esClient, definition, logger);
|
||||
}
|
||||
await stopAndDeleteLatestTransform(esClient, definition, logger);
|
||||
await deleteHistoryIngestPipeline(esClient, definition, logger);
|
||||
await deleteLatestIngestPipeline(esClient, definition, logger);
|
||||
|
|
|
@ -13,6 +13,7 @@ import { EntitySecurityException } from '../../lib/entities/errors/entity_securi
|
|||
import { InvalidTransformError } from '../../lib/entities/errors/invalid_transform_error';
|
||||
import { readEntityDefinition } from '../../lib/entities/read_entity_definition';
|
||||
import {
|
||||
stopAndDeleteHistoryBackfillTransform,
|
||||
stopAndDeleteHistoryTransform,
|
||||
stopAndDeleteLatestTransform,
|
||||
} from '../../lib/entities/stop_and_delete_transform';
|
||||
|
@ -26,11 +27,13 @@ import {
|
|||
createAndInstallLatestIngestPipeline,
|
||||
} from '../../lib/entities/create_and_install_ingest_pipeline';
|
||||
import {
|
||||
createAndInstallHistoryBackfillTransform,
|
||||
createAndInstallHistoryTransform,
|
||||
createAndInstallLatestTransform,
|
||||
} from '../../lib/entities/create_and_install_transform';
|
||||
import { startTransform } from '../../lib/entities/start_transform';
|
||||
import { EntityDefinitionNotFound } from '../../lib/entities/errors/entity_not_found';
|
||||
import { isBackfillEnabled } from '../../lib/entities/helpers/is_backfill_enabled';
|
||||
|
||||
export function resetEntityDefinitionRoute<T extends RequestHandlerContext>({
|
||||
router,
|
||||
|
@ -52,6 +55,9 @@ export function resetEntityDefinitionRoute<T extends RequestHandlerContext>({
|
|||
|
||||
// Delete the transform and ingest pipeline
|
||||
await stopAndDeleteHistoryTransform(esClient, definition, logger);
|
||||
if (isBackfillEnabled(definition)) {
|
||||
await stopAndDeleteHistoryBackfillTransform(esClient, definition, logger);
|
||||
}
|
||||
await stopAndDeleteLatestTransform(esClient, definition, logger);
|
||||
await deleteHistoryIngestPipeline(esClient, definition, logger);
|
||||
await deleteLatestIngestPipeline(esClient, definition, logger);
|
||||
|
@ -61,6 +67,9 @@ export function resetEntityDefinitionRoute<T extends RequestHandlerContext>({
|
|||
await createAndInstallHistoryIngestPipeline(esClient, definition, logger);
|
||||
await createAndInstallLatestIngestPipeline(esClient, definition, logger);
|
||||
await createAndInstallHistoryTransform(esClient, definition, logger);
|
||||
if (isBackfillEnabled(definition)) {
|
||||
await createAndInstallHistoryBackfillTransform(esClient, definition, logger);
|
||||
}
|
||||
await createAndInstallLatestTransform(esClient, definition, logger);
|
||||
await startTransform(esClient, definition, logger);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue