mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Security Solution][Detection Engine] Simplify searchAfterBulkCreate (#216498)
## Summary Long ago, we did multiple searches on each "page" of results to search for both docs with the timestamp override and default `@timestamp`. We then merged the results together before attempting to bulk create alerts. We no longer do this, instead we have a simpler process that just does one query per page so there's no need to merge search results together. We also used to build the `tuple` inside `searchAfterBulkCreate`, so we had logic to verify if the tuple was created correctly. The time range tuple is now calculated in the shared security wrapper, which is responsible for any error handling. The TS types tell us that `tuple` and its subcomponents can't be null, we don't need to check in `searchAfterBulkCreate`. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
40e02f1745
commit
e303fb3d20
2 changed files with 64 additions and 162 deletions
|
@ -12,11 +12,9 @@ import { filterEventsAgainstList } from './large_list_filters/filter_events_agai
|
|||
import { sendAlertTelemetryEvents } from './send_telemetry_events';
|
||||
import {
|
||||
createSearchAfterReturnType,
|
||||
createSearchResultReturnType,
|
||||
createSearchAfterReturnTypeFromResponse,
|
||||
getTotalHitsValue,
|
||||
mergeReturns,
|
||||
mergeSearchResults,
|
||||
getSafeSortIds,
|
||||
} from './utils';
|
||||
import type {
|
||||
|
@ -84,7 +82,7 @@ export const searchAfterAndBulkCreateFactory = async ({
|
|||
ruleExecutionLogger,
|
||||
listClient,
|
||||
} = sharedParams;
|
||||
// eslint-disable-next-line complexity
|
||||
|
||||
return withSecuritySpan('searchAfterAndBulkCreate', async () => {
|
||||
let toReturn = createSearchAfterReturnType();
|
||||
let searchingIteration = 0;
|
||||
|
@ -92,107 +90,75 @@ export const searchAfterAndBulkCreateFactory = async ({
|
|||
|
||||
// sortId tells us where to start our next consecutive search_after query
|
||||
let sortIds: estypes.SortResults | undefined;
|
||||
let hasSortId = true; // default to true so we execute the search on initial run
|
||||
|
||||
if (tuple == null || tuple.to == null || tuple.from == null) {
|
||||
ruleExecutionLogger.error(
|
||||
`missing run options fields: ${!tuple.to ? '"tuple.to"' : ''}, ${
|
||||
!tuple.from ? '"tuple.from"' : ''
|
||||
}`
|
||||
);
|
||||
return createSearchAfterReturnType({
|
||||
success: false,
|
||||
errors: ['malformed date tuple'],
|
||||
});
|
||||
}
|
||||
|
||||
const maxSignals = maxSignalsOverride ?? tuple.maxSignals;
|
||||
|
||||
while (toReturn.createdSignalsCount <= maxSignals) {
|
||||
const cycleNum = `cycle ${searchingIteration++}`;
|
||||
try {
|
||||
let mergedSearchResults = createSearchResultReturnType();
|
||||
ruleExecutionLogger.debug(
|
||||
`[${cycleNum}] Searching events${
|
||||
sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
|
||||
} in index pattern "${inputIndexPattern}"`
|
||||
);
|
||||
|
||||
if (hasSortId) {
|
||||
const {
|
||||
const {
|
||||
searchResult,
|
||||
searchDuration,
|
||||
searchErrors,
|
||||
loggedRequests: singleSearchLoggedRequests = [],
|
||||
} = await singleSearchAfter({
|
||||
searchAfterSortIds: sortIds,
|
||||
index: inputIndexPattern,
|
||||
runtimeMappings,
|
||||
from: tuple.from.toISOString(),
|
||||
to: tuple.to.toISOString(),
|
||||
services,
|
||||
ruleExecutionLogger,
|
||||
filter,
|
||||
pageSize: Math.ceil(Math.min(maxSignals, pageSize)),
|
||||
primaryTimestamp,
|
||||
secondaryTimestamp,
|
||||
trackTotalHits,
|
||||
sortOrder,
|
||||
additionalFilters,
|
||||
loggedRequestsConfig: createLoggedRequestsConfig(
|
||||
isLoggedRequestsEnabled,
|
||||
sortIds,
|
||||
searchingIteration
|
||||
),
|
||||
});
|
||||
toReturn = mergeReturns([
|
||||
toReturn,
|
||||
createSearchAfterReturnTypeFromResponse({
|
||||
searchResult,
|
||||
searchDuration,
|
||||
searchErrors,
|
||||
loggedRequests: singleSearchLoggedRequests = [],
|
||||
} = await singleSearchAfter({
|
||||
searchAfterSortIds: sortIds,
|
||||
index: inputIndexPattern,
|
||||
runtimeMappings,
|
||||
from: tuple.from.toISOString(),
|
||||
to: tuple.to.toISOString(),
|
||||
services,
|
||||
ruleExecutionLogger,
|
||||
filter,
|
||||
pageSize: Math.ceil(Math.min(maxSignals, pageSize)),
|
||||
primaryTimestamp,
|
||||
secondaryTimestamp,
|
||||
trackTotalHits,
|
||||
sortOrder,
|
||||
additionalFilters,
|
||||
loggedRequestsConfig: createLoggedRequestsConfig(
|
||||
isLoggedRequestsEnabled,
|
||||
sortIds,
|
||||
searchingIteration
|
||||
),
|
||||
});
|
||||
mergedSearchResults = mergeSearchResults([mergedSearchResults, searchResult]);
|
||||
toReturn = mergeReturns([
|
||||
toReturn,
|
||||
createSearchAfterReturnTypeFromResponse({
|
||||
searchResult: mergedSearchResults,
|
||||
primaryTimestamp,
|
||||
}),
|
||||
createSearchAfterReturnType({
|
||||
searchAfterTimes: [searchDuration],
|
||||
errors: searchErrors,
|
||||
}),
|
||||
]);
|
||||
loggedRequests.push(...singleSearchLoggedRequests);
|
||||
// determine if there are any candidate signals to be processed
|
||||
const totalHits = getTotalHitsValue(mergedSearchResults.hits.total);
|
||||
const lastSortIds = getSafeSortIds(
|
||||
searchResult.hits.hits[searchResult.hits.hits.length - 1]?.sort
|
||||
}),
|
||||
createSearchAfterReturnType({
|
||||
searchAfterTimes: [searchDuration],
|
||||
errors: searchErrors,
|
||||
}),
|
||||
]);
|
||||
loggedRequests.push(...singleSearchLoggedRequests);
|
||||
// determine if there are any candidate signals to be processed
|
||||
const totalHits = getTotalHitsValue(searchResult.hits.total);
|
||||
const lastSortIds = getSafeSortIds(
|
||||
searchResult.hits.hits[searchResult.hits.hits.length - 1]?.sort
|
||||
);
|
||||
|
||||
if (totalHits === 0 || searchResult.hits.hits.length === 0) {
|
||||
ruleExecutionLogger.debug(
|
||||
`[${cycleNum}] Found 0 events ${
|
||||
sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
|
||||
}`
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
ruleExecutionLogger.debug(
|
||||
`[${cycleNum}] Found ${searchResult.hits.hits.length} of total ${totalHits} events${
|
||||
sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
|
||||
}, last cursor ${JSON.stringify(lastSortIds)}`
|
||||
);
|
||||
|
||||
if (totalHits === 0 || mergedSearchResults.hits.hits.length === 0) {
|
||||
ruleExecutionLogger.debug(
|
||||
`[${cycleNum}] Found 0 events ${
|
||||
sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
|
||||
}`
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
ruleExecutionLogger.debug(
|
||||
`[${cycleNum}] Found ${
|
||||
mergedSearchResults.hits.hits.length
|
||||
} of total ${totalHits} events${
|
||||
sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
|
||||
}, last cursor ${JSON.stringify(lastSortIds)}`
|
||||
);
|
||||
}
|
||||
|
||||
// ES can return negative sort id for date field, when sort order set to desc
|
||||
// this could happen when event has empty sort field
|
||||
// https://github.com/elastic/kibana/issues/174573 (happens to IM rule only since it uses desc order for events search)
|
||||
// when negative sort id used in subsequent request it fails, so when negative sort value found we don't do next request
|
||||
const hasNegativeNumber = lastSortIds?.some((val) => val < 0);
|
||||
|
||||
if (lastSortIds != null && lastSortIds.length !== 0 && !hasNegativeNumber) {
|
||||
sortIds = lastSortIds;
|
||||
hasSortId = true;
|
||||
} else {
|
||||
hasSortId = false;
|
||||
}
|
||||
}
|
||||
|
||||
// filter out the search results that match with the values found in the list.
|
||||
|
@ -202,7 +168,7 @@ export const searchAfterAndBulkCreateFactory = async ({
|
|||
listClient,
|
||||
exceptionsList,
|
||||
ruleExecutionLogger,
|
||||
events: mergedSearchResults.hits.hits,
|
||||
events: searchResult.hits.hits,
|
||||
});
|
||||
|
||||
// only bulk create if there are filteredEvents leftover
|
||||
|
@ -233,7 +199,14 @@ export const searchAfterAndBulkCreateFactory = async ({
|
|||
}
|
||||
}
|
||||
|
||||
if (!hasSortId) {
|
||||
// ES can return negative sort id for date field, when sort order set to desc
|
||||
// this could happen when event has empty sort field
|
||||
// https://github.com/elastic/kibana/issues/174573 (happens to IM rule only since it uses desc order for events search)
|
||||
// when negative sort id used in subsequent request it fails, so when negative sort value found we don't do next request
|
||||
const hasNegativeNumber = lastSortIds?.some((val) => val < 0);
|
||||
if (lastSortIds != null && lastSortIds.length !== 0 && !hasNegativeNumber) {
|
||||
sortIds = lastSortIds;
|
||||
} else {
|
||||
ruleExecutionLogger.debug(`[${cycleNum}] Unable to fetch last event cursor`);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -46,7 +46,6 @@ import type {
|
|||
WrappedSignalHit,
|
||||
RuleRangeTuple,
|
||||
BaseSignalHit,
|
||||
SignalSourceHit,
|
||||
SimpleHit,
|
||||
WrappedEventHit,
|
||||
SecuritySharedParams,
|
||||
|
@ -626,28 +625,6 @@ export const createSearchAfterReturnType = ({
|
|||
};
|
||||
};
|
||||
|
||||
export const createSearchResultReturnType = <
|
||||
TAggregations = Record<estypes.AggregateName, estypes.AggregationsAggregate>
|
||||
>(): SignalSearchResponse<TAggregations> => {
|
||||
const hits: SignalSourceHit[] = [];
|
||||
return {
|
||||
took: 0,
|
||||
timed_out: false,
|
||||
_shards: {
|
||||
total: 0,
|
||||
successful: 0,
|
||||
failed: 0,
|
||||
skipped: 0,
|
||||
failures: [],
|
||||
},
|
||||
hits: {
|
||||
total: 0,
|
||||
max_score: 0,
|
||||
hits,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Merges the return values from bulk creating alerts into the appropriate fields in the combined return object.
|
||||
*/
|
||||
|
@ -718,54 +695,6 @@ export const mergeReturns = (
|
|||
});
|
||||
};
|
||||
|
||||
export const mergeSearchResults = <
|
||||
TAggregations = Record<estypes.AggregateName, estypes.AggregationsAggregate>
|
||||
>(
|
||||
searchResults: Array<SignalSearchResponse<TAggregations>>
|
||||
) => {
|
||||
return searchResults.reduce((prev, next) => {
|
||||
const {
|
||||
took: existingTook,
|
||||
timed_out: existingTimedOut,
|
||||
_shards: existingShards,
|
||||
hits: existingHits,
|
||||
} = prev;
|
||||
|
||||
const {
|
||||
took: newTook,
|
||||
timed_out: newTimedOut,
|
||||
_scroll_id: newScrollId,
|
||||
_shards: newShards,
|
||||
aggregations: newAggregations,
|
||||
hits: newHits,
|
||||
} = next;
|
||||
|
||||
return {
|
||||
took: Math.max(newTook, existingTook),
|
||||
timed_out: newTimedOut && existingTimedOut,
|
||||
_scroll_id: newScrollId,
|
||||
_shards: {
|
||||
total: newShards.total + existingShards.total,
|
||||
successful: newShards.successful + existingShards.successful,
|
||||
failed: newShards.failed + existingShards.failed,
|
||||
// @ts-expect-error @elastic/elaticsearch skipped is optional in ShardStatistics
|
||||
skipped: newShards.skipped + existingShards.skipped,
|
||||
failures: [
|
||||
...(existingShards.failures != null ? existingShards.failures : []),
|
||||
...(newShards.failures != null ? newShards.failures : []),
|
||||
],
|
||||
},
|
||||
aggregations: newAggregations,
|
||||
hits: {
|
||||
total: calculateTotal(prev.hits.total, next.hits.total),
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
max_score: Math.max(newHits.max_score!, existingHits.max_score!),
|
||||
hits: [...existingHits.hits, ...newHits.hits],
|
||||
},
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
export const getTotalHitsValue = (totalHits: number | { value: number } | undefined): number =>
|
||||
typeof totalHits === 'undefined'
|
||||
? -1
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue