Mirror of https://github.com/elastic/kibana.git, synced 2025-04-24 01:38:56 -04:00
# Backport

This will backport the following commits from `main` to `8.x`:

- [[Security Solution][Detection Engine] Simplify searchAfterBulkCreate (#216498)](https://github.com/elastic/kibana/pull/216498)

### Questions?

Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport).

## Summary

Long ago, we did multiple searches on each "page" of results to search for both docs with the timestamp override and docs with the default `@timestamp`, then merged the results together before attempting to bulk create alerts. We no longer do this; the current process runs just one query per page, so there is no need to merge search results together. The sketch below illustrates the resulting flow.
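As an illustration of the single-query-per-page flow, here is a sketch that uses the Elasticsearch JS client directly; the function name and sort field are hypothetical stand-ins for Kibana's `singleSearchAfter` helper, not the actual implementation:

```ts
import { Client } from '@elastic/elasticsearch';
import type { estypes } from '@elastic/elasticsearch';

// One query per page: the sort values of a page's last hit become the
// search_after cursor for the next page. No result merging is required.
async function* pageThroughEvents(client: Client, index: string, pageSize: number) {
  let sortIds: estypes.SortResults | undefined;
  for (;;) {
    const response = await client.search({
      index,
      size: pageSize,
      // Production code would add a tiebreaker field (or use a point in time)
      // so the cursor is unambiguous across hits with equal timestamps.
      sort: [{ '@timestamp': 'asc' }],
      ...(sortIds ? { search_after: sortIds } : {}),
    });
    const hits = response.hits.hits;
    if (hits.length === 0) {
      break; // no more pages
    }
    yield hits;
    sortIds = hits[hits.length - 1].sort;
  }
}
```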
We also used to build the `tuple` inside `searchAfterBulkCreate`, so we had logic to verify that the tuple was created correctly. The time range tuple is now calculated in the shared security wrapper, which is responsible for any error handling, and the TS types tell us that `tuple` and its subcomponents can't be null, so `searchAfterBulkCreate` no longer needs to check. A minimal illustration of that type guarantee follows.
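The interface below is a stand-in written for this description, not the actual Kibana `RuleRangeTuple` definition; it only mirrors the relevant guarantee that the fields are non-nullable:

```ts
import type { Moment } from 'moment';

// Stand-in for the non-nullable time range tuple the shared security
// wrapper provides to the rule executor.
interface RuleRangeTuple {
  from: Moment;
  to: Moment;
  maxSignals: number;
}

// With these types, a runtime guard such as
// `if (tuple == null || tuple.to == null || tuple.from == null)` is dead code:
// the compiler already rejects callers that could pass null.
function searchWindow(tuple: RuleRangeTuple): { gte: string; lte: string } {
  return { gte: tuple.from.toISOString(), lte: tuple.to.toISOString() };
}
```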
Co-authored-by: Marshall Main <55718608+marshallmain@users.noreply.github.com>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>

Parent: b421fe725b
Commit: ed5ad481db
2 changed files with 64 additions and 162 deletions
The first changed file is the `searchAfterAndBulkCreate` implementation:

```diff
@@ -12,11 +12,9 @@ import { filterEventsAgainstList } from './large_list_filters/filter_events_agai
 import { sendAlertTelemetryEvents } from './send_telemetry_events';
 import {
   createSearchAfterReturnType,
-  createSearchResultReturnType,
   createSearchAfterReturnTypeFromResponse,
   getTotalHitsValue,
   mergeReturns,
-  mergeSearchResults,
   getSafeSortIds,
 } from './utils';
 import type {
@@ -84,7 +82,7 @@ export const searchAfterAndBulkCreateFactory = async ({
     ruleExecutionLogger,
     listClient,
   } = sharedParams;
-  // eslint-disable-next-line complexity
+
   return withSecuritySpan('searchAfterAndBulkCreate', async () => {
     let toReturn = createSearchAfterReturnType();
     let searchingIteration = 0;
@@ -92,107 +90,75 @@ export const searchAfterAndBulkCreateFactory = async ({
 
     // sortId tells us where to start our next consecutive search_after query
     let sortIds: estypes.SortResults | undefined;
-    let hasSortId = true; // default to true so we execute the search on initial run
-
-    if (tuple == null || tuple.to == null || tuple.from == null) {
-      ruleExecutionLogger.error(
-        `missing run options fields: ${!tuple.to ? '"tuple.to"' : ''}, ${
-          !tuple.from ? '"tuple.from"' : ''
-        }`
-      );
-      return createSearchAfterReturnType({
-        success: false,
-        errors: ['malformed date tuple'],
-      });
-    }
 
     const maxSignals = maxSignalsOverride ?? tuple.maxSignals;
 
     while (toReturn.createdSignalsCount <= maxSignals) {
       const cycleNum = `cycle ${searchingIteration++}`;
       try {
-        let mergedSearchResults = createSearchResultReturnType();
         ruleExecutionLogger.debug(
           `[${cycleNum}] Searching events${
             sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
           } in index pattern "${inputIndexPattern}"`
         );
 
-        if (hasSortId) {
-          const {
-            searchResult,
-            searchDuration,
-            searchErrors,
-            loggedRequests: singleSearchLoggedRequests = [],
-          } = await singleSearchAfter({
-            searchAfterSortIds: sortIds,
-            index: inputIndexPattern,
-            runtimeMappings,
-            from: tuple.from.toISOString(),
-            to: tuple.to.toISOString(),
-            services,
-            ruleExecutionLogger,
-            filter,
-            pageSize: Math.ceil(Math.min(maxSignals, pageSize)),
-            primaryTimestamp,
-            secondaryTimestamp,
-            trackTotalHits,
-            sortOrder,
-            additionalFilters,
-            loggedRequestsConfig: createLoggedRequestsConfig(
-              isLoggedRequestsEnabled,
-              sortIds,
-              searchingIteration
-            ),
-          });
-          mergedSearchResults = mergeSearchResults([mergedSearchResults, searchResult]);
-          toReturn = mergeReturns([
-            toReturn,
-            createSearchAfterReturnTypeFromResponse({
-              searchResult: mergedSearchResults,
-              primaryTimestamp,
-            }),
-            createSearchAfterReturnType({
-              searchAfterTimes: [searchDuration],
-              errors: searchErrors,
-            }),
-          ]);
-          loggedRequests.push(...singleSearchLoggedRequests);
-          // determine if there are any candidate signals to be processed
-          const totalHits = getTotalHitsValue(mergedSearchResults.hits.total);
-          const lastSortIds = getSafeSortIds(
-            searchResult.hits.hits[searchResult.hits.hits.length - 1]?.sort
-          );
-
-          if (totalHits === 0 || mergedSearchResults.hits.hits.length === 0) {
-            ruleExecutionLogger.debug(
-              `[${cycleNum}] Found 0 events ${
-                sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
-              }`
-            );
-            break;
-          } else {
-            ruleExecutionLogger.debug(
-              `[${cycleNum}] Found ${
-                mergedSearchResults.hits.hits.length
-              } of total ${totalHits} events${
-                sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
-              }, last cursor ${JSON.stringify(lastSortIds)}`
-            );
-          }
-
-          // ES can return negative sort id for date field, when sort order set to desc
-          // this could happen when event has empty sort field
-          // https://github.com/elastic/kibana/issues/174573 (happens to IM rule only since it uses desc order for events search)
-          // when negative sort id used in subsequent request it fails, so when negative sort value found we don't do next request
-          const hasNegativeNumber = lastSortIds?.some((val) => val < 0);
-
-          if (lastSortIds != null && lastSortIds.length !== 0 && !hasNegativeNumber) {
-            sortIds = lastSortIds;
-            hasSortId = true;
-          } else {
-            hasSortId = false;
-          }
-        }
+        const {
+          searchResult,
+          searchDuration,
+          searchErrors,
+          loggedRequests: singleSearchLoggedRequests = [],
+        } = await singleSearchAfter({
+          searchAfterSortIds: sortIds,
+          index: inputIndexPattern,
+          runtimeMappings,
+          from: tuple.from.toISOString(),
+          to: tuple.to.toISOString(),
+          services,
+          ruleExecutionLogger,
+          filter,
+          pageSize: Math.ceil(Math.min(maxSignals, pageSize)),
+          primaryTimestamp,
+          secondaryTimestamp,
+          trackTotalHits,
+          sortOrder,
+          additionalFilters,
+          loggedRequestsConfig: createLoggedRequestsConfig(
+            isLoggedRequestsEnabled,
+            sortIds,
+            searchingIteration
+          ),
+        });
+        toReturn = mergeReturns([
+          toReturn,
+          createSearchAfterReturnTypeFromResponse({
+            searchResult,
+            primaryTimestamp,
+          }),
+          createSearchAfterReturnType({
+            searchAfterTimes: [searchDuration],
+            errors: searchErrors,
+          }),
+        ]);
+        loggedRequests.push(...singleSearchLoggedRequests);
+        // determine if there are any candidate signals to be processed
+        const totalHits = getTotalHitsValue(searchResult.hits.total);
+        const lastSortIds = getSafeSortIds(
+          searchResult.hits.hits[searchResult.hits.hits.length - 1]?.sort
+        );
+
+        if (totalHits === 0 || searchResult.hits.hits.length === 0) {
+          ruleExecutionLogger.debug(
+            `[${cycleNum}] Found 0 events ${
+              sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
+            }`
+          );
+          break;
+        } else {
+          ruleExecutionLogger.debug(
+            `[${cycleNum}] Found ${searchResult.hits.hits.length} of total ${totalHits} events${
+              sortIds ? ` after cursor ${JSON.stringify(sortIds)}` : ''
+            }, last cursor ${JSON.stringify(lastSortIds)}`
          );
        }
 
         // filter out the search results that match with the values found in the list.
@@ -202,7 +168,7 @@ export const searchAfterAndBulkCreateFactory = async ({
           listClient,
           exceptionsList,
           ruleExecutionLogger,
-          events: mergedSearchResults.hits.hits,
+          events: searchResult.hits.hits,
         });
 
         // only bulk create if there are filteredEvents leftover
@@ -233,7 +199,14 @@ export const searchAfterAndBulkCreateFactory = async ({
           }
         }
 
-        if (!hasSortId) {
+        // ES can return negative sort id for date field, when sort order set to desc
+        // this could happen when event has empty sort field
+        // https://github.com/elastic/kibana/issues/174573 (happens to IM rule only since it uses desc order for events search)
+        // when negative sort id used in subsequent request it fails, so when negative sort value found we don't do next request
+        const hasNegativeNumber = lastSortIds?.some((val) => val < 0);
+        if (lastSortIds != null && lastSortIds.length !== 0 && !hasNegativeNumber) {
+          sortIds = lastSortIds;
+        } else {
           ruleExecutionLogger.debug(`[${cycleNum}] Unable to fetch last event cursor`);
           break;
         }
```
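A note on the relocated cursor guard above: per the linked issue (elastic/kibana#174573), when a rule searches in descending order and a hit is missing the sort field, Elasticsearch substitutes a sentinel that surfaces as a large negative number, and echoing that value back through `search_after` makes the next request fail. The guard can be read in isolation as something like the following sketch (a hypothetical helper, not the Kibana source):

```ts
import type { estypes } from '@elastic/elasticsearch';

// Advance the search_after cursor only when the last hit produced usable
// sort values; a negative numeric sort value would break the next request.
function nextCursor(
  lastSortIds: estypes.SortResults | undefined
): estypes.SortResults | undefined {
  const hasNegativeNumber = lastSortIds?.some(
    (val) => typeof val === 'number' && val < 0
  );
  if (lastSortIds != null && lastSortIds.length !== 0 && !hasNegativeNumber) {
    return lastSortIds;
  }
  return undefined; // caller logs "Unable to fetch last event cursor" and stops
}
```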
The second changed file is the `./utils` module it imports from:

```diff
@@ -47,7 +47,6 @@ import type {
   WrappedSignalHit,
-  RuleRangeTuple,
   BaseSignalHit,
   SignalSourceHit,
   SimpleHit,
   WrappedEventHit,
   SecuritySharedParams,
@@ -627,28 +626,6 @@ export const createSearchAfterReturnType = ({
   };
 };
 
-export const createSearchResultReturnType = <
-  TAggregations = Record<estypes.AggregateName, estypes.AggregationsAggregate>
->(): SignalSearchResponse<TAggregations> => {
-  const hits: SignalSourceHit[] = [];
-  return {
-    took: 0,
-    timed_out: false,
-    _shards: {
-      total: 0,
-      successful: 0,
-      failed: 0,
-      skipped: 0,
-      failures: [],
-    },
-    hits: {
-      total: 0,
-      max_score: 0,
-      hits,
-    },
-  };
-};
-
 /**
  * Merges the return values from bulk creating alerts into the appropriate fields in the combined return object.
  */
@@ -719,54 +696,6 @@ export const mergeReturns = (
   });
 };
 
-export const mergeSearchResults = <
-  TAggregations = Record<estypes.AggregateName, estypes.AggregationsAggregate>
->(
-  searchResults: Array<SignalSearchResponse<TAggregations>>
-) => {
-  return searchResults.reduce((prev, next) => {
-    const {
-      took: existingTook,
-      timed_out: existingTimedOut,
-      _shards: existingShards,
-      hits: existingHits,
-    } = prev;
-
-    const {
-      took: newTook,
-      timed_out: newTimedOut,
-      _scroll_id: newScrollId,
-      _shards: newShards,
-      aggregations: newAggregations,
-      hits: newHits,
-    } = next;
-
-    return {
-      took: Math.max(newTook, existingTook),
-      timed_out: newTimedOut && existingTimedOut,
-      _scroll_id: newScrollId,
-      _shards: {
-        total: newShards.total + existingShards.total,
-        successful: newShards.successful + existingShards.successful,
-        failed: newShards.failed + existingShards.failed,
-        // @ts-expect-error @elastic/elaticsearch skipped is optional in ShardStatistics
-        skipped: newShards.skipped + existingShards.skipped,
-        failures: [
-          ...(existingShards.failures != null ? existingShards.failures : []),
-          ...(newShards.failures != null ? newShards.failures : []),
-        ],
-      },
-      aggregations: newAggregations,
-      hits: {
-        total: calculateTotal(prev.hits.total, next.hits.total),
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        max_score: Math.max(newHits.max_score!, existingHits.max_score!),
-        hits: [...existingHits.hits, ...newHits.hits],
-      },
-    };
-  });
-};
-
 export const getTotalHitsValue = (totalHits: number | { value: number } | undefined): number =>
   typeof totalHits === 'undefined'
     ? -1
```