mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
# Backport This will backport the following commits from `main` to `8.18`: - [[Security Solution][Detection Engine] adds async ES|QL query (#216667)](https://github.com/elastic/kibana/pull/216667) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Vitalii Dmyterko","email":"92328789+vitaliidm@users.noreply.github.com"},"sourceCommit":{"committedDate":"2025-04-17T14:23:07Z","message":"[Security Solution][Detection Engine] adds async ES|QL query (#216667)\n\n## Summary\n\n- addresses https://github.com/elastic/security-team/issues/11116 (list\nitem 2)\n\nIntroducing async query would allow to overcome ES request timeout for\nlong running rules and queries.\n\nTimeout for ES request is [defined in alerting\nframework](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_es_request_timeout.ts#L21)\nand is smaller value out of rule execution timeout or default ES request\ntimeout(which is 5m and hardcoded\n[here](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_rule_task_timeout.ts)).\n\nIf ES|QL rule performs a single long-running ES query, it can time out\nafter 5m due to this ES request timeout. This value can't be changed,\nunlike rule execution timeout. It can be overwritten in Kibana config\n\n```\nxpack.alerting.rules.run:\n timeout: '10m'\n ruleTypeOverrides:\n - id: 'siem.esqlRule'\n timeout: '15m'\n```\nSo, we can encounter situations when rule fails execution after 5m due\nto ES request timeout, despite a fact it configured with longer timeout\nof 15m\n\nBy using async query, we can overcome this limitation and can poll async\nquery results until it completes or rule timeouts\n\nMore details in internal\n[issue](https://github.com/elastic/sdh-security-team/issues/1224)\n\n---------\n\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"3d7aac1a443092ebdbc20fbd9345d373bcb16c48","branchLabelMapping":{"^v9.1.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team: SecuritySolution","Team:Detection Engine","backport:version","v9.1.0","v8.19.0","v8.18.1","v9.0.1"],"title":"[Security Solution][Detection Engine] adds async ES|QL query","number":216667,"url":"https://github.com/elastic/kibana/pull/216667","mergeCommit":{"message":"[Security Solution][Detection Engine] adds async ES|QL query (#216667)\n\n## Summary\n\n- addresses https://github.com/elastic/security-team/issues/11116 (list\nitem 2)\n\nIntroducing async query would allow to overcome ES request timeout for\nlong running rules and queries.\n\nTimeout for ES request is [defined in alerting\nframework](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_es_request_timeout.ts#L21)\nand is smaller value out of rule execution timeout or default ES request\ntimeout(which is 5m and hardcoded\n[here](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_rule_task_timeout.ts)).\n\nIf ES|QL rule performs a single long-running ES query, it can time out\nafter 5m due to this ES request timeout. This value can't be changed,\nunlike rule execution timeout. It can be overwritten in Kibana config\n\n```\nxpack.alerting.rules.run:\n timeout: '10m'\n ruleTypeOverrides:\n - id: 'siem.esqlRule'\n timeout: '15m'\n```\nSo, we can encounter situations when rule fails execution after 5m due\nto ES request timeout, despite a fact it configured with longer timeout\nof 15m\n\nBy using async query, we can overcome this limitation and can poll async\nquery results until it completes or rule timeouts\n\nMore details in internal\n[issue](https://github.com/elastic/sdh-security-team/issues/1224)\n\n---------\n\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"3d7aac1a443092ebdbc20fbd9345d373bcb16c48"}},"sourceBranch":"main","suggestedTargetBranches":["8.18","9.0"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/216667","number":216667,"mergeCommit":{"message":"[Security Solution][Detection Engine] adds async ES|QL query (#216667)\n\n## Summary\n\n- addresses https://github.com/elastic/security-team/issues/11116 (list\nitem 2)\n\nIntroducing async query would allow to overcome ES request timeout for\nlong running rules and queries.\n\nTimeout for ES request is [defined in alerting\nframework](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_es_request_timeout.ts#L21)\nand is smaller value out of rule execution timeout or default ES request\ntimeout(which is 5m and hardcoded\n[here](https://github.com/elastic/kibana/blob/8.18/x-pack/platform/plugins/shared/alerting/server/lib/get_rule_task_timeout.ts)).\n\nIf ES|QL rule performs a single long-running ES query, it can time out\nafter 5m due to this ES request timeout. This value can't be changed,\nunlike rule execution timeout. It can be overwritten in Kibana config\n\n```\nxpack.alerting.rules.run:\n timeout: '10m'\n ruleTypeOverrides:\n - id: 'siem.esqlRule'\n timeout: '15m'\n```\nSo, we can encounter situations when rule fails execution after 5m due\nto ES request timeout, despite a fact it configured with longer timeout\nof 15m\n\nBy using async query, we can overcome this limitation and can poll async\nquery results until it completes or rule timeouts\n\nMore details in internal\n[issue](https://github.com/elastic/sdh-security-team/issues/1224)\n\n---------\n\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"3d7aac1a443092ebdbc20fbd9345d373bcb16c48"}},{"branch":"8.19","label":"v8.19.0","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"url":"https://github.com/elastic/kibana/pull/218567","number":218567,"state":"OPEN"},{"branch":"8.18","label":"v8.18.1","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"},{"branch":"9.0","label":"v9.0.1","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
3a0e234b20
commit
d8e5155c84
12 changed files with 450 additions and 39 deletions
|
@ -606,6 +606,66 @@ describe('wrapScopedClusterClient', () => {
|
|||
);
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('throws error when es|ql async search throws abort error', async () => {
|
||||
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
|
||||
|
||||
abortController.abort();
|
||||
childClient.transport.request.mockRejectedValueOnce(
|
||||
new Error('Request has been aborted by the user')
|
||||
);
|
||||
|
||||
const abortableSearchClient = createWrappedScopedClusterClientFactory({
|
||||
scopedClusterClient,
|
||||
rule,
|
||||
logger,
|
||||
abortController,
|
||||
}).client();
|
||||
|
||||
await expect(
|
||||
abortableSearchClient.asInternalUser.transport.request({
|
||||
method: 'POST',
|
||||
path: '/_query/async',
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(
|
||||
`"ES|QL search has been aborted due to cancelled execution"`
|
||||
);
|
||||
|
||||
expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
|
||||
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"POST\",\"path\":\"/_query/async\"} - with options {}`
|
||||
);
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('throws error when es|ql async query poll throws abort error', async () => {
|
||||
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
|
||||
|
||||
abortController.abort();
|
||||
childClient.transport.request.mockRejectedValueOnce(
|
||||
new Error('Request has been aborted by the user')
|
||||
);
|
||||
|
||||
const abortableSearchClient = createWrappedScopedClusterClientFactory({
|
||||
scopedClusterClient,
|
||||
rule,
|
||||
logger,
|
||||
abortController,
|
||||
}).client();
|
||||
|
||||
await expect(
|
||||
abortableSearchClient.asInternalUser.transport.request({
|
||||
method: 'GET',
|
||||
path: '/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3',
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(
|
||||
`"ES|QL search has been aborted due to cancelled execution"`
|
||||
);
|
||||
|
||||
expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
|
||||
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"GET\",\"path\":\"/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3\"} - with options {}`
|
||||
);
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
test('uses asInternalUser when specified', async () => {
|
||||
|
|
|
@ -162,7 +162,10 @@ function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
|
|||
options?: TransportRequestOptions
|
||||
): Promise<TResponse | TransportResult<TResponse, TContext>> {
|
||||
// Wrap ES|QL requests with an abort signal
|
||||
if (params.method === 'POST' && params.path === '/_query') {
|
||||
if (
|
||||
(params.method === 'POST' && ['/_query', '/_query/async'].includes(params.path)) ||
|
||||
(params.method === 'GET' && params.path.startsWith('/_query/async'))
|
||||
) {
|
||||
let requestOptions: TransportRequestOptions = {};
|
||||
try {
|
||||
requestOptions = options ?? {};
|
||||
|
|
|
@ -211,6 +211,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -319,6 +320,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -492,6 +494,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -596,6 +599,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -738,6 +742,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -853,6 +858,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -969,6 +975,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -1075,6 +1082,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -1181,6 +1189,7 @@ describe('RuleTypeRunner', () => {
|
|||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith({
|
||||
executionId: 'abc',
|
||||
ruleExecutionTimeout: '5m',
|
||||
services: {
|
||||
alertFactory: alertsClient.factory(),
|
||||
alertsClient: alertsClient.client(),
|
||||
|
@ -1247,6 +1256,47 @@ describe('RuleTypeRunner', () => {
|
|||
shouldLogAlerts: true,
|
||||
});
|
||||
});
|
||||
|
||||
test('should call executor with custom ruleExecutionTimeout', async () => {
|
||||
const mockRuleExecutionTimeout = '15m';
|
||||
|
||||
await ruleTypeRunner.run({
|
||||
context: {
|
||||
alertingEventLogger,
|
||||
flappingSettings: DEFAULT_FLAPPING_SETTINGS,
|
||||
queryDelaySec: 0,
|
||||
request: fakeRequest,
|
||||
maintenanceWindowsService,
|
||||
ruleId: RULE_ID,
|
||||
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
|
||||
ruleRunMetricsStore,
|
||||
spaceId: 'default',
|
||||
isServerless: false,
|
||||
},
|
||||
alertsClient,
|
||||
executionId: 'abc',
|
||||
executorServices: {
|
||||
getDataViews,
|
||||
ruleMonitoringService: publicRuleMonitoringService,
|
||||
ruleResultService: publicRuleResultService,
|
||||
savedObjectsClient,
|
||||
uiSettingsClient,
|
||||
wrappedScopedClusterClient,
|
||||
getWrappedSearchSourceClient,
|
||||
},
|
||||
rule: mockedRule,
|
||||
ruleType: { ...ruleType, ruleTaskTimeout: mockRuleExecutionTimeout },
|
||||
startedAt: new Date(DATE_1970),
|
||||
state: mockTaskInstance().state,
|
||||
validatedParams: mockedRuleParams,
|
||||
});
|
||||
|
||||
expect(ruleType.executor).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
ruleExecutionTimeout: mockRuleExecutionTimeout,
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -286,6 +286,7 @@ export class RuleTypeRunner<
|
|||
...(startedAtOverridden ? { forceNow: startedAt } : {}),
|
||||
}),
|
||||
isServerless: context.isServerless,
|
||||
ruleExecutionTimeout: ruleType.ruleTaskTimeout,
|
||||
})
|
||||
)
|
||||
);
|
||||
|
|
|
@ -134,6 +134,7 @@ export interface RuleExecutorOptions<
|
|||
flappingSettings: RulesSettingsFlappingProperties;
|
||||
getTimeRange: (timeWindow?: string) => GetTimeRangeResult;
|
||||
isServerless: boolean;
|
||||
ruleExecutionTimeout?: string;
|
||||
}
|
||||
|
||||
export interface RuleParamsAndRefs<Params extends RuleTypeParams> {
|
||||
|
|
|
@ -293,7 +293,7 @@ export const previewRulesRoute = (
|
|||
rule,
|
||||
services: {
|
||||
shouldWriteAlerts,
|
||||
shouldStopExecution: () => false,
|
||||
shouldStopExecution: () => isAborted,
|
||||
alertsClient: null,
|
||||
alertFactory,
|
||||
savedObjectsClient: coreContext.savedObjects.client,
|
||||
|
@ -322,6 +322,7 @@ export const previewRulesRoute = (
|
|||
return { dateStart: date, dateEnd: date };
|
||||
},
|
||||
isServerless,
|
||||
ruleExecutionTimeout: `${PREVIEW_TIMEOUT_SECONDS}s`,
|
||||
})) as { state: TState; loggedRequests: RulePreviewLoggedRequest[] });
|
||||
|
||||
const errors = loggedStatusChanges
|
||||
|
|
|
@ -23,6 +23,7 @@ export interface BuildEqlSearchRequestParams {
|
|||
primaryTimestamp: TimestampOverride;
|
||||
secondaryTimestamp: TimestampOverride | undefined;
|
||||
exceptionFilter: Filter | undefined;
|
||||
ruleExecutionTimeout: string | undefined;
|
||||
}
|
||||
|
||||
export const buildEsqlSearchRequest = ({
|
||||
|
@ -34,6 +35,7 @@ export const buildEsqlSearchRequest = ({
|
|||
secondaryTimestamp,
|
||||
exceptionFilter,
|
||||
size,
|
||||
ruleExecutionTimeout,
|
||||
}: BuildEqlSearchRequestParams) => {
|
||||
const esFilter = getQueryFilter({
|
||||
query: '',
|
||||
|
@ -61,5 +63,7 @@ export const buildEsqlSearchRequest = ({
|
|||
filter: requestFilter,
|
||||
},
|
||||
},
|
||||
wait_for_completion_timeout: '4m', // hard limit request timeout is 5m set by ES proxy and alerting framework. So, we should be fine to wait 4m for async query completion. If rule execution is shorter than 4m and query was not completed, it will be aborted.
|
||||
...(ruleExecutionTimeout ? { keep_alive: ruleExecutionTimeout } : {}),
|
||||
};
|
||||
};
|
||||
|
|
|
@ -28,11 +28,9 @@ import { rowToDocument, mergeEsqlResultInSource } from './utils';
|
|||
import { fetchSourceDocuments } from './fetch_source_documents';
|
||||
import { buildReasonMessageForEsqlAlert } from '../utils/reason_formatters';
|
||||
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
|
||||
import type { CreateRuleOptions, RunOpts, SignalSource } from '../types';
|
||||
import { logEsqlRequest } from '../utils/logged_requests';
|
||||
import type { CreateRuleOptions, SignalSource, RunOpts } from '../types';
|
||||
import { getDataTierFilter } from '../utils/get_data_tier_filter';
|
||||
import { checkErrorDetails } from '../utils/check_error_details';
|
||||
import * as i18n from '../translations';
|
||||
|
||||
import {
|
||||
addToSearchAfterReturn,
|
||||
|
@ -69,6 +67,7 @@ export const esqlExecutor = async ({
|
|||
experimentalFeatures,
|
||||
licensing,
|
||||
scheduleNotificationResponseActionsService,
|
||||
ruleExecutionTimeout,
|
||||
}: {
|
||||
runOpts: RunOpts<EsqlRuleParams>;
|
||||
services: RuleExecutorServices<AlertInstanceState, AlertInstanceContext, 'default'>;
|
||||
|
@ -78,6 +77,7 @@ export const esqlExecutor = async ({
|
|||
experimentalFeatures: ExperimentalFeatures;
|
||||
licensing: LicensingPluginSetup;
|
||||
scheduleNotificationResponseActionsService: CreateRuleOptions['scheduleNotificationResponseActionsService'];
|
||||
ruleExecutionTimeout?: string;
|
||||
}) => {
|
||||
const loggedRequests: RulePreviewLoggedRequest[] = [];
|
||||
const ruleParams = completeRule.ruleParams;
|
||||
|
@ -110,16 +110,10 @@ export const esqlExecutor = async ({
|
|||
primaryTimestamp,
|
||||
secondaryTimestamp,
|
||||
exceptionFilter,
|
||||
ruleExecutionTimeout,
|
||||
});
|
||||
const esqlQueryString = { drop_null_columns: true };
|
||||
|
||||
if (isLoggedRequestsEnabled) {
|
||||
loggedRequests.push({
|
||||
request: logEsqlRequest(esqlRequest, esqlQueryString),
|
||||
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
|
||||
});
|
||||
}
|
||||
|
||||
ruleExecutionLogger.debug(`ES|QL query request: ${JSON.stringify(esqlRequest)}`);
|
||||
const exceptionsWarning = getUnprocessedExceptionsWarnings(unprocessedExceptions);
|
||||
if (exceptionsWarning) {
|
||||
|
@ -132,15 +126,14 @@ export const esqlExecutor = async ({
|
|||
esClient: services.scopedClusterClient.asCurrentUser,
|
||||
requestBody: esqlRequest,
|
||||
requestQueryParams: esqlQueryString,
|
||||
shouldStopExecution: services.shouldStopExecution,
|
||||
ruleExecutionLogger,
|
||||
loggedRequests: isLoggedRequestsEnabled ? loggedRequests : undefined,
|
||||
});
|
||||
|
||||
const esqlSearchDuration = performance.now() - esqlSignalSearchStart;
|
||||
result.searchAfterTimes.push(makeFloatString(esqlSearchDuration));
|
||||
|
||||
if (isLoggedRequestsEnabled && loggedRequests[0]) {
|
||||
loggedRequests[0].duration = Math.round(esqlSearchDuration);
|
||||
}
|
||||
|
||||
ruleExecutionLogger.debug(`ES|QL query request took: ${esqlSearchDuration}ms`);
|
||||
|
||||
const isRuleAggregating = computeIsESQLQueryAggregating(completeRule.ruleParams.query);
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { performEsqlRequest } from './esql_request';
|
||||
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
|
||||
|
||||
const columns = [
|
||||
{ name: '_id', type: 'keyword' as const },
|
||||
{ name: 'agent.name', type: 'keyword' as const },
|
||||
{ name: 'agent.version', type: 'keyword' as const },
|
||||
{ name: 'agent.type', type: 'keyword' as const },
|
||||
];
|
||||
const values = [['doc-id', 'agent-name', '8.8.1', 'packetbeat']];
|
||||
|
||||
const requestBody = {
|
||||
query: 'from test* METADATA _id | limit 101',
|
||||
filter: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
range: {
|
||||
'@timestamp': {
|
||||
lte: '2025-04-02T10:13:52.235Z',
|
||||
gte: '2013-11-04T16:13:52.235Z',
|
||||
format: 'strict_date_optional_time',
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
bool: {
|
||||
must: [],
|
||||
filter: [],
|
||||
should: [],
|
||||
must_not: [],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
wait_for_completion_timeout: '4m',
|
||||
};
|
||||
const requestQueryParams = { drop_null_columns: true };
|
||||
|
||||
describe('performEsqlRequest', () => {
|
||||
const esClient = elasticsearchServiceMock.createElasticsearchClient();
|
||||
const shouldStopExecution: jest.Mock = jest.fn();
|
||||
shouldStopExecution.mockReturnValue(false);
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.useFakeTimers();
|
||||
});
|
||||
|
||||
it('returns results immediately when the async query completed', async () => {
|
||||
const mockResponse = {
|
||||
id: 'QUERY-ID',
|
||||
is_running: false,
|
||||
columns,
|
||||
values,
|
||||
};
|
||||
|
||||
esClient.transport.request.mockResolvedValueOnce(mockResponse);
|
||||
|
||||
const result = await performEsqlRequest({
|
||||
esClient,
|
||||
requestBody,
|
||||
requestQueryParams,
|
||||
shouldStopExecution,
|
||||
});
|
||||
|
||||
expect(result).toEqual(mockResponse);
|
||||
expect(esClient.transport.request).toHaveBeenCalledTimes(2);
|
||||
expect(esClient.transport.request).toHaveBeenCalledWith({
|
||||
method: 'POST',
|
||||
path: '/_query/async',
|
||||
body: requestBody,
|
||||
querystring: requestQueryParams,
|
||||
});
|
||||
expect(esClient.transport.request).toHaveBeenCalledWith({
|
||||
method: 'DELETE',
|
||||
path: '/_query/async/QUERY-ID',
|
||||
});
|
||||
});
|
||||
|
||||
it('polls until the query is completed', async () => {
|
||||
const mockSubmitResponse = {
|
||||
id: 'QUERY-ID',
|
||||
is_running: true,
|
||||
columns: [],
|
||||
values: [],
|
||||
};
|
||||
|
||||
const mockPollResponse = {
|
||||
...mockSubmitResponse,
|
||||
is_running: false,
|
||||
columns,
|
||||
values,
|
||||
};
|
||||
|
||||
esClient.transport.request
|
||||
.mockResolvedValueOnce(mockSubmitResponse)
|
||||
.mockResolvedValueOnce(mockPollResponse);
|
||||
|
||||
const waitForPerformEsql = performEsqlRequest({
|
||||
esClient,
|
||||
requestBody,
|
||||
requestQueryParams,
|
||||
shouldStopExecution,
|
||||
});
|
||||
|
||||
await jest.advanceTimersByTimeAsync(15000);
|
||||
|
||||
const result = await waitForPerformEsql;
|
||||
|
||||
expect(result).toEqual(mockPollResponse);
|
||||
expect(esClient.transport.request).toHaveBeenCalledTimes(3);
|
||||
expect(esClient.transport.request).toHaveBeenNthCalledWith(1, {
|
||||
method: 'POST',
|
||||
path: '/_query/async',
|
||||
body: requestBody,
|
||||
querystring: requestQueryParams,
|
||||
});
|
||||
expect(esClient.transport.request).toHaveBeenNthCalledWith(2, {
|
||||
method: 'GET',
|
||||
path: '/_query/async/QUERY-ID',
|
||||
});
|
||||
expect(esClient.transport.request).toHaveBeenCalledWith({
|
||||
method: 'DELETE',
|
||||
path: '/_query/async/QUERY-ID',
|
||||
});
|
||||
});
|
||||
|
||||
it('throws an error if execution is cancelled', async () => {
|
||||
const mockSubmitResponse = {
|
||||
id: 'QUERY-ID',
|
||||
is_running: true,
|
||||
columns: [],
|
||||
values: [],
|
||||
};
|
||||
|
||||
esClient.transport.request.mockResolvedValue(mockSubmitResponse);
|
||||
shouldStopExecution.mockReturnValue(true);
|
||||
|
||||
const waitForPerformEsql = performEsqlRequest({
|
||||
esClient,
|
||||
requestBody,
|
||||
requestQueryParams,
|
||||
shouldStopExecution,
|
||||
}).catch((error) => {
|
||||
expect(error.message).toBe('Rule execution cancelled due to timeout');
|
||||
});
|
||||
|
||||
await jest.advanceTimersByTimeAsync(15000);
|
||||
await waitForPerformEsql;
|
||||
expect.assertions(1);
|
||||
});
|
||||
|
||||
it('deletes query if error happens during polling', async () => {
|
||||
const mockSubmitResponse = {
|
||||
id: 'QUERY-ID',
|
||||
is_running: true,
|
||||
columns: [],
|
||||
values: [],
|
||||
};
|
||||
|
||||
esClient.transport.request
|
||||
.mockResolvedValueOnce(mockSubmitResponse)
|
||||
.mockRejectedValueOnce(new Error('Test error'));
|
||||
|
||||
const waitForPerformEsql = performEsqlRequest({
|
||||
esClient,
|
||||
requestBody,
|
||||
requestQueryParams: {},
|
||||
shouldStopExecution,
|
||||
}).catch((error) => {
|
||||
expect(error.message).toBe('Test error');
|
||||
});
|
||||
|
||||
await jest.advanceTimersByTimeAsync(15000);
|
||||
await waitForPerformEsql;
|
||||
|
||||
expect(esClient.transport.request).toHaveBeenCalledWith({
|
||||
method: 'DELETE',
|
||||
path: '/_query/async/QUERY-ID',
|
||||
});
|
||||
|
||||
expect.assertions(2);
|
||||
});
|
||||
});
|
|
@ -5,14 +5,35 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { Logger, ElasticsearchClient } from '@kbn/core/server';
|
||||
import { performance } from 'perf_hooks';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
|
||||
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { IRuleExecutionLogForExecutors } from '../../rule_monitoring';
|
||||
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
|
||||
import { logEsqlRequest } from '../utils/logged_requests';
|
||||
import * as i18n from '../translations';
|
||||
|
||||
const setLatestRequestDuration = (
|
||||
startTime: number,
|
||||
loggedRequests: RulePreviewLoggedRequest[] | undefined
|
||||
) => {
|
||||
const duration = performance.now() - startTime;
|
||||
if (loggedRequests && loggedRequests?.[loggedRequests.length - 1]) {
|
||||
loggedRequests[loggedRequests.length - 1].duration = Math.round(duration);
|
||||
}
|
||||
};
|
||||
|
||||
export interface EsqlResultColumn {
|
||||
name: string;
|
||||
type: 'date' | 'keyword';
|
||||
}
|
||||
|
||||
type AsyncEsqlResponse = {
|
||||
id: string;
|
||||
is_running: boolean;
|
||||
} & EsqlTable;
|
||||
|
||||
export type EsqlResultRow = Array<string | null>;
|
||||
|
||||
export interface EsqlTable {
|
||||
|
@ -24,29 +45,95 @@ export const performEsqlRequest = async ({
|
|||
esClient,
|
||||
requestBody,
|
||||
requestQueryParams,
|
||||
ruleExecutionLogger,
|
||||
shouldStopExecution,
|
||||
loggedRequests,
|
||||
}: {
|
||||
logger?: Logger;
|
||||
ruleExecutionLogger?: IRuleExecutionLogForExecutors;
|
||||
esClient: ElasticsearchClient;
|
||||
requestBody: Record<string, unknown>;
|
||||
requestQueryParams?: { drop_null_columns?: boolean };
|
||||
}): Promise<EsqlTable> => {
|
||||
const search = async () => {
|
||||
try {
|
||||
const rawResponse = await esClient.transport.request<EsqlTable>({
|
||||
method: 'POST',
|
||||
path: '/_query',
|
||||
body: requestBody,
|
||||
querystring: requestQueryParams,
|
||||
});
|
||||
return {
|
||||
rawResponse,
|
||||
isPartial: false,
|
||||
isRunning: false,
|
||||
};
|
||||
} catch (e) {
|
||||
throw getKbnServerError(e);
|
||||
}
|
||||
requestBody: {
|
||||
query: string;
|
||||
filter: QueryDslQueryContainer;
|
||||
};
|
||||
requestQueryParams?: {
|
||||
drop_null_columns?: boolean;
|
||||
};
|
||||
shouldStopExecution: () => boolean;
|
||||
loggedRequests?: RulePreviewLoggedRequest[];
|
||||
}): Promise<EsqlTable> => {
|
||||
let pollInterval = 10 * 1000; // Poll every 10 seconds
|
||||
let pollCount = 0;
|
||||
let queryId: string = '';
|
||||
|
||||
return (await search()).rawResponse;
|
||||
try {
|
||||
loggedRequests?.push({
|
||||
request: logEsqlRequest(requestBody, requestQueryParams),
|
||||
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
|
||||
});
|
||||
const asyncSearchStarted = performance.now();
|
||||
const asyncEsqlResponse = await esClient.transport.request<AsyncEsqlResponse>({
|
||||
method: 'POST',
|
||||
path: '/_query/async',
|
||||
body: requestBody,
|
||||
querystring: requestQueryParams,
|
||||
});
|
||||
setLatestRequestDuration(asyncSearchStarted, loggedRequests);
|
||||
|
||||
queryId = asyncEsqlResponse.id;
|
||||
const isRunning = asyncEsqlResponse.is_running;
|
||||
|
||||
if (!isRunning) {
|
||||
return asyncEsqlResponse;
|
||||
}
|
||||
|
||||
// Poll for long-executing query
|
||||
while (true) {
|
||||
await new Promise((resolve) => setTimeout(resolve, pollInterval));
|
||||
|
||||
loggedRequests?.push({
|
||||
request: `GET /_query/async/${queryId}`,
|
||||
description: i18n.ESQL_POLL_REQUEST_DESCRIPTION,
|
||||
});
|
||||
const pollStarted = performance.now();
|
||||
const pollResponse = await esClient.transport.request<AsyncEsqlResponse>({
|
||||
method: 'GET',
|
||||
path: `/_query/async/${queryId}`,
|
||||
});
|
||||
setLatestRequestDuration(pollStarted, loggedRequests);
|
||||
|
||||
if (!pollResponse.is_running) {
|
||||
return pollResponse;
|
||||
}
|
||||
|
||||
pollCount++;
|
||||
|
||||
if (pollCount > 60) {
|
||||
pollInterval = 60 * 1000; // Increase the poll interval after 10m
|
||||
}
|
||||
|
||||
const isCancelled = shouldStopExecution(); // Execution will be cancelled if rule times out
|
||||
ruleExecutionLogger?.debug(`Polling for query ID: ${queryId}, isCancelled: ${isCancelled}`);
|
||||
|
||||
if (isCancelled) {
|
||||
throw new Error('Rule execution cancelled due to timeout');
|
||||
}
|
||||
ruleExecutionLogger?.debug(`Query is still running for query ID: ${queryId}`);
|
||||
}
|
||||
} catch (error) {
|
||||
ruleExecutionLogger?.error(`Error while performing ES|QL search: ${error?.message}`);
|
||||
throw getKbnServerError(error);
|
||||
} finally {
|
||||
if (queryId) {
|
||||
loggedRequests?.push({
|
||||
request: `DELETE /_query/async/${queryId}`,
|
||||
description: i18n.ESQL_DELETE_REQUEST_DESCRIPTION,
|
||||
});
|
||||
const deleteStarted = performance.now();
|
||||
await esClient.transport.request({
|
||||
method: 'DELETE',
|
||||
path: `/_query/async/${queryId}`,
|
||||
});
|
||||
setLatestRequestDuration(deleteStarted, loggedRequests);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -14,6 +14,20 @@ export const ESQL_SEARCH_REQUEST_DESCRIPTION = i18n.translate(
|
|||
}
|
||||
);
|
||||
|
||||
export const ESQL_POLL_REQUEST_DESCRIPTION = i18n.translate(
|
||||
'xpack.securitySolution.detectionEngine.esqlRuleType.esqlPollRequestDescription',
|
||||
{
|
||||
defaultMessage: 'ES|QL request to poll for async search results',
|
||||
}
|
||||
);
|
||||
|
||||
export const ESQL_DELETE_REQUEST_DESCRIPTION = i18n.translate(
|
||||
'xpack.securitySolution.detectionEngine.esqlRuleType.esqlDeleteRequestDescription',
|
||||
{
|
||||
defaultMessage: 'ES|QL request to delete async search query',
|
||||
}
|
||||
);
|
||||
|
||||
export const FIND_SOURCE_DOCUMENTS_REQUEST_DESCRIPTION = i18n.translate(
|
||||
'xpack.securitySolution.detectionEngine.esqlRuleType.findSourceDocumentsRequestDescription',
|
||||
{
|
||||
|
|
|
@ -24,5 +24,9 @@ export const logEsqlRequest = (
|
|||
}, [])
|
||||
.join('&');
|
||||
|
||||
return `POST _query${urlParams ? `?${urlParams}` : ''}\n${JSON.stringify(requestBody, null, 2)}`;
|
||||
return `POST _query/async${urlParams ? `?${urlParams}` : ''}\n${JSON.stringify(
|
||||
requestBody,
|
||||
null,
|
||||
2
|
||||
)}`;
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue