WrapHits / bulk changes

This commit is contained in:
Madison Caldwell 2021-07-14 09:15:45 -04:00
parent b8185f2be2
commit a8c0b4e681
6 changed files with 53 additions and 36 deletions

View file

@ -24,24 +24,40 @@ export const createPersistenceRuleTypeFactory: CreatePersistenceRuleTypeFactory
...options,
services: {
...options.services,
alertWithPersistence: (alerts) => {
alertWithPersistence: async (alerts) => {
alerts.forEach((alert) => currentAlerts.push(alert));
return alerts.map((alert) =>
alertInstanceFactory(alert['kibana.rac.alert.uuid']! as string)
);
const numAlerts = currentAlerts.length;
logger.debug(`Found ${numAlerts} alerts.`);
if (ruleDataClient.isWriteEnabled() && numAlerts) {
const response = await ruleDataClient.getWriter().bulk({
body: currentAlerts.flatMap((event) => [{ index: {} }, event]),
});
alerts.map((alert) =>
alertInstanceFactory(alert['kibana.rac.alert.uuid']! as string)
);
return response;
/*
return {
createdSignals: alerts.map((alert) =>
alertInstanceFactory(alert['kibana.rac.alert.uuid']! as string)
),
errors: [],
};
*/
}
/*
return {
createdSignals: 0,
errors: [],
};
*/
},
},
});
const numAlerts = currentAlerts.length;
logger.debug(`Found ${numAlerts} alerts.`);
if (ruleDataClient.isWriteEnabled() && numAlerts) {
await ruleDataClient.getWriter().bulk({
body: currentAlerts.flatMap((event) => [{ index: {} }, event]),
});
}
return state;
},
};

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import { ApiResponse } from '@elastic/elasticsearch';
import { BulkResponse } from '@elastic/elasticsearch/api/types';
import { Logger } from '@kbn/logging';
import { ESSearchRequest } from 'src/core/types/elasticsearch';
import {
@ -12,13 +14,15 @@ import {
AlertInstanceContext,
AlertInstanceState,
AlertTypeParams,
AlertTypeState,
} from '../../../alerting/server';
import { AlertTypeWithExecutor } from '../types';
import { RuleDataClient } from '../../target/types/server';
export type PersistenceAlertService<TAlertInstanceContext extends Record<string, unknown>> = (
alerts: Array<Record<string, unknown>>
) => Array<AlertInstance<AlertInstanceState, TAlertInstanceContext, string>>;
// ) => Array<AlertInstance<AlertInstanceState, TAlertInstanceContext, string>>;
) => Promise<ApiResponse<BulkResponse, unknown>>;
export type PersistenceAlertQueryService = (
query: ESSearchRequest

View file

@ -133,9 +133,12 @@ export const createEqlAlertType = (createOptions: {
}
if (alerts.length > 0) {
/*
alertWithPersistence(alerts).forEach((alert) => {
alert.scheduleActions('default', { server: 'server-test' });
});
*/
const bulkResponse = await alertWithPersistence(alerts);
}
return result;

View file

@ -40,23 +40,6 @@ export const bulkCreateFactory = <TAlertInstanceContext extends AlertInstanceCon
};
}
/*
export interface BulkRequest<TSource = unknown> extends RequestBase {
index?: IndexName
type?: Type
pipeline?: string
refresh?: Refresh
routing?: Routing
_source?: boolean | Fields
_source_excludes?: Fields
_source_includes?: Fields
timeout?: Time
wait_for_active_shards?: WaitForActiveShards
require_alias?: boolean
body?: (BulkOperationContainer | TSource)[]
}
*/
const start = performance.now();
const { body: response } = await alertWithPersistence(wrappedDocs);

View file

@ -5,10 +5,11 @@
* 2.0.
*/
import { TechnicalRuleFieldMaps } from '../../../../../../rule_registry/common/assets/field_maps/technical_rule_field_map';
import type { ConfigType } from '../../../../config';
import { buildBulkBody } from '../../signals/build_bulk_body';
import { filterDuplicateSignals } from '../../signals/filter_duplicate_signals';
import { SearchAfterAndBulkCreateParams, WrapHits, WrappedSignalHit } from '../../signals/types';
import { SearchAfterAndBulkCreateParams } from '../../signals/types';
import { generateId } from '../../signals/utils';
export const wrapHitsFactory = ({
@ -19,8 +20,8 @@ export const wrapHitsFactory = ({
ruleSO: SearchAfterAndBulkCreateParams['ruleSO'];
signalsIndex: string;
mergeStrategy: ConfigType['alertMergeStrategy'];
}): WrapHits => (events) => {
const wrappedDocs: WrappedSignalHit[] = events.flatMap((doc) => [
}) => (events) => {
const wrappedDocs: TechnicalRuleFieldMaps[] = events.flatMap((doc) => [
{
_index: signalsIndex,
_id: generateId(
@ -33,5 +34,6 @@ export const wrapHitsFactory = ({
},
]);
return filterDuplicateSignals(ruleSO.id, wrappedDocs, false);
// return filterDuplicateSignals(ruleSO.id, wrappedDocs, false);
return wrappedDocs;
};

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { estypes } from '@elastic/elasticsearch';
import { Logger } from '@kbn/logging';
import { ExceptionListItemSchema } from '@kbn/securitysolution-io-ts-list-types';
import { Moment } from 'moment';
@ -17,6 +18,7 @@ import {
} from '../../../../../alerting/common';
import { AlertType } from '../../../../../alerting/server';
import { ListClient } from '../../../../../lists/server';
import { TechnicalRuleFieldMaps } from '../../../../../rule_registry/common/assets/field_maps/technical_rule_field_map';
import {
AlertTypeWithExecutor,
PersistenceServices,
@ -57,7 +59,8 @@ export interface RunOpts<TParams extends RuleParams> {
from: Moment;
maxSignals: number;
};
wrapHits: WrapHits;
// wrapHits: WrapHits;
wrapHits: (hits: Array<estypes.SearchHit<RACAlert>>) => WrappedRACAlert[];
}
export type SecurityAlertTypeExecutor<
@ -97,3 +100,9 @@ export type CreateSecurityRuleTypeFactory = (options: {
type: SecurityAlertTypeWithExecutor<TState, TServices, TParams, TAlertInstanceContext>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
) => AlertTypeWithExecutor<TState, TParams, TAlertInstanceContext, any>;
export interface RACAlert extends TechnicalRuleFieldMaps {
todo?: undefined;
}
export type WrappedRACAlert = RACAlert;