[Rule Migration] Add PackageClient to fetch integrations (#207197)

## Summary

While awaiting the changes to the EPR API to include datasets, we
temporarily included a JSON file to simulate the expected outcome so
that we could move forward in the meantime.

Now that those changes are done, calls to
https://epr.elastic.co/search? return data_streams in their
responses, so this PR replaces the temporary JSON file with the proper
PackageClient implementation.

Tested a few migrations, both with and without prebuilt rule matches, and
it seems to be working as intended. The integration count also seems to be
consistent with what is expected.
This commit is contained in:
Marius Iversen 2025-01-21 16:46:44 +01:00 committed by GitHub
parent 80500895fe
commit 1e5abbe8ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 49 additions and 6916 deletions

View file

@ -9,16 +9,11 @@ import type { PackageList } from '@kbn/fleet-plugin/common';
import type { RuleMigrationIntegration } from '../types';
import { RuleMigrationsDataBaseClient } from './rule_migrations_data_base_client';
/* This will be removed once the package registry changes is performed */
import integrationsFile from './integrations_temp.json';
/* The minimum score required for a integration to be considered correct, might need to change this later */
const MIN_SCORE = 40 as const;
/* The number of integrations the RAG will return, sorted by score */
const RETURNED_INTEGRATIONS = 5 as const;
/* This is a temp implementation to allow further development until https://github.com/elastic/package-registry/issues/1252 */
const INTEGRATIONS = integrationsFile as RuleMigrationIntegration[];
/* BULK_MAX_SIZE defines the number to break down the bulk operations by.
* The 500 number was chosen as a reasonable number to avoid large payloads. It can be adjusted if needed.
*/
@ -28,32 +23,52 @@ export class RuleMigrationsDataIntegrationsClient extends RuleMigrationsDataBase
}
/** Indexes an array of integrations to be used with ELSER semantic search queries */
async create(): Promise<void> {
async populate(): Promise<void> {
const index = await this.getIndexName();
await this.esClient
.bulk(
{
refresh: 'wait_for',
operations: INTEGRATIONS.flatMap((integration) => [
{ update: { _index: index, _id: integration.id } },
{
doc: {
title: integration.title,
description: integration.description,
data_streams: integration.data_streams,
elser_embedding: integration.elser_embedding,
'@timestamp': new Date().toISOString(),
const packages = await this.dependencies.packageService?.asInternalUser.getPackages();
if (packages) {
const ragIntegrations = packages.map<RuleMigrationIntegration>((pkg) => ({
title: pkg.title,
id: pkg.name,
description: pkg?.description || '',
data_streams:
pkg.data_streams?.map((stream) => ({
dataset: stream.dataset,
index_pattern: `${stream.type}-${stream.dataset}-*`,
title: stream.title,
})) || [],
elser_embedding: [
pkg.title,
pkg.description,
...(pkg.data_streams?.map((stream) => stream.title) || []),
].join(' - '),
}));
await this.esClient
.bulk(
{
refresh: 'wait_for',
operations: ragIntegrations.flatMap((integration) => [
{ update: { _index: index, _id: integration.id } },
{
doc: {
title: integration.title,
description: integration.description,
data_streams: integration.data_streams,
elser_embedding: integration.elser_embedding,
},
doc_as_upsert: true,
},
doc_as_upsert: true,
},
]),
},
{ requestTimeout: 10 * 60 * 1000 }
)
.catch((error) => {
this.logger.error(`Error preparing integrations for SIEM migration ${error.message}`);
throw error;
});
]),
},
{ requestTimeout: 10 * 60 * 1000 }
)
.catch((error) => {
this.logger.error(`Error populating integrations for migration ${error.message}`);
throw error;
});
} else {
this.logger.warn('Package service not available, not able not populate integrations index');
}
}
/** Based on a LLM generated semantic string, returns the 5 best results with a score above 40 */

View file

@ -58,7 +58,7 @@ export const integrationsFieldMap: FieldMap<SchemaFieldMapKeys<RuleMigrationInte
id: { type: 'keyword', required: true },
title: { type: 'text', required: true },
description: { type: 'text', required: true },
data_streams: { type: 'nested', array: true, required: true },
data_streams: { type: 'object', array: true, required: true },
'data_streams.dataset': { type: 'keyword', required: true },
'data_streams.title': { type: 'text', required: true },
'data_streams.index_pattern': { type: 'keyword', required: true },

View file

@ -12,8 +12,7 @@ export class IntegrationRetriever {
constructor(private readonly clients: RuleMigrationsRetrieverClients) {}
public async populateIndex() {
// TODO: use Fleet API client for integration retrieval as an argument once feature is available
return this.clients.data.integrations.create();
return this.clients.data.integrations.populate();
}
public async getIntegrations(semanticString: string): Promise<RuleMigrationIntegration[]> {

View file

@ -8,12 +8,12 @@
import type { AuthenticatedUser, Logger } from '@kbn/core/server';
import { AbortError, abortSignalToPromise } from '@kbn/kibana-utils-plugin/server';
import type { RunnableConfig } from '@langchain/core/runnables';
import type { RuleMigrationFilters } from '../../../../../common/siem_migrations/types';
import {
SiemMigrationStatus,
SiemMigrationTaskStatus,
} from '../../../../../common/siem_migrations/constants';
import type { RuleMigrationTaskStats } from '../../../../../common/siem_migrations/model/rule_migration.gen';
import type { RuleMigrationFilters } from '../../../../../common/siem_migrations/types';
import type { RuleMigrationsDataClient } from '../data/rule_migrations_data_client';
import type { RuleMigrationDataStats } from '../data/rule_migrations_data_rules_client';
import type { SiemRuleMigrationsClientDependencies } from '../types';
@ -22,8 +22,8 @@ import type { MigrateRuleState } from './agent/types';
import { RuleMigrationsRetriever } from './retrievers';
import type {
MigrationAgent,
RuleMigrationTaskStartParams,
RuleMigrationTaskCreateAgentParams,
RuleMigrationTaskStartParams,
RuleMigrationTaskStartResult,
RuleMigrationTaskStopResult,
} from './types';
@ -169,7 +169,7 @@ export class RuleMigrationsTaskClient {
this.logger.info(`Abort signal received, stopping migration ID:${migrationId}`);
return;
} else {
this.logger.error(`Error processing migration ID:${migrationId}`, error);
this.logger.error(`Error processing migration ID:${migrationId} ${error}`);
}
} finally {
this.migrationsRunning.delete(migrationId);