[8.x] [Security Solution][Detection Engine] Split search request building from search (#216887) (#218262)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Security Solution][Detection Engine] Split search request building
from search (#216887)](https://github.com/elastic/kibana/pull/216887)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Marshall
Main","email":"55718608+marshallmain@users.noreply.github.com"},"sourceCommit":{"committedDate":"2025-04-15T12:19:34Z","message":"[Security
Solution][Detection Engine] Split search request building from search
(#216887)\n\n## Summary\n\nThis PR better separates the request building
logic in the detection\nengine from query building logic, removes
outdated error checking logic,\nupdates the `singleSearchAfter` `search`
call to no longer use the\nlegacy `meta: true` param, and improves
search response type
inference.","sha":"dee4dfbe5995614b82792b692775c150dc79635e","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:Detection
Engine","backport:version","v9.1.0","v8.19.0"],"title":"[Security
Solution][Detection Engine] Split search request building from
search","number":216887,"url":"https://github.com/elastic/kibana/pull/216887","mergeCommit":{"message":"[Security
Solution][Detection Engine] Split search request building from search
(#216887)\n\n## Summary\n\nThis PR better separates the request building
logic in the detection\nengine from query building logic, removes
outdated error checking logic,\nupdates the `singleSearchAfter` `search`
call to no longer use the\nlegacy `meta: true` param, and improves
search response type
inference.","sha":"dee4dfbe5995614b82792b692775c150dc79635e"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/216887","number":216887,"mergeCommit":{"message":"[Security
Solution][Detection Engine] Split search request building from search
(#216887)\n\n## Summary\n\nThis PR better separates the request building
logic in the detection\nengine from query building logic, removes
outdated error checking logic,\nupdates the `singleSearchAfter` `search`
call to no longer use the\nlegacy `meta: true` param, and improves
search response type
inference.","sha":"dee4dfbe5995614b82792b692775c150dc79635e"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->
This commit is contained in:
Marshall Main 2025-04-16 13:06:08 -04:00 committed by GitHub
parent 1db9686db1
commit 17ba3ab768
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 779 additions and 1418 deletions

View file

@ -436,7 +436,7 @@ export const sampleDocRiskScore = (riskScore?: unknown): SignalSourceHit => ({
sort: [],
});
export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
export const sampleEmptyDocSearchResults = () => ({
took: 10,
timed_out: false,
_shards: {
@ -446,7 +446,10 @@ export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
skipped: 0,
},
hits: {
total: 0,
total: {
value: 0,
relation: 'eq' as const,
},
max_score: 100,
hits: [],
},

View file

@ -22,7 +22,7 @@ export const getEventList = async ({
eventListConfig,
indexFields,
sortOrder = 'desc',
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc>> => {
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc, unknown>> => {
const {
inputIndex,
ruleExecutionLogger,
@ -53,14 +53,13 @@ export const getEventList = async ({
fields: indexFields,
});
const { searchResult } = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: undefined,
searchAfterSortIds: searchAfter,
index: inputIndex,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
pageSize: calculatedPerPage,
size: calculatedPerPage,
filter: queryFilter,
primaryTimestamp,
secondaryTimestamp,
@ -70,6 +69,12 @@ export const getEventList = async ({
overrideBody: eventListConfig,
});
const { searchResult } = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
});
ruleExecutionLogger.debug(`Retrieved events items of size: ${searchResult.hits.hits.length}`);
return searchResult;
};
@ -98,6 +103,7 @@ export const getEventCount = async ({
fields: indexFields,
});
const eventSearchQueryBodyQuery = buildEventsSearchQuery({
aggregations: undefined,
index,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
@ -107,7 +113,7 @@ export const getEventCount = async ({
secondaryTimestamp,
searchAfterSortIds: undefined,
runtimeMappings: undefined,
}).body?.query;
}).query;
const response = await esClient.count({
body: { query: eventSearchQueryBodyQuery },
ignore_unavailable: true,

View file

@ -236,7 +236,7 @@ export interface SignalMatch {
export type GetDocumentListInterface = (params: {
searchAfter: estypes.SortResults | undefined;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc>>;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc, unknown>>;
export type CreateSignalInterface = (
params: EventItem[] | ThreatListItem[]

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { set } from '@kbn/safer-lodash-set';
import dateMath from '@kbn/datemath';
import type { KibanaRequest, SavedObjectsClientContract } from '@kbn/core/server';
import type { MlPluginSetup } from '@kbn/ml-plugin/server';
@ -54,9 +55,9 @@ export const findMlSignals = async ({
if (isLoggedRequestsEnabled) {
const searchQuery = buildAnomalyQuery(params);
searchQuery.index = '.ml-anomalies-*';
set(searchQuery, 'body.index', '.ml-anomalies-*');
loggedRequests.push({
request: logSearchRequest(searchQuery),
request: logSearchRequest(searchQuery.body),
description: i18n.ML_SEARCH_ANOMALIES_DESCRIPTION,
duration: anomalyResults.took,
request_type: 'findAnomalies',

View file

@ -11,16 +11,6 @@ import type { SignalSource } from '../types';
import type { GenericBulkCreateResponse } from '../factories/bulk_create_factory';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
export type RecentTermsAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildRecentTermsAgg> } }
>;
export type NewTermsAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildNewTermsAgg> } }
>;
export type CompositeDocFetchAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildCompositeDocFetchAgg> } }

View file

@ -14,16 +14,12 @@ import { SERVER_APP_ID } from '../../../../../common/constants';
import { NewTermsRuleParams } from '../../rule_schema';
import type { SecurityAlertType } from '../types';
import { singleSearchAfter } from '../utils/single_search_after';
import { buildEventsSearchQuery } from '../utils/build_events_query';
import { getFilter } from '../utils/get_filter';
import { wrapNewTermsAlerts } from './wrap_new_terms_alerts';
import { bulkCreateSuppressedNewTermsAlertsInMemory } from './bulk_create_suppressed_alerts_in_memory';
import type { EventsAndTerms } from './types';
import type {
RecentTermsAggResult,
DocFetchAggResult,
NewTermsAggResult,
CreateAlertsHook,
} from './build_new_terms_aggregation';
import type { CreateAlertsHook } from './build_new_terms_aggregation';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
import {
buildRecentTermsAgg,
@ -149,7 +145,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
alertSuppression: params.alertSuppression,
licensing,
});
let afterKey;
let afterKey: Record<string, string | number | null> | undefined;
const result = createSearchAfterReturnType();
@ -170,12 +166,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// PHASE 1: Fetch a page of terms using a composite aggregation. This will collect a page from
// all of the terms seen over the last rule interval. In the next phase we'll determine which
// ones are new.
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildRecentTermsAgg({
fields: params.newTermsFields,
after: afterKey,
@ -185,13 +176,22 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The time range for the initial composite aggregation is the rule interval, `from` and `to`
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
runtimeMappings,
});
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findAllTerms',
@ -203,8 +203,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
: undefined,
});
loggedRequests.push(...firstPhaseLoggedRequests);
const searchResultWithAggs = searchResult as RecentTermsAggResult;
if (!searchResultWithAggs.aggregations) {
if (!searchResult.aggregations) {
throw new Error('Aggregations were missing on recent terms search result');
}
logger.debug(`Time spent on composite agg: ${searchDuration}`);
@ -214,10 +213,10 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// If the aggregation returns no after_key it signals that we've paged through all results
// and the current page is empty so we can immediately break.
if (searchResultWithAggs.aggregations.new_terms.after_key == null) {
if (searchResult.aggregations.new_terms.after_key == null) {
break;
}
const bucketsForField = searchResultWithAggs.aggregations.new_terms.buckets;
const bucketsForField = searchResult.aggregations.new_terms.buckets;
const createAlertsHook: CreateAlertsHook = async (aggResult) => {
const eventsAndTerms: EventsAndTerms[] = (
@ -311,12 +310,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The aggregation filters out buckets for terms that exist prior to `tuple.from`, so the buckets in the
// response correspond to each new term.
const includeValues = transformBucketsToValues(params.newTermsFields, bucketsForField);
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
const pageSearchRequest = buildEventsSearchQuery({
aggregations: buildNewTermsAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
@ -330,12 +324,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// in addition to the rule interval
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: pageSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findNewTerms',
@ -350,8 +352,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
logger.debug(`Time spent on phase 2 terms agg: ${pageSearchDuration}`);
const pageSearchResultWithAggs = pageSearchResult as NewTermsAggResult;
if (!pageSearchResultWithAggs.aggregations) {
if (!pageSearchResult.aggregations) {
throw new Error('Aggregations were missing on new terms search result');
}
@ -359,17 +360,12 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// the rule interval for that term. This is the first document to contain the new term, and will
// become the basis of the resulting alert.
// One document could become multiple alerts if the document contains an array with multiple new terms.
if (pageSearchResultWithAggs.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResultWithAggs.aggregations.new_terms.buckets.map(
if (pageSearchResult.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResult.aggregations.new_terms.buckets.map(
(bucket) => bucket.key
);
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
const docFetchSearchRequest = buildEventsSearchQuery({
aggregations: buildDocFetchAgg({
timestampField: aggregatableTimestampField,
field: params.newTermsFields[0],
@ -381,12 +377,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// For phase 3, we go back to aggregating only over the rule interval - excluding the history window
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: docFetchSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
@ -401,13 +405,11 @@ export const createNewTermsAlertType = (): SecurityAlertType<
result.errors.push(...docFetchSearchErrors);
loggedRequests.push(...docFetchLoggedRequests);
const docFetchResultWithAggs = docFetchSearchResult as DocFetchAggResult;
if (!docFetchResultWithAggs.aggregations) {
if (!docFetchSearchResult.aggregations) {
throw new Error('Aggregations were missing on document fetch search result');
}
const bulkCreateResult = await createAlertsHook(docFetchResultWithAggs);
const bulkCreateResult = await createAlertsHook(docFetchSearchResult);
if (bulkCreateResult.alertsWereTruncated) {
result.warningMessages.push(
@ -420,7 +422,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
}
}
afterKey = searchResultWithAggs.aggregations.new_terms.after_key;
afterKey = searchResult.aggregations.new_terms.after_key;
}
scheduleNotificationResponseActionsService({

View file

@ -28,6 +28,7 @@ import {
stringifyAfterKey,
} from '../utils/utils';
import type { GenericBulkCreateResponse } from '../utils/bulk_create_with_suppression';
import { buildEventsSearchQuery } from '../utils/build_events_query';
import type {
SecurityRuleServices,
@ -135,12 +136,7 @@ const multiTermsCompositeNonRetryable = async ({
// PHASE 2: Take the page of results from Phase 1 and determine if each term exists in the history window.
// The aggregation filters out buckets for terms that exist prior to `tuple.from`, so the buckets in the
// response correspond to each new term.
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildCompositeNewTermsAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
@ -155,12 +151,20 @@ const multiTermsCompositeNonRetryable = async ({
// in addition to the rule interval
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilterForBatch,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findNewTerms',
@ -187,12 +191,7 @@ const multiTermsCompositeNonRetryable = async ({
// become the basis of the resulting alert.
// One document could become multiple alerts if the document contains an array with multiple new terms.
if (pageSearchResultWithAggs.aggregations.new_terms.buckets.length > 0) {
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
const searchRequestPhase3 = buildEventsSearchQuery({
aggregations: buildCompositeDocFetchAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
@ -205,12 +204,20 @@ const multiTermsCompositeNonRetryable = async ({
index: inputIndex,
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilterForBatch,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: searchRequestPhase3,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',

View file

@ -10,7 +10,7 @@ import type moment from 'moment';
import type * as estypes from '@elastic/elasticsearch/lib/api/types';
import { withSecuritySpan } from '../../../../../utils/with_security_span';
import { buildTimeRangeFilter } from '../../utils/build_events_query';
import { buildEventsSearchQuery, buildTimeRangeFilter } from '../../utils/build_events_query';
import type {
SecurityRuleServices,
SecuritySharedParams,
@ -186,29 +186,31 @@ export const groupAndBulkCreate = async ({
missingBucket: suppressOnMissingFields,
});
const eventsSearchParams = {
const searchRequest = buildEventsSearchQuery({
aggregations: groupingAggregation,
searchAfterSortIds: undefined,
index: sharedParams.inputIndex,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger: sharedParams.ruleExecutionLogger,
filter,
pageSize: 0,
size: 0,
primaryTimestamp: sharedParams.primaryTimestamp,
secondaryTimestamp: sharedParams.secondaryTimestamp,
runtimeMappings: sharedParams.runtimeMappings,
additionalFilters: bucketHistoryFilter,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
description: i18n.FIND_EVENTS_DESCRIPTION,
}
: undefined,
};
});
const { searchResult, searchDuration, searchErrors, loggedRequests } =
await singleSearchAfter(eventsSearchParams);
await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger: sharedParams.ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
description: i18n.FIND_EVENTS_DESCRIPTION,
}
: undefined,
});
if (isLoggedRequestsEnabled) {
toReturn.loggedRequests = loggedRequests;

View file

@ -5,8 +5,6 @@
* 2.0.
*/
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { allowedExperimentalValues } from '../../../../../common/experimental_features';
import { createQueryAlertType } from './create_query_alert_type';
import { createRuleTypeMocks } from '../__mocks__/rule_type';
@ -82,27 +80,23 @@ describe('Custom Query Alerts', () => {
alerting.registerType(queryAlertType);
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [],
sequences: [],
events: [],
total: {
relation: 'eq',
value: 0,
},
services.scopedClusterClient.asCurrentUser.search.mockResolvedValue({
hits: {
hits: [],
total: {
relation: 'eq',
value: 0,
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
});
const params = getQueryRuleParams();
@ -124,27 +118,23 @@ describe('Custom Query Alerts', () => {
alerting.registerType(queryAlertType);
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [sampleDocNoSortId()],
sequences: [],
events: [],
total: {
relation: 'eq',
value: 1,
},
services.scopedClusterClient.asCurrentUser.search.mockResolvedValue({
hits: {
hits: [sampleDocNoSortId()],
total: {
relation: 'eq',
value: 1,
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
});
const params = getQueryRuleParams();
@ -167,27 +157,23 @@ describe('Custom Query Alerts', () => {
alerting.registerType(queryAlertType);
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [sampleDocNoSortId()],
sequences: [],
events: [],
total: {
relation: 'eq',
value: 1,
},
services.scopedClusterClient.asCurrentUser.search.mockResolvedValue({
hits: {
hits: [sampleDocNoSortId()],
total: {
relation: 'eq',
value: 1,
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
});
const params = getQueryRuleParams();

View file

@ -9,7 +9,7 @@ import type { SuppressionFieldsLatest } from '@kbn/rule-registry-plugin/common/s
import type { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { buildReasonMessageForThresholdAlert } from '../utils/reason_formatters';
import type { ThresholdBucket } from './types';
import type { ThresholdCompositeBucket } from './types';
import type { SecurityRuleServices, SecuritySharedParams } from '../types';
import type { ThresholdRuleParams } from '../../rule_schema';
import type { BaseFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
@ -20,7 +20,7 @@ import { transformBulkCreatedItemsToHits } from './utils';
interface BulkCreateSuppressedThresholdAlertsParams {
sharedParams: SecuritySharedParams<ThresholdRuleParams>;
buckets: ThresholdBucket[];
buckets: ThresholdCompositeBucket[];
services: SecurityRuleServices;
startedAt: Date;
}

View file

@ -11,7 +11,7 @@ import type { ThresholdNormalized } from '../../../../../common/api/detection_en
import type { GenericBulkCreateResponse } from '../factories/bulk_create_factory';
import { calculateThresholdSignalUuid } from './utils';
import { buildReasonMessageForThresholdAlert } from '../utils/reason_formatters';
import type { ThresholdBucket } from './types';
import type { ThresholdCompositeBucket } from './types';
import type { SecurityRuleServices, SecuritySharedParams } from '../types';
import type { ThresholdRuleParams } from '../../rule_schema';
import type { BaseFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
@ -19,13 +19,13 @@ import { bulkCreate, wrapHits } from '../factories';
interface BulkCreateThresholdSignalsParams {
sharedParams: SecuritySharedParams<ThresholdRuleParams>;
buckets: ThresholdBucket[];
buckets: ThresholdCompositeBucket[];
services: SecurityRuleServices;
startedAt: Date;
}
export const transformBucketIntoHit = (
bucket: ThresholdBucket,
bucket: ThresholdCompositeBucket,
inputIndex: string,
startedAt: Date,
from: Date,
@ -65,7 +65,7 @@ export const transformBucketIntoHit = (
};
export const getTransformedHits = (
buckets: ThresholdBucket[],
buckets: ThresholdCompositeBucket[],
inputIndex: string,
startedAt: Date,
from: Date,

View file

@ -1,410 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { RuleExecutorServicesMock } from '@kbn/alerting-plugin/server/mocks';
import { alertsMock } from '@kbn/alerting-plugin/server/mocks';
import { sampleEmptyDocSearchResults } from '../__mocks__/es_results';
import * as single_search_after from '../utils/single_search_after';
import { findThresholdSignals } from './find_threshold_signals';
import { TIMESTAMP } from '@kbn/rule-data-utils';
import { ruleExecutionLogMock } from '../../rule_monitoring/mocks';
import { buildTimestampRuntimeMapping } from '../utils';
import { TIMESTAMP_RUNTIME_FIELD } from '../constants';
import { getQueryFilter } from '../utils/get_query_filter';
import type { ESBoolQuery } from '../../../../../common/typed_json';
const mockSingleSearchAfter = jest.fn(async () => ({
searchResult: {
...sampleEmptyDocSearchResults(),
aggregations: {
thresholdTerms: {
buckets: [],
},
},
},
searchDuration: '0.0',
searchErrors: [],
}));
let filter: ESBoolQuery;
describe('findThresholdSignals', () => {
let mockService: RuleExecutorServicesMock;
const ruleExecutionLogger = ruleExecutionLogMock.forExecutors.create();
beforeEach(() => {
jest.clearAllMocks();
jest.spyOn(single_search_after, 'singleSearchAfter').mockImplementation(mockSingleSearchAfter);
mockService = alertsMock.createRuleExecutorServices();
const queryFilter = getQueryFilter({
query: '',
language: 'kuery',
filters: [],
index: ['*'],
exceptionFilter: undefined,
});
filter = queryFilter;
});
it('should generate a threshold signal query when only a value is provided', async () => {
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
field: [],
value: 100,
},
runtimeMappings: undefined,
primaryTimestamp: TIMESTAMP,
secondaryTimestamp: undefined,
aggregatableTimestampField: TIMESTAMP,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
aggregations: {
max_timestamp: {
max: {
field: '@timestamp',
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
},
})
);
});
it('should generate a threshold signal query when a field and value are provided', async () => {
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
field: ['host.name'],
value: 100,
},
runtimeMappings: undefined,
primaryTimestamp: TIMESTAMP,
secondaryTimestamp: undefined,
aggregatableTimestampField: TIMESTAMP,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
aggregations: {
thresholdTerms: {
composite: {
size: 10000,
after: undefined,
sources: [
{
'host.name': {
terms: {
field: 'host.name',
},
},
},
],
},
aggs: {
max_timestamp: {
max: {
field: '@timestamp',
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
count_check: {
bucket_selector: {
buckets_path: {
docCount: '_count',
},
script: `params.docCount >= 100`,
},
},
},
},
},
})
);
});
it('should generate a threshold signal query when multiple fields and a value are provided', async () => {
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
field: ['host.name', 'user.name'],
value: 100,
cardinality: [],
},
runtimeMappings: undefined,
primaryTimestamp: TIMESTAMP,
secondaryTimestamp: undefined,
aggregatableTimestampField: TIMESTAMP,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
aggregations: {
thresholdTerms: {
composite: {
size: 10000,
after: undefined,
sources: [
{
'host.name': {
terms: {
field: 'host.name',
},
},
},
{
'user.name': {
terms: {
field: 'user.name',
},
},
},
],
},
aggs: {
max_timestamp: {
max: {
field: '@timestamp',
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
count_check: {
bucket_selector: {
buckets_path: {
docCount: '_count',
},
script: `params.docCount >= 100`,
},
},
},
},
},
})
);
});
it('should generate a threshold signal query when multiple fields, a value, and cardinality field/value are provided', async () => {
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
field: ['host.name', 'user.name'],
value: 100,
cardinality: [
{
field: 'destination.ip',
value: 2,
},
],
},
runtimeMappings: undefined,
primaryTimestamp: TIMESTAMP,
secondaryTimestamp: undefined,
aggregatableTimestampField: TIMESTAMP,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
aggregations: {
thresholdTerms: {
composite: {
size: 10000,
after: undefined,
sources: [
{
'host.name': {
terms: {
field: 'host.name',
},
},
},
{
'user.name': {
terms: {
field: 'user.name',
},
},
},
],
},
aggs: {
max_timestamp: {
max: {
field: '@timestamp',
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
count_check: {
bucket_selector: {
buckets_path: {
docCount: '_count',
},
script: `params.docCount >= 100`,
},
},
cardinality_count: {
cardinality: {
field: 'destination.ip',
},
},
cardinality_check: {
bucket_selector: {
buckets_path: {
cardinalityCount: 'cardinality_count',
},
script: 'params.cardinalityCount >= 2',
},
},
},
},
},
})
);
});
it('should generate a threshold signal query when only a value and a cardinality field/value are provided', async () => {
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
cardinality: [
{
field: 'source.ip',
value: 5,
},
],
field: [],
value: 200,
},
runtimeMappings: undefined,
primaryTimestamp: TIMESTAMP,
secondaryTimestamp: undefined,
aggregatableTimestampField: TIMESTAMP,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
aggregations: {
cardinality_count: {
cardinality: {
field: 'source.ip',
},
},
max_timestamp: {
max: {
field: '@timestamp',
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
},
})
);
});
it('should generate a threshold signal query with timestamp override', async () => {
const timestampOverride = 'event.ingested';
const { aggregatableTimestampField, timestampRuntimeMappings } = {
aggregatableTimestampField: TIMESTAMP_RUNTIME_FIELD,
timestampRuntimeMappings: buildTimestampRuntimeMapping({
timestampOverride,
}),
};
await findThresholdSignals({
from: 'now-6m',
to: 'now',
maxSignals: 100,
inputIndexPattern: ['*'],
services: mockService,
ruleExecutionLogger,
filter,
threshold: {
cardinality: [
{
field: 'source.ip',
value: 5,
},
],
field: [],
value: 200,
},
runtimeMappings: timestampRuntimeMappings,
primaryTimestamp: timestampOverride,
secondaryTimestamp: TIMESTAMP,
aggregatableTimestampField,
});
expect(mockSingleSearchAfter).toHaveBeenCalledWith(
expect.objectContaining({
primaryTimestamp: timestampOverride,
secondaryTimestamp: TIMESTAMP,
runtimeMappings: buildTimestampRuntimeMapping({ timestampOverride }),
aggregations: {
cardinality_count: {
cardinality: {
field: 'source.ip',
},
},
max_timestamp: {
max: {
field: TIMESTAMP_RUNTIME_FIELD,
},
},
min_timestamp: {
min: {
field: TIMESTAMP_RUNTIME_FIELD,
},
},
},
})
);
});
});

View file

@ -20,16 +20,13 @@ import type {
TimestampOverride,
} from '../../../../../common/api/detection_engine/model/rule_schema';
import { singleSearchAfter } from '../utils/single_search_after';
import { buildEventsSearchQuery } from '../utils/build_events_query';
import {
buildThresholdMultiBucketAggregation,
buildThresholdSingleBucketAggregation,
} from './build_threshold_aggregation';
import type {
ThresholdMultiBucketAggregationResult,
ThresholdBucket,
ThresholdSingleBucketAggregationResult,
} from './types';
import { shouldFilterByCardinality, searchResultHasAggs } from './utils';
import type { ThresholdCompositeBucket } from './types';
import { shouldFilterByCardinality } from './utils';
import type { IRuleExecutionLogForExecutors } from '../../rule_monitoring';
import { getMaxSignalsWarning, stringifyAfterKey } from '../utils/utils';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
@ -58,7 +55,6 @@ interface SearchAfterResults {
searchErrors: string[];
}
// eslint-disable-next-line complexity
export const findThresholdSignals = async ({
from,
to,
@ -74,14 +70,14 @@ export const findThresholdSignals = async ({
aggregatableTimestampField,
isLoggedRequestsEnabled,
}: FindThresholdSignalsParams): Promise<{
buckets: ThresholdBucket[];
buckets: ThresholdCompositeBucket[];
searchDurations: string[];
searchErrors: string[];
warnings: string[];
loggedRequests?: RulePreviewLoggedRequest[];
}> => {
// Leaf aggregations used below
const buckets: ThresholdBucket[] = [];
const buckets: ThresholdCompositeBucket[] = [];
const searchAfterResults: SearchAfterResults = {
searchDurations: [],
searchErrors: [],
@ -94,29 +90,33 @@ export const findThresholdSignals = async ({
if (hasThresholdFields(threshold)) {
let sortKeys: Record<string, string | number | null> | undefined;
do {
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: thresholdLoggedRequests,
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildThresholdMultiBucketAggregation({
threshold,
aggregatableTimestampField,
sortKeys,
}),
index: inputIndexPattern,
searchAfterSortIds: undefined,
from,
to,
services,
ruleExecutionLogger,
filter,
pageSize: 0,
sortOrder: 'desc',
runtimeMappings,
filter,
size: 0,
sortOrder: 'desc',
searchAfterSortIds: undefined,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: thresholdLoggedRequests,
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findThresholdBuckets',
@ -134,40 +134,40 @@ export const findThresholdSignals = async ({
sortKeys = undefined; // this will eject us out of the loop
// if a search failure occurs on a secondary iteration,
// we will return early.
} else if (searchResultHasAggs<ThresholdMultiBucketAggregationResult>(searchResult)) {
const thresholdTerms = searchResult.aggregations?.thresholdTerms;
sortKeys = thresholdTerms?.after_key;
buckets.push(
...((searchResult.aggregations?.thresholdTerms.buckets as ThresholdBucket[]) ?? [])
);
} else if (searchResult.aggregations != null) {
const thresholdTerms = searchResult.aggregations.thresholdTerms;
sortKeys = thresholdTerms.after_key;
buckets.push(...thresholdTerms.buckets);
} else {
throw new Error('Aggregations were missing on threshold rule search result');
}
} while (sortKeys && buckets.length <= maxSignals);
} else {
const searchRequest = buildEventsSearchQuery({
aggregations: buildThresholdSingleBucketAggregation({
threshold,
aggregatableTimestampField,
}),
index: inputIndexPattern,
from,
to,
runtimeMappings,
filter,
size: 0,
sortOrder: 'desc',
searchAfterSortIds: undefined,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: thresholdLoggedRequests,
} = await singleSearchAfter({
aggregations: buildThresholdSingleBucketAggregation({
threshold,
aggregatableTimestampField,
}),
searchAfterSortIds: undefined,
index: inputIndexPattern,
from,
to,
searchRequest,
services,
ruleExecutionLogger,
filter,
pageSize: 0,
sortOrder: 'desc',
trackTotalHits: true,
runtimeMappings,
primaryTimestamp,
secondaryTimestamp,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findThresholdBuckets',
@ -180,12 +180,9 @@ export const findThresholdSignals = async ({
searchAfterResults.searchErrors.push(...searchErrors);
loggedRequests.push(...(thresholdLoggedRequests ?? []));
if (
!searchResultHasAggs<ThresholdSingleBucketAggregationResult>(searchResult) &&
isEmpty(searchErrors)
) {
throw new Error('Aggregations were missing on threshold rule search result');
} else if (searchResultHasAggs<ThresholdSingleBucketAggregationResult>(searchResult)) {
if (isEmpty(searchErrors)) {
searchAfterResults.searchErrors.push(...searchErrors);
} else if (searchResult.aggregations != null) {
const docCount = searchResult.hits.total.value;
if (
docCount >= threshold.value &&
@ -196,13 +193,13 @@ export const findThresholdSignals = async ({
buckets.push({
doc_count: docCount,
key: {},
max_timestamp: searchResult.aggregations?.max_timestamp ?? { value: null },
min_timestamp: searchResult.aggregations?.min_timestamp ?? { value: null },
...(includeCardinalityFilter
? { cardinality_count: searchResult.aggregations?.cardinality_count }
: {}),
max_timestamp: searchResult.aggregations.max_timestamp,
min_timestamp: searchResult.aggregations.min_timestamp,
cardinality_count: searchResult.aggregations.cardinality_count,
});
}
} else {
throw new Error('Aggregations were missing on threshold rule search result');
}
}

View file

@ -6,7 +6,6 @@
*/
import dateMath from '@kbn/datemath';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { getExceptionListItemSchemaMock } from '@kbn/lists-plugin/common/schemas/response/exception_list_item_schema.mock';
import { licensingMock } from '@kbn/licensing-plugin/server/mocks';
import { thresholdExecutor } from './threshold';
@ -48,14 +47,12 @@ describe('threshold_executor', () => {
sharedParams.ruleExecutionLogger = ruleExecutionLogger;
beforeEach(() => {
ruleServices = createPersistenceExecutorOptionsMock();
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
...sampleEmptyAggsSearchResults(),
aggregations: {
thresholdTerms: { buckets: [] },
},
})
);
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValue({
...sampleEmptyAggsSearchResults(),
aggregations: {
thresholdTerms: { buckets: [] },
},
});
mockScheduledNotificationResponseAction = jest.fn();
});

View file

@ -7,7 +7,7 @@
import type {
AggregationsCardinalityAggregate,
AggregationsCompositeBucket,
AggregationsCompositeBucketKeys,
AggregationsMaxAggregate,
AggregationsMinAggregate,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
@ -44,8 +44,7 @@ export type ThresholdSingleBucketAggregationResult = ESSearchResponse<
}
>;
export type ThresholdCompositeBucket = AggregationsCompositeBucket & ThresholdLeafAggregates;
export type ThresholdBucket = ThresholdCompositeBucket;
export type ThresholdCompositeBucket = AggregationsCompositeBucketKeys & ThresholdLeafAggregates;
export interface ThresholdResult {
terms?: Array<{

View file

@ -24,7 +24,7 @@ import type {
} from '../../../../../common/api/detection_engine/model/alerts';
import { transformHitToAlert } from '../factories/utils/transform_hit_to_alert';
import type { ThresholdBucket } from './types';
import type { ThresholdCompositeBucket } from './types';
import type { BuildReasonMessage } from '../utils/reason_formatters';
import { transformBucketIntoHit } from './bulk_create_threshold_signals';
import type { SecuritySharedParams } from '../types';
@ -43,7 +43,7 @@ export const wrapSuppressedThresholdALerts = ({
startedAt,
}: {
sharedParams: SecuritySharedParams<ThresholdRuleParams>;
buckets: ThresholdBucket[];
buckets: ThresholdCompositeBucket[];
buildReasonMessage: BuildReasonMessage;
startedAt: Date;
}): Array<WrappedFieldsLatest<BaseFieldsLatest & SuppressionFieldsLatest>> => {

View file

@ -10,6 +10,7 @@ import { buildEventsSearchQuery } from './build_events_query';
describe('create_signals', () => {
test('it builds a now-5m up to today filter', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -24,48 +25,47 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
test('it builds a now-5m up to today filter with timestampOverride', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -80,89 +80,88 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
bool: {
should: [
{
range: {
'event.ingested': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
size: 100,
query: {
bool: {
filter: [
{},
{
bool: {
should: [
{
range: {
'event.ingested': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
},
{
bool: {
filter: [
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
},
},
},
{
bool: {
filter: [
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
{
bool: {
must_not: {
exists: {
field: 'event.ingested',
},
},
},
{
bool: {
must_not: {
exists: {
field: 'event.ingested',
},
},
},
},
],
},
},
],
},
],
minimum_should_match: 1,
},
},
],
minimum_should_match: 1,
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: 'event.ingested',
format: 'strict_date_optional_time',
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'event.ingested': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
{
field: 'event.ingested',
format: 'strict_date_optional_time',
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'event.ingested': {
order: 'asc',
unmapped_type: 'date',
},
},
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
},
],
});
});
test('it builds a filter without @timestamp fallback if `secondaryTimestamp` is undefined', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -177,49 +176,48 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'event.ingested': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'event.ingested': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: 'event.ingested',
format: 'strict_date_optional_time',
},
],
sort: [
{
'event.ingested': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: 'event.ingested',
format: 'strict_date_optional_time',
},
],
sort: [
{
'event.ingested': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
test('if searchAfterSortIds is a valid sortId string', () => {
const fakeSortId = '123456789012';
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -234,49 +232,48 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
search_after: [fakeSortId],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
search_after: [fakeSortId],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
test('if searchAfterSortIds is a valid sortId number', () => {
const fakeSortIdNumber = 123456789012;
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -291,48 +288,47 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
search_after: [fakeSortIdNumber],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
search_after: [fakeSortIdNumber],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
test('if aggregations is not provided it should not be included', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -347,43 +343,41 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
@ -410,55 +404,54 @@ describe('create_signals', () => {
allow_no_indices: true,
index: ['auditbeat-*'],
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
aggregations: {
tags: {
terms: {
field: 'tag',
},
},
],
},
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
fields: [
{
field: '*',
include_unmapped: true,
},
{
field: '@timestamp',
format: 'strict_date_optional_time',
},
],
aggregations: {
tags: {
terms: {
field: 'tag',
},
},
},
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
});
});
test('if trackTotalHits is provided it should be included', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -470,11 +463,12 @@ describe('create_signals', () => {
trackTotalHits: false,
runtimeMappings: undefined,
});
expect(query.body?.track_total_hits).toEqual(false);
expect(query.track_total_hits).toEqual(false);
});
test('if sortOrder is provided it should be included', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -487,7 +481,7 @@ describe('create_signals', () => {
trackTotalHits: false,
runtimeMappings: undefined,
});
expect(query?.body?.sort).toEqual([
expect(query.sort).toEqual([
{
'@timestamp': {
order: 'desc',
@ -499,6 +493,7 @@ describe('create_signals', () => {
test('it respects sort order for timestampOverride', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -510,7 +505,7 @@ describe('create_signals', () => {
sortOrder: 'desc',
runtimeMappings: undefined,
});
expect(query?.body?.sort).toEqual([
expect(query.sort).toEqual([
{
'event.ingested': {
order: 'desc',
@ -528,6 +523,7 @@ describe('create_signals', () => {
test('it respects overriderBody params', () => {
const query = buildEventsSearchQuery({
aggregations: undefined,
index: ['auditbeat-*'],
from: 'now-5m',
to: 'today',
@ -548,36 +544,33 @@ describe('create_signals', () => {
runtime_mappings: undefined,
track_total_hits: undefined,
ignore_unavailable: true,
body: {
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
size: 100,
query: {
bool: {
filter: [
{},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'today',
format: 'strict_date_optional_time',
},
},
],
},
],
},
},
_source: false,
fields: ['@timestamp'],
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
_source: false,
fields: ['@timestamp'],
runtime_mappings: undefined,
sort: [
{
'@timestamp': {
order: 'asc',
unmapped_type: 'date',
},
},
],
},
],
});
});
});

View file

@ -4,13 +4,16 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { estypes } from '@elastic/elasticsearch';
import { isEmpty } from 'lodash';
import type { OverrideBodyQuery } from '../types';
import type { TimestampOverride } from '../../../../../common/api/detection_engine/model/rule_schema';
interface BuildEventsSearchQuery {
aggregations?: Record<string, estypes.AggregationsAggregationContainer>;
interface BuildEventsSearchQuery<
TAggs extends Record<string, estypes.AggregationsAggregationContainer> | undefined = undefined
> {
aggregations: TAggs;
index: string[];
from: string;
to: string;
@ -99,7 +102,9 @@ export const buildTimeRangeFilter = ({
};
};
export const buildEventsSearchQuery = ({
export const buildEventsSearchQuery = <
TAggs extends Record<string, estypes.AggregationsAggregationContainer> | undefined
>({
aggregations,
index,
from,
@ -114,7 +119,7 @@ export const buildEventsSearchQuery = ({
trackTotalHits,
additionalFilters,
overrideBody,
}: BuildEventsSearchQuery): estypes.SearchRequest => {
}: BuildEventsSearchQuery<TAggs>) => {
const timestamps = secondaryTimestamp
? [primaryTimestamp, secondaryTimestamp]
: [primaryTimestamp];
@ -152,39 +157,34 @@ export const buildEventsSearchQuery = ({
});
}
const searchQuery: estypes.SearchRequest = {
const searchQuery = {
allow_no_indices: true,
index,
ignore_unavailable: true,
body: {
track_total_hits: trackTotalHits,
size,
query: {
bool: {
filter: filterWithTime,
},
track_total_hits: trackTotalHits,
size,
query: {
bool: {
filter: filterWithTime,
},
fields: [
{
field: '*',
include_unmapped: true,
},
...docFields,
],
...(aggregations ? { aggregations } : {}),
runtime_mappings: runtimeMappings,
sort,
...overrideBody,
},
fields: [
{
field: '*',
include_unmapped: true,
},
...docFields,
],
aggregations,
runtime_mappings: runtimeMappings,
sort,
...overrideBody,
};
if (searchAfterSortIds != null && !isEmpty(searchAfterSortIds)) {
return {
...searchQuery,
body: {
...searchQuery.body,
search_after: searchAfterSortIds,
},
search_after: searchAfterSortIds,
};
}
return searchQuery;

View file

@ -9,116 +9,69 @@ import { logSearchRequest } from './log_search_request';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
describe('logSearchRequest', () => {
it('should match inline snapshot when deprecated search request used', () => {
it('should match inline snapshot when search request used', () => {
const searchRequest = {
allow_no_indices: true,
index: ['close_alerts*'],
ignore_unavailable: true,
body: {
size: 0,
query: {
bool: {
filter: [
{
bool: {
must: [],
filter: [{ query_string: { query: '*' } }, { bool: { must_not: [] } }],
should: [],
must_not: [],
},
size: 0,
query: {
bool: {
filter: [
{
bool: {
must: [],
filter: [{ query_string: { query: '*' } }, { bool: { must_not: [] } }],
should: [],
must_not: [],
},
{
range: {
'@timestamp': {
lte: '2024-12-09T17:26:48.786Z',
gte: '2013-07-14T00:26:48.786Z',
format: 'strict_date_optional_time',
},
},
},
],
},
},
fields: [
{ field: '*', include_unmapped: true },
{ field: '@timestamp', format: 'strict_date_optional_time' },
],
aggregations: {
thresholdTerms: {
composite: {
sources: [
{ 'agent.name': { terms: { field: 'agent.name' } } },
{ 'destination.ip': { terms: { field: 'destination.ip' } } },
],
after: { 'agent.name': 'test-6', 'destination.ip': '127.0.0.1' },
size: 10000,
},
aggs: {
max_timestamp: { max: { field: '@timestamp' } },
min_timestamp: { min: { field: '@timestamp' } },
count_check: {
bucket_selector: {
buckets_path: { docCount: '_count' },
script: 'params.docCount >= 1',
{
range: {
'@timestamp': {
lte: '2024-12-09T17:26:48.786Z',
gte: '2013-07-14T00:26:48.786Z',
format: 'strict_date_optional_time',
},
},
},
},
],
},
runtime_mappings: {},
sort: [{ '@timestamp': { order: 'desc', unmapped_type: 'date' } }],
},
fields: [
{ field: '*', include_unmapped: true },
{ field: '@timestamp', format: 'strict_date_optional_time' },
],
aggregations: {
thresholdTerms: {
composite: {
sources: [
{ 'agent.name': { terms: { field: 'agent.name' } } },
{ 'destination.ip': { terms: { field: 'destination.ip' } } },
],
after: { 'agent.name': 'test-6', 'destination.ip': '127.0.0.1' },
size: 10000,
},
aggs: {
max_timestamp: { max: { field: '@timestamp' } },
min_timestamp: { min: { field: '@timestamp' } },
count_check: {
bucket_selector: {
buckets_path: { docCount: '_count' },
script: 'params.docCount >= 1',
},
},
},
},
},
runtime_mappings: {},
sort: [{ '@timestamp': { order: 'desc', unmapped_type: 'date' } }],
};
expect(logSearchRequest(searchRequest as unknown as estypes.SearchRequest))
.toMatchInlineSnapshot(`
"POST /close_alerts*/_search?allow_no_indices=true&ignore_unavailable=true
{
\\"size\\": 0,
\\"query\\": {
\\"bool\\": {
\\"filter\\": [
{
\\"bool\\": {
\\"must\\": [],
\\"filter\\": [
{
\\"query_string\\": {
\\"query\\": \\"*\\"
}
},
{
\\"bool\\": {
\\"must_not\\": []
}
}
],
\\"should\\": [],
\\"must_not\\": []
}
},
{
\\"range\\": {
\\"@timestamp\\": {
\\"lte\\": \\"2024-12-09T17:26:48.786Z\\",
\\"gte\\": \\"2013-07-14T00:26:48.786Z\\",
\\"format\\": \\"strict_date_optional_time\\"
}
}
}
]
}
},
\\"fields\\": [
{
\\"field\\": \\"*\\",
\\"include_unmapped\\": true
},
{
\\"field\\": \\"@timestamp\\",
\\"format\\": \\"strict_date_optional_time\\"
}
],
\\"aggregations\\": {
\\"thresholdTerms\\": {
\\"composite\\": {
@ -166,7 +119,52 @@ describe('logSearchRequest', () => {
}
}
},
\\"fields\\": [
{
\\"field\\": \\"*\\",
\\"include_unmapped\\": true
},
{
\\"field\\": \\"@timestamp\\",
\\"format\\": \\"strict_date_optional_time\\"
}
],
\\"query\\": {
\\"bool\\": {
\\"filter\\": [
{
\\"bool\\": {
\\"must\\": [],
\\"filter\\": [
{
\\"query_string\\": {
\\"query\\": \\"*\\"
}
},
{
\\"bool\\": {
\\"must_not\\": []
}
}
],
\\"should\\": [],
\\"must_not\\": []
}
},
{
\\"range\\": {
\\"@timestamp\\": {
\\"lte\\": \\"2024-12-09T17:26:48.786Z\\",
\\"gte\\": \\"2013-07-14T00:26:48.786Z\\",
\\"format\\": \\"strict_date_optional_time\\"
}
}
}
]
}
},
\\"runtime_mappings\\": {},
\\"size\\": 0,
\\"sort\\": [
{
\\"@timestamp\\": {

View file

@ -5,10 +5,46 @@
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { estypes } from '@elastic/elasticsearch';
import { omit, pick } from 'lodash';
// Search body fields as per https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-search-api-request-body
const BODY_FIELDS = [
'aggs',
'aggregations',
'docvalue_fields',
'fields',
'stored_fields',
'explain',
'from',
'indices_boost',
'knn',
'min_score',
'pit',
'query',
'retriever',
'runtime_mappings',
'seq_no_primary_term',
'size',
'sort',
'_source',
'stats',
'terminate_after',
'timeout',
'version',
];
export const logSearchRequest = (searchRequest: estypes.SearchRequest): string => {
const { body, index, ...params } = searchRequest;
const { index } = searchRequest;
const params = {
...omit(searchRequest, [...BODY_FIELDS, 'index']),
};
const body = {
...pick(searchRequest, BODY_FIELDS),
};
const urlParams = Object.entries(params)
.reduce<string[]>((acc, [key, value]) => {
if (value != null) {

View file

@ -18,8 +18,6 @@ import { getExceptionListItemSchemaMock } from '@kbn/lists-plugin/common/schemas
import type { SearchListItemArraySchema } from '@kbn/securitysolution-io-ts-list-types';
import { getSearchListItemResponseMock } from '@kbn/lists-plugin/common/schemas/response/search_list_item_schema.mock';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import type { CommonAlertFieldsLatest } from '@kbn/rule-registry-plugin/common/schemas';
import {
ALERT_RULE_CATEGORY,
@ -88,9 +86,7 @@ describe('searchAfterAndBulkCreate', () => {
test('should return success with number of searches less than max signals', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -106,9 +102,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -124,9 +118,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -142,9 +134,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(9, 12))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(9, 12))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -160,9 +150,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
const exceptionItem = getExceptionListItemSchemaMock();
@ -195,9 +183,7 @@ describe('searchAfterAndBulkCreate', () => {
test('should return success with number of searches less than max signals with gap', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
createdAlerts: [
@ -212,9 +198,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -230,9 +214,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -248,9 +230,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
const exceptionItem = getExceptionListItemSchemaMock();
@ -283,9 +263,7 @@ describe('searchAfterAndBulkCreate', () => {
test('should return success when no search results are in the allowlist', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -316,9 +294,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
const exceptionItem = getExceptionListItemSchemaMock();
@ -357,20 +333,14 @@ describe('searchAfterAndBulkCreate', () => {
listClient.searchListItemByValues = jest.fn().mockResolvedValue(searchListItems);
ruleServices.scopedClusterClient.asCurrentUser.search
.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3), [
'1.1.1.1',
'2.2.2.2',
'2.2.2.2',
'2.2.2.2',
])
)
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3), [
'1.1.1.1',
'2.2.2.2',
'2.2.2.2',
'2.2.2.2',
])
)
.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
);
.mockResolvedValueOnce(sampleDocSearchResultsNoSortIdNoHits());
const exceptionItem = getExceptionListItemSchemaMock();
exceptionItem.entries = [
@ -428,22 +398,16 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search
.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(
4,
4,
someGuids.slice(0, 3),
['1.1.1.1', '2.2.2.2', '2.2.2.2', '2.2.2.2'],
// this is the case we are testing, if we receive an empty string for one of the sort ids.
['', '2222222222222']
)
repeatedSearchResultsWithSortId(
4,
4,
someGuids.slice(0, 3),
['1.1.1.1', '2.2.2.2', '2.2.2.2', '2.2.2.2'],
// this is the case we are testing, if we receive an empty string for one of the sort ids.
['', '2222222222222']
)
)
.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
);
.mockResolvedValueOnce(sampleDocSearchResultsNoSortIdNoHits());
const { success, createdSignalsCount } = await searchAfterAndBulkCreate({
sharedParams,
@ -467,14 +431,12 @@ describe('searchAfterAndBulkCreate', () => {
listClient.searchListItemByValues = jest.fn().mockResolvedValue(searchListItems);
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithNoSortId(4, 4, someGuids.slice(0, 3), [
'1.1.1.1',
'2.2.2.2',
'2.2.2.2',
'2.2.2.2',
])
)
repeatedSearchResultsWithNoSortId(4, 4, someGuids.slice(0, 3), [
'1.1.1.1',
'2.2.2.2',
'2.2.2.2',
'2.2.2.2',
])
);
const exceptionItem = getExceptionListItemSchemaMock();
@ -506,9 +468,7 @@ describe('searchAfterAndBulkCreate', () => {
test('should return success when no sortId present but search results are in the allowlist', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithNoSortId(4, 4, someGuids.slice(0, 3))
)
repeatedSearchResultsWithNoSortId(4, 4, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -567,9 +527,7 @@ describe('searchAfterAndBulkCreate', () => {
test('should return success when no exceptions list provided', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 4, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -600,9 +558,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
listClient.searchListItemByValues = jest.fn(({ value }) =>
@ -639,7 +595,7 @@ describe('searchAfterAndBulkCreate', () => {
},
];
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(sampleEmptyDocSearchResults())
sampleEmptyDocSearchResults()
);
listClient.searchListItemByValues = jest.fn(({ value }) =>
Promise.resolve(
@ -723,9 +679,7 @@ describe('searchAfterAndBulkCreate', () => {
],
};
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -748,9 +702,7 @@ describe('searchAfterAndBulkCreate', () => {
ruleServices.scopedClusterClient.asCurrentUser.bulk.mockResponseOnce(bulkItem); // adds the response with errors we are testing
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -766,9 +718,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -784,9 +734,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(9, 12))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(9, 12))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -802,9 +750,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
const { success, createdSignalsCount, errors } = await searchAfterAndBulkCreate({
sharedParams,
@ -821,9 +767,7 @@ describe('searchAfterAndBulkCreate', () => {
it('invokes the enrichment callback with signal search results', async () => {
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(0, 3))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -839,9 +783,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(3, 6))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -857,9 +799,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
)
repeatedSearchResultsWithSortId(4, 1, someGuids.slice(6, 9))
);
ruleServices.alertWithPersistence.mockResolvedValueOnce({
@ -875,9 +815,7 @@ describe('searchAfterAndBulkCreate', () => {
});
ruleServices.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsNoSortIdNoHits()
)
sampleDocSearchResultsNoSortIdNoHits()
);
const mockEnrichment = jest.fn((a) => a);

View file

@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { singleSearchAfter } from './single_search_after';
import { filterEventsAgainstList } from './large_list_filters/filter_events_against_list';
import { sendAlertTelemetryEvents } from './send_telemetry_events';
import { buildEventsSearchQuery } from './build_events_query';
import {
createSearchAfterReturnType,
createSearchAfterReturnTypeFromResponse,
@ -102,26 +103,30 @@ export const searchAfterAndBulkCreateFactory = async ({
} in index pattern "${inputIndexPattern}"`
);
const searchAfterQuery = buildEventsSearchQuery({
aggregations: undefined,
index: inputIndexPattern,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
runtimeMappings,
filter,
size: Math.ceil(Math.min(maxSignals, pageSize)),
sortOrder,
searchAfterSortIds: sortIds,
primaryTimestamp,
secondaryTimestamp,
trackTotalHits,
additionalFilters,
});
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: singleSearchLoggedRequests = [],
} = await singleSearchAfter({
searchAfterSortIds: sortIds,
index: inputIndexPattern,
runtimeMappings,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
searchRequest: searchAfterQuery,
services,
ruleExecutionLogger,
filter,
pageSize: Math.ceil(Math.min(maxSignals, pageSize)),
primaryTimestamp,
secondaryTimestamp,
trackTotalHits,
sortOrder,
additionalFilters,
loggedRequestsConfig: createLoggedRequestsConfig(
isLoggedRequestsEnabled,
sortIds,

View file

@ -4,24 +4,18 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import {
sampleDocSearchResultsNoSortId,
sampleDocSearchResultsWithSortId,
} from '../__mocks__/es_results';
import type { estypes } from '@elastic/elasticsearch';
import { sampleDocSearchResultsNoSortId } from '../__mocks__/es_results';
import { singleSearchAfter } from './single_search_after';
import type { RuleExecutorServicesMock } from '@kbn/alerting-plugin/server/mocks';
import { alertsMock } from '@kbn/alerting-plugin/server/mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { ruleExecutionLogMock } from '../../rule_monitoring/mocks';
import { buildEventsSearchQuery } from './build_events_query';
jest.mock('./build_events_query');
const mockBuildEventsSearchQuery = buildEventsSearchQuery as jest.Mock;
describe('singleSearchAfter', () => {
const mockService: RuleExecutorServicesMock = alertsMock.createRuleExecutorServices();
const ruleExecutionLogger = ruleExecutionLogMock.forExecutors.create();
const mockSearchRequest = { query: { match_all: {} } };
beforeEach(() => {
jest.clearAllMocks();
@ -29,39 +23,23 @@ describe('singleSearchAfter', () => {
test('if singleSearchAfter works without a given sort id', async () => {
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(sampleDocSearchResultsNoSortId())
sampleDocSearchResultsNoSortId()
);
const { searchResult } = await singleSearchAfter({
searchAfterSortIds: undefined,
index: [],
from: 'now-360s',
to: 'now',
searchRequest: mockSearchRequest,
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
});
expect(searchResult).toEqual(sampleDocSearchResultsNoSortId());
});
test('if singleSearchAfter returns an empty failure array', async () => {
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(sampleDocSearchResultsNoSortId())
sampleDocSearchResultsNoSortId()
);
const { searchErrors } = await singleSearchAfter({
searchAfterSortIds: undefined,
index: [],
from: 'now-360s',
to: 'now',
searchRequest: mockSearchRequest,
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
});
expect(searchErrors).toEqual([]);
});
@ -83,125 +61,41 @@ describe('singleSearchAfter', () => {
},
},
];
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
took: 10,
timed_out: false,
_shards: {
total: 10,
successful: 10,
failed: 1,
skipped: 0,
failures: errors,
},
hits: {
total: 100,
max_score: 100,
hits: [],
},
})
);
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce({
took: 10,
timed_out: false,
_shards: {
total: 10,
successful: 10,
failed: 1,
skipped: 0,
failures: errors,
},
hits: {
total: 100,
max_score: 100,
hits: [],
},
});
const { searchErrors } = await singleSearchAfter({
searchAfterSortIds: undefined,
index: [],
from: 'now-360s',
to: 'now',
searchRequest: mockSearchRequest,
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
});
expect(searchErrors).toEqual([
'index: "index-123" reason: "some reason" type: "some type" caused by reason: "some reason" caused by type: "some type"',
]);
});
test('if singleSearchAfter works with a given sort id', async () => {
const searchAfterSortIds = ['1234567891111'];
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(
sampleDocSearchResultsWithSortId()
)
);
const { searchResult } = await singleSearchAfter({
searchAfterSortIds,
index: [],
from: 'now-360s',
to: 'now',
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
});
expect(searchResult).toEqual(sampleDocSearchResultsWithSortId());
});
test('if singleSearchAfter throws error', async () => {
const searchAfterSortIds = ['1234567891111'];
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createErrorTransportRequestPromise(new Error('Fake Error'))
);
await expect(
singleSearchAfter({
searchAfterSortIds,
index: [],
from: 'now-360s',
to: 'now',
searchRequest: mockSearchRequest,
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
})
).rejects.toThrow('Fake Error');
});
test('singleSearchAfter passes overrideBody to buildEventsSearchQuery', async () => {
mockService.scopedClusterClient.asCurrentUser.search.mockResolvedValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise(sampleDocSearchResultsNoSortId())
);
await singleSearchAfter({
searchAfterSortIds: undefined,
index: [],
from: 'now-360s',
to: 'now',
services: mockService,
ruleExecutionLogger,
pageSize: 1,
filter: {},
primaryTimestamp: '@timestamp',
secondaryTimestamp: undefined,
runtimeMappings: undefined,
overrideBody: {
_source: false,
fields: ['@timestamp'],
},
});
expect(mockBuildEventsSearchQuery).toHaveBeenCalledWith({
additionalFilters: undefined,
aggregations: undefined,
filter: {},
from: 'now-360s',
index: [],
primaryTimestamp: '@timestamp',
runtimeMappings: undefined,
searchAfterSortIds: undefined,
secondaryTimestamp: undefined,
size: 1,
sortOrder: undefined,
to: 'now',
trackTotalHits: undefined,
overrideBody: {
_source: false,
fields: ['@timestamp'],
},
});
});
});

View file

@ -4,70 +4,38 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { estypes } from '@elastic/elasticsearch';
import type { ESSearchResponse } from '@kbn/es-types';
import { performance } from 'perf_hooks';
import type {
AlertInstanceContext,
AlertInstanceState,
RuleExecutorServices,
} from '@kbn/alerting-plugin/server';
import type {
SignalSearchResponse,
SignalSource,
OverrideBodyQuery,
LoggedRequestsConfig,
} from '../types';
import { buildEventsSearchQuery } from './build_events_query';
import type { SignalSource, LoggedRequestsConfig } from '../types';
import { createErrorsFromShard, makeFloatString } from './utils';
import type { TimestampOverride } from '../../../../../common/api/detection_engine/model/rule_schema';
import { withSecuritySpan } from '../../../../utils/with_security_span';
import type { IRuleExecutionLogForExecutors } from '../../rule_monitoring';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
import { logSearchRequest } from './logged_requests';
export interface SingleSearchAfterParams {
aggregations?: Record<string, estypes.AggregationsAggregationContainer>;
searchAfterSortIds: estypes.SortResults | undefined;
index: string[];
from: string;
to: string;
export interface SingleSearchAfterParams<TSearchRequest> {
searchRequest: TSearchRequest;
services: RuleExecutorServices<AlertInstanceState, AlertInstanceContext, 'default'>;
ruleExecutionLogger: IRuleExecutionLogForExecutors;
pageSize: number;
sortOrder?: estypes.SortOrder;
filter: estypes.QueryDslQueryContainer;
primaryTimestamp: TimestampOverride;
secondaryTimestamp: TimestampOverride | undefined;
trackTotalHits?: boolean;
runtimeMappings: estypes.MappingRuntimeFields | undefined;
additionalFilters?: estypes.QueryDslQueryContainer[];
overrideBody?: OverrideBodyQuery;
loggedRequestsConfig?: LoggedRequestsConfig;
}
// utilize search_after for paging results into bulk.
export const singleSearchAfter = async <
TAggregations = Record<estypes.AggregateName, estypes.AggregationsAggregate>
TSearchRequest extends estypes.SearchRequest = estypes.SearchRequest
>({
aggregations,
searchAfterSortIds,
index,
runtimeMappings,
from,
to,
searchRequest,
services,
filter,
ruleExecutionLogger,
pageSize,
sortOrder,
primaryTimestamp,
secondaryTimestamp,
trackTotalHits,
additionalFilters,
overrideBody,
loggedRequestsConfig,
}: SingleSearchAfterParams): Promise<{
searchResult: SignalSearchResponse<TAggregations>;
}: SingleSearchAfterParams<TSearchRequest>): Promise<{
searchResult: ESSearchResponse<SignalSource, TSearchRequest>;
searchDuration: string;
searchErrors: string[];
loggedRequests?: RulePreviewLoggedRequest[];
@ -76,33 +44,10 @@ export const singleSearchAfter = async <
const loggedRequests: RulePreviewLoggedRequest[] = [];
try {
const searchAfterQuery = buildEventsSearchQuery({
aggregations,
index,
from,
to,
runtimeMappings,
filter,
size: pageSize,
sortOrder,
searchAfterSortIds,
primaryTimestamp,
secondaryTimestamp,
trackTotalHits,
additionalFilters,
/**
* overrideBody allows the search after to ignore the _source property of the result,
* thus reducing the size of the response and increasing the performance of the query.
*/
overrideBody,
});
const start = performance.now();
const { body: nextSearchAfterResult } =
await services.scopedClusterClient.asCurrentUser.search<SignalSource, TAggregations>(
searchAfterQuery,
{ meta: true }
);
const nextSearchAfterResult = (await services.scopedClusterClient.asCurrentUser.search(
searchRequest
)) as unknown as ESSearchResponse<SignalSource, TSearchRequest>;
const end = performance.now();
@ -114,7 +59,7 @@ export const singleSearchAfter = async <
loggedRequests.push({
request: loggedRequestsConfig.skipRequestQuery
? undefined
: logSearchRequest(searchAfterQuery),
: logSearchRequest(searchRequest),
description: loggedRequestsConfig.description,
request_type: loggedRequestsConfig.type,
duration: Math.round(end - start),
@ -129,34 +74,6 @@ export const singleSearchAfter = async <
};
} catch (exc) {
ruleExecutionLogger.error(`Searching events operation failed: ${exc}`);
if (
exc.message.includes(`No mapping found for [${primaryTimestamp}] in order to sort on`) ||
(secondaryTimestamp &&
exc.message.includes(`No mapping found for [${secondaryTimestamp}] in order to sort on`))
) {
const searchRes: SignalSearchResponse<TAggregations> = {
took: 0,
timed_out: false,
_shards: {
total: 1,
successful: 1,
failed: 0,
skipped: 0,
},
hits: {
total: 0,
max_score: 0,
hits: [],
},
};
return {
searchResult: searchRes,
searchDuration: '-1.0',
searchErrors: exc.message,
loggedRequests,
};
}
throw exc;
}
});

View file

@ -34,7 +34,7 @@ export const getAnomalies = async (
return mlAnomalySearch(queryRequest, params.jobIds);
};
export const buildAnomalyQuery = (params: AnomaliesSearchParams): estypes.SearchRequest => {
export const buildAnomalyQuery = (params: AnomaliesSearchParams) => {
const boolCriteria = buildCriteria(params);
return {
body: {