[Security Solution][Detection Engine] adds async ES|QL query (#216667)

## Summary

- addresses https://github.com/elastic/security-team/issues/11116 (list
item 2)

Introducing async query would allow to overcome ES request timeout for
long running rules and queries.

Timeout for ES request is [defined in alerting
framework](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_es_request_timeout.ts#L21)
and is smaller value out of rule execution timeout or default ES request
timeout(which is 5m and hardcoded
[here](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_rule_task_timeout.ts)).

If ES|QL rule performs a single long-running ES query, it can time out
after 5m due to this ES request timeout. This value can't be changed,
unlike rule execution timeout. It can be overwritten in Kibana config

```
xpack.alerting.rules.run:
  timeout: '10m'
  ruleTypeOverrides:
    - id:  'siem.esqlRule'
      timeout: '15m'
```
So, we can encounter situations when rule fails execution after 5m due
to ES request timeout, despite a fact it configured with longer timeout
of 15m

By using async query, we can overcome this limitation and can poll async
query results until it completes or rule timeouts

More details in internal
[issue](https://github.com/elastic/sdh-security-team/issues/1224)

---------

Co-authored-by: Ryland Herrick <ryalnd@gmail.com>
This commit is contained in:
Vitalii Dmyterko 2025-04-17 15:23:07 +01:00 committed by GitHub
parent c44efc52f6
commit 3d7aac1a44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 449 additions and 38 deletions

View file

@ -606,6 +606,66 @@ describe('wrapScopedClusterClient', () => {
);
expect(logger.warn).not.toHaveBeenCalled();
});
test('throws error when es|ql async search throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.transport.request.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.transport.request({
method: 'POST',
path: '/_query/async',
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"ES|QL search has been aborted due to cancelled execution"`
);
expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"POST\",\"path\":\"/_query/async\"} - with options {}`
);
expect(logger.warn).not.toHaveBeenCalled();
});
test('throws error when es|ql async query poll throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.transport.request.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.transport.request({
method: 'GET',
path: '/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3',
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"ES|QL search has been aborted due to cancelled execution"`
);
expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"GET\",\"path\":\"/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3\"} - with options {}`
);
expect(logger.warn).not.toHaveBeenCalled();
});
});
test('uses asInternalUser when specified', async () => {

View file

@ -162,7 +162,10 @@ function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
options?: TransportRequestOptions
): Promise<TResponse | TransportResult<TResponse, TContext>> {
// Wrap ES|QL requests with an abort signal
if (params.method === 'POST' && params.path === '/_query') {
if (
(params.method === 'POST' && ['/_query', '/_query/async'].includes(params.path)) ||
(params.method === 'GET' && params.path.startsWith('/_query/async'))
) {
let requestOptions: TransportRequestOptions = {};
try {
requestOptions = options ?? {};

View file

@ -209,6 +209,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -318,6 +319,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -493,6 +495,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -599,6 +602,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -743,6 +747,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -859,6 +864,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -976,6 +982,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -1080,6 +1087,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -1187,6 +1195,7 @@ describe('RuleTypeRunner', () => {
expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
@ -1254,6 +1263,47 @@ describe('RuleTypeRunner', () => {
shouldLogAlerts: true,
});
});
test('should call executor with custom ruleExecutionTimeout', async () => {
const mockRuleExecutionTimeout = '15m';
await ruleTypeRunner.run({
context: {
alertingEventLogger,
flappingSettings: DEFAULT_FLAPPING_SETTINGS,
queryDelaySec: 0,
request: fakeRequest,
maintenanceWindowsService,
ruleId: RULE_ID,
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
alertsClient,
executionId: 'abc',
executorServices: {
getDataViews,
ruleMonitoringService: publicRuleMonitoringService,
ruleResultService: publicRuleResultService,
savedObjectsClient,
uiSettingsClient,
wrappedScopedClusterClient,
getWrappedSearchSourceClient,
},
rule: mockedRule,
ruleType: { ...ruleType, ruleTaskTimeout: mockRuleExecutionTimeout },
startedAt: new Date(DATE_1970),
state: mockTaskInstance().state,
validatedParams: mockedRuleParams,
});
expect(ruleType.executor).toHaveBeenCalledWith(
expect.objectContaining({
ruleExecutionTimeout: mockRuleExecutionTimeout,
})
);
});
});
});

View file

@ -291,6 +291,7 @@ export class RuleTypeRunner<
...(startedAtOverridden ? { forceNow: startedAt } : {}),
}),
isServerless: context.isServerless,
ruleExecutionTimeout: ruleType.ruleTaskTimeout,
})
)
);

View file

@ -139,6 +139,7 @@ export interface RuleExecutorOptions<
flappingSettings: RulesSettingsFlappingProperties;
getTimeRange: (timeWindow?: string) => GetTimeRangeResult;
isServerless: boolean;
ruleExecutionTimeout?: string;
}
export interface RuleParamsAndRefs<Params extends RuleTypeParams> {

View file

@ -256,7 +256,7 @@ export const previewRulesRoute = (
rule,
services: {
shouldWriteAlerts: () => true,
shouldStopExecution: () => false,
shouldStopExecution: () => isAborted,
alertsClient: null,
alertFactory: {
create: alertInstanceFactoryStub<
@ -298,6 +298,7 @@ export const previewRulesRoute = (
return { dateStart: date, dateEnd: date };
},
isServerless,
ruleExecutionTimeout: `${PREVIEW_TIMEOUT_SECONDS}s`,
})) as { state: TState; loggedRequests: RulePreviewLoggedRequest[] });
const errors = loggedStatusChanges

View file

@ -23,6 +23,7 @@ export interface BuildEqlSearchRequestParams {
primaryTimestamp: TimestampOverride;
secondaryTimestamp: TimestampOverride | undefined;
exceptionFilter: Filter | undefined;
ruleExecutionTimeout: string | undefined;
}
export const buildEsqlSearchRequest = ({
@ -34,6 +35,7 @@ export const buildEsqlSearchRequest = ({
secondaryTimestamp,
exceptionFilter,
size,
ruleExecutionTimeout,
}: BuildEqlSearchRequestParams) => {
const esFilter = getQueryFilter({
query: '',
@ -61,5 +63,7 @@ export const buildEsqlSearchRequest = ({
filter: requestFilter,
},
},
wait_for_completion_timeout: '4m', // hard limit request timeout is 5m set by ES proxy and alerting framework. So, we should be fine to wait 4m for async query completion. If rule execution is shorter than 4m and query was not completed, it will be aborted.
...(ruleExecutionTimeout ? { keep_alive: ruleExecutionTimeout } : {}),
};
};

View file

@ -24,10 +24,8 @@ import { fetchSourceDocuments } from './fetch_source_documents';
import { buildReasonMessageForEsqlAlert } from '../utils/reason_formatters';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
import type { SecurityRuleServices, SecuritySharedParams, SignalSource } from '../types';
import { logEsqlRequest } from '../utils/logged_requests';
import { getDataTierFilter } from '../utils/get_data_tier_filter';
import { checkErrorDetails } from '../utils/check_error_details';
import * as i18n from '../translations';
import {
addToSearchAfterReturn,
@ -52,12 +50,14 @@ export const esqlExecutor = async ({
state,
licensing,
scheduleNotificationResponseActionsService,
ruleExecutionTimeout,
}: {
sharedParams: SecuritySharedParams<EsqlRuleParams>;
services: SecurityRuleServices;
state: Record<string, unknown>;
licensing: LicensingPluginSetup;
scheduleNotificationResponseActionsService: ScheduleNotificationResponseActionsService;
ruleExecutionTimeout?: string;
}) => {
const {
completeRule,
@ -99,16 +99,10 @@ export const esqlExecutor = async ({
primaryTimestamp,
secondaryTimestamp,
exceptionFilter,
ruleExecutionTimeout,
});
const esqlQueryString = { drop_null_columns: true };
if (isLoggedRequestsEnabled) {
loggedRequests.push({
request: logEsqlRequest(esqlRequest, esqlQueryString),
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
});
}
ruleExecutionLogger.debug(`ES|QL query request: ${JSON.stringify(esqlRequest)}`);
const exceptionsWarning = getUnprocessedExceptionsWarnings(unprocessedExceptions);
if (exceptionsWarning) {
@ -121,15 +115,14 @@ export const esqlExecutor = async ({
esClient: services.scopedClusterClient.asCurrentUser,
requestBody: esqlRequest,
requestQueryParams: esqlQueryString,
shouldStopExecution: services.shouldStopExecution,
ruleExecutionLogger,
loggedRequests: isLoggedRequestsEnabled ? loggedRequests : undefined,
});
const esqlSearchDuration = performance.now() - esqlSignalSearchStart;
result.searchAfterTimes.push(makeFloatString(esqlSearchDuration));
if (isLoggedRequestsEnabled && loggedRequests[0]) {
loggedRequests[0].duration = Math.round(esqlSearchDuration);
}
ruleExecutionLogger.debug(`ES|QL query request took: ${esqlSearchDuration}ms`);
const isRuleAggregating = computeIsESQLQueryAggregating(completeRule.ruleParams.query);

View file

@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { performEsqlRequest } from './esql_request';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
const columns = [
{ name: '_id', type: 'keyword' as const },
{ name: 'agent.name', type: 'keyword' as const },
{ name: 'agent.version', type: 'keyword' as const },
{ name: 'agent.type', type: 'keyword' as const },
];
const values = [['doc-id', 'agent-name', '8.8.1', 'packetbeat']];
const requestBody = {
query: 'from test* METADATA _id | limit 101',
filter: {
bool: {
filter: [
{
range: {
'@timestamp': {
lte: '2025-04-02T10:13:52.235Z',
gte: '2013-11-04T16:13:52.235Z',
format: 'strict_date_optional_time',
},
},
},
{
bool: {
must: [],
filter: [],
should: [],
must_not: [],
},
},
],
},
},
wait_for_completion_timeout: '4m',
};
const requestQueryParams = { drop_null_columns: true };
describe('performEsqlRequest', () => {
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const shouldStopExecution: jest.Mock = jest.fn();
shouldStopExecution.mockReturnValue(false);
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
});
it('returns results immediately when the async query completed', async () => {
const mockResponse = {
id: 'QUERY-ID',
is_running: false,
columns,
values,
};
esClient.transport.request.mockResolvedValueOnce(mockResponse);
const result = await performEsqlRequest({
esClient,
requestBody,
requestQueryParams,
shouldStopExecution,
});
expect(result).toEqual(mockResponse);
expect(esClient.transport.request).toHaveBeenCalledTimes(2);
expect(esClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: '/_query/async',
body: requestBody,
querystring: requestQueryParams,
});
expect(esClient.transport.request).toHaveBeenCalledWith({
method: 'DELETE',
path: '/_query/async/QUERY-ID',
});
});
it('polls until the query is completed', async () => {
const mockSubmitResponse = {
id: 'QUERY-ID',
is_running: true,
columns: [],
values: [],
};
const mockPollResponse = {
...mockSubmitResponse,
is_running: false,
columns,
values,
};
esClient.transport.request
.mockResolvedValueOnce(mockSubmitResponse)
.mockResolvedValueOnce(mockPollResponse);
const waitForPerformEsql = performEsqlRequest({
esClient,
requestBody,
requestQueryParams,
shouldStopExecution,
});
await jest.advanceTimersByTimeAsync(15000);
const result = await waitForPerformEsql;
expect(result).toEqual(mockPollResponse);
expect(esClient.transport.request).toHaveBeenCalledTimes(3);
expect(esClient.transport.request).toHaveBeenNthCalledWith(1, {
method: 'POST',
path: '/_query/async',
body: requestBody,
querystring: requestQueryParams,
});
expect(esClient.transport.request).toHaveBeenNthCalledWith(2, {
method: 'GET',
path: '/_query/async/QUERY-ID',
});
expect(esClient.transport.request).toHaveBeenCalledWith({
method: 'DELETE',
path: '/_query/async/QUERY-ID',
});
});
it('throws an error if execution is cancelled', async () => {
const mockSubmitResponse = {
id: 'QUERY-ID',
is_running: true,
columns: [],
values: [],
};
esClient.transport.request.mockResolvedValue(mockSubmitResponse);
shouldStopExecution.mockReturnValue(true);
const waitForPerformEsql = performEsqlRequest({
esClient,
requestBody,
requestQueryParams,
shouldStopExecution,
}).catch((error) => {
expect(error.message).toBe('Rule execution cancelled due to timeout');
});
await jest.advanceTimersByTimeAsync(15000);
await waitForPerformEsql;
expect.assertions(1);
});
it('deletes query if error happens during polling', async () => {
const mockSubmitResponse = {
id: 'QUERY-ID',
is_running: true,
columns: [],
values: [],
};
esClient.transport.request
.mockResolvedValueOnce(mockSubmitResponse)
.mockRejectedValueOnce(new Error('Test error'));
const waitForPerformEsql = performEsqlRequest({
esClient,
requestBody,
requestQueryParams: {},
shouldStopExecution,
}).catch((error) => {
expect(error.message).toBe('Test error');
});
await jest.advanceTimersByTimeAsync(15000);
await waitForPerformEsql;
expect(esClient.transport.request).toHaveBeenCalledWith({
method: 'DELETE',
path: '/_query/async/QUERY-ID',
});
expect.assertions(2);
});
});

View file

@ -5,14 +5,35 @@
* 2.0.
*/
import type { Logger, ElasticsearchClient } from '@kbn/core/server';
import { performance } from 'perf_hooks';
import type { ElasticsearchClient } from '@kbn/core/server';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import type { IRuleExecutionLogForExecutors } from '../../rule_monitoring';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
import { logEsqlRequest } from '../utils/logged_requests';
import * as i18n from '../translations';
const setLatestRequestDuration = (
startTime: number,
loggedRequests: RulePreviewLoggedRequest[] | undefined
) => {
const duration = performance.now() - startTime;
if (loggedRequests && loggedRequests?.[loggedRequests.length - 1]) {
loggedRequests[loggedRequests.length - 1].duration = Math.round(duration);
}
};
export interface EsqlResultColumn {
name: string;
type: 'date' | 'keyword';
}
type AsyncEsqlResponse = {
id: string;
is_running: boolean;
} & EsqlTable;
export type EsqlResultRow = Array<string | null>;
export interface EsqlTable {
@ -24,29 +45,95 @@ export const performEsqlRequest = async ({
esClient,
requestBody,
requestQueryParams,
ruleExecutionLogger,
shouldStopExecution,
loggedRequests,
}: {
logger?: Logger;
ruleExecutionLogger?: IRuleExecutionLogForExecutors;
esClient: ElasticsearchClient;
requestBody: Record<string, unknown>;
requestQueryParams?: { drop_null_columns?: boolean };
}): Promise<EsqlTable> => {
const search = async () => {
try {
const rawResponse = await esClient.transport.request<EsqlTable>({
method: 'POST',
path: '/_query',
body: requestBody,
querystring: requestQueryParams,
});
return {
rawResponse,
isPartial: false,
isRunning: false,
};
} catch (e) {
throw getKbnServerError(e);
}
requestBody: {
query: string;
filter: QueryDslQueryContainer;
};
requestQueryParams?: {
drop_null_columns?: boolean;
};
shouldStopExecution: () => boolean;
loggedRequests?: RulePreviewLoggedRequest[];
}): Promise<EsqlTable> => {
let pollInterval = 10 * 1000; // Poll every 10 seconds
let pollCount = 0;
let queryId: string = '';
return (await search()).rawResponse;
try {
loggedRequests?.push({
request: logEsqlRequest(requestBody, requestQueryParams),
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
});
const asyncSearchStarted = performance.now();
const asyncEsqlResponse = await esClient.transport.request<AsyncEsqlResponse>({
method: 'POST',
path: '/_query/async',
body: requestBody,
querystring: requestQueryParams,
});
setLatestRequestDuration(asyncSearchStarted, loggedRequests);
queryId = asyncEsqlResponse.id;
const isRunning = asyncEsqlResponse.is_running;
if (!isRunning) {
return asyncEsqlResponse;
}
// Poll for long-executing query
while (true) {
await new Promise((resolve) => setTimeout(resolve, pollInterval));
loggedRequests?.push({
request: `GET /_query/async/${queryId}`,
description: i18n.ESQL_POLL_REQUEST_DESCRIPTION,
});
const pollStarted = performance.now();
const pollResponse = await esClient.transport.request<AsyncEsqlResponse>({
method: 'GET',
path: `/_query/async/${queryId}`,
});
setLatestRequestDuration(pollStarted, loggedRequests);
if (!pollResponse.is_running) {
return pollResponse;
}
pollCount++;
if (pollCount > 60) {
pollInterval = 60 * 1000; // Increase the poll interval after 10m
}
const isCancelled = shouldStopExecution(); // Execution will be cancelled if rule times out
ruleExecutionLogger?.debug(`Polling for query ID: ${queryId}, isCancelled: ${isCancelled}`);
if (isCancelled) {
throw new Error('Rule execution cancelled due to timeout');
}
ruleExecutionLogger?.debug(`Query is still running for query ID: ${queryId}`);
}
} catch (error) {
ruleExecutionLogger?.error(`Error while performing ES|QL search: ${error?.message}`);
throw getKbnServerError(error);
} finally {
if (queryId) {
loggedRequests?.push({
request: `DELETE /_query/async/${queryId}`,
description: i18n.ESQL_DELETE_REQUEST_DESCRIPTION,
});
const deleteStarted = performance.now();
await esClient.transport.request({
method: 'DELETE',
path: `/_query/async/${queryId}`,
});
setLatestRequestDuration(deleteStarted, loggedRequests);
}
}
};

View file

@ -14,6 +14,20 @@ export const ESQL_SEARCH_REQUEST_DESCRIPTION = i18n.translate(
}
);
export const ESQL_POLL_REQUEST_DESCRIPTION = i18n.translate(
'xpack.securitySolution.detectionEngine.esqlRuleType.esqlPollRequestDescription',
{
defaultMessage: 'ES|QL request to poll for async search results',
}
);
export const ESQL_DELETE_REQUEST_DESCRIPTION = i18n.translate(
'xpack.securitySolution.detectionEngine.esqlRuleType.esqlDeleteRequestDescription',
{
defaultMessage: 'ES|QL request to delete async search query',
}
);
export const FIND_SOURCE_DOCUMENTS_REQUEST_DESCRIPTION = i18n.translate(
'xpack.securitySolution.detectionEngine.esqlRuleType.findSourceDocumentsRequestDescription',
{

View file

@ -24,5 +24,9 @@ export const logEsqlRequest = (
}, [])
.join('&');
return `POST _query${urlParams ? `?${urlParams}` : ''}\n${JSON.stringify(requestBody, null, 2)}`;
return `POST _query/async${urlParams ? `?${urlParams}` : ''}\n${JSON.stringify(
requestBody,
null,
2
)}`;
};