Cancel EQL and ES|QL queries when an alerting rule times out from running (#171348)

## Problem Statement
While investigating OOM errors in serverless, I noticed that Elasticsearch
was returning some large EQL query responses alongside a series of logs
mentioning that the rule execution timed out. I later noticed that EQL
queries are not cancelled in that scenario, so task cancellation does not
work properly and Kibana ends up handling responses for tasks it has
already stopped running.

## Solution
To solve this issue, I'm enhancing the ES client provided to rule type
executors so that EQL queries (and also ES|QL queries, while I'm in the
area) are cancelled whenever the rule execution times out. The client now
passes along the `abortController`'s signal, which we trigger when a rule
times out.
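
The wrapping idea can be sketched in isolation as follows. This is a minimal illustration, not the code in this commit: `QueryOptions`, `QueryFn`, and `wrapWithAbort` are hypothetical names, and the query function is a generic stand-in rather than the real Elasticsearch client.

```typescript
// Hypothetical stand-ins for illustration; these are not Kibana or ES client types.
type QueryOptions = { signal?: AbortSignal; [key: string]: unknown };
type QueryFn<P, R> = (params: P, options?: QueryOptions) => Promise<R>;

function wrapWithAbort<P, R>(
  original: QueryFn<P, R>,
  abortController: AbortController,
  label: string
): QueryFn<P, R> {
  return async (params, options) => {
    try {
      // Keep the caller's options and add the rule-level abort signal.
      return await original(params, { ...(options ?? {}), signal: abortController.signal });
    } catch (e) {
      // If the rule execution was cancelled, report that instead of the client's error.
      if (abortController.signal.aborted) {
        throw new Error(`${label} has been aborted due to cancelled execution`);
      }
      throw e;
    }
  };
}
```

Errors raised while the signal is not aborted are re-thrown unchanged, which mirrors the "re-throws error when an error is thrown" tests in this commit.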

Additional changes:
- Adding total search duration and es search duration metrics to EQL
queries.
- Adding total search duration metrics to ES|QL queries (`took` isn't
returned for these types of queries, so we don't know how much ES
processing occurred).
- Moving some code in the Jest test into a function so my additional
tests don't add too much code.
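
As a rough illustration of how the two durations differ, the sketch below accumulates them per query. The names `LoggedDurations`, `SearchMetricsSketch`, and `createMetricsTracker` are hypothetical; only the field names passed to the logging function and read back from the metrics follow what this commit's code and tests use.

```typescript
// Hypothetical shapes for illustration; the real types live in the alerting plugin.
interface LoggedDurations {
  esSearchDuration: number; // `took` reported by Elasticsearch, 0 when it is not returned
  totalSearchDuration: number; // wall-clock time measured around the request
}

interface SearchMetricsSketch {
  numSearches: number;
  esSearchDurationMs: number;
  totalSearchDurationMs: number;
}

function createMetricsTracker() {
  const metrics: SearchMetricsSketch = {
    numSearches: 0,
    esSearchDurationMs: 0,
    totalSearchDurationMs: 0,
  };
  return {
    // Called once per completed query.
    log(durations: LoggedDurations): void {
      metrics.numSearches++;
      metrics.esSearchDurationMs += durations.esSearchDuration;
      metrics.totalSearchDurationMs += durations.totalSearchDuration;
    },
    // Returns a copy so callers cannot mutate the running totals.
    getMetrics: (): SearchMetricsSketch => ({ ...metrics }),
  };
}
```

An EQL query would log its `took` value as `esSearchDuration`, while an ES|QL query would log `0` there and contribute only to the total duration.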

## To verify
1. Create an ES|QL alerting rule using the ES Query rule type
2. Create an EQL rule in security solution
  - Index pattern: `.kibana-event-log*`
  - EQL: `process where event.kind == "execute"`
3. Notice that the rules are running successfully
4. Apply the following diff to your codebase. It aborts the queries
shortly after they are sent to Elasticsearch, which is the easiest way to
confirm that the signal is passed through properly. Unfortunately, it is
not easy to write queries that take a long time to run for testing
purposes, nor was it easy to find a mechanism to flood the queue so that
the test query would time out for a full end-to-end test.
<details>
<summary>Click here to expand</summary>

```
diff --git a/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts b/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts
index 9ddd22a292b..6ae8e07630e 100644
--- a/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts
+++ b/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts
@@ -133,10 +133,15 @@ function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
             opts.rule.spaceId
           } - ${JSON.stringify(params)} - with options ${JSON.stringify(requestOptions)}`
         );
-        const result = (await originalRequestFn.call(opts.esClient.transport, params, {
+
+        const promise = originalRequestFn.call(opts.esClient.transport, params, {
           ...requestOptions,
           signal: opts.abortController.signal,
-        })) as Promise<TResponse> | TransportResult<TResponse, TContext>;
+        });
+
+        opts.abortController.abort();
+
+        const result = (await promise) as Promise<TResponse> | TransportResult<TResponse, TContext>;

         const end = Date.now();
         const durationMs = end - start;
@@ -190,10 +195,16 @@ function getWrappedEqlSearchFn(opts: WrapEsClientOpts) {
           opts.rule.spaceId
         } - ${JSON.stringify(params)} - with options ${JSON.stringify(searchOptions)}`
       );
-      const result = (await originalEqlSearch.call(opts.esClient, params, {
+      const promise = originalEqlSearch.call(opts.esClient, params, {
         ...searchOptions,
         signal: opts.abortController.signal,
-      })) as TransportResult<EqlSearchResponse<TEvent>, unknown> | EqlSearchResponse<TEvent>;
+      });
+
+      opts.abortController.abort();
+
+      const result = (await promise) as
+        | TransportResult<EqlSearchResponse<TEvent>, unknown>
+        | EqlSearchResponse<TEvent>;

       const end = Date.now();
       const durationMs = end - start;
```
</details>

5. Restart the Kibana server (if not done automatically)
6. Use the "Run rule" feature in Stack Management to trigger each
created rule to run soon.
7. Wait for the rules to run
8. Notice that each rule now fails with the following error message:
`search has been aborted due to cancelled execution`. While writing this
test plan, I noticed that the EQL rule is marked as successful in the
framework, but if you "View in App" you will see the error message in
security solution.

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Mike Côté 2023-12-28 16:10:26 -05:00 committed by GitHub
parent d67c0eff2f
commit c06a6a1a22
2 changed files with 473 additions and 158 deletions


@ -13,6 +13,17 @@ import { createWrappedScopedClusterClientFactory } from './wrap_scoped_cluster_c
const esQuery = {
body: { query: { bool: { filter: { range: { '@timestamp': { gte: 0 } } } } } },
};
const eqlQuery = {
index: 'foo',
query: 'process where process.name == "regsvr32.exe"',
};
const esqlQueryRequest = {
method: 'POST',
path: '/_esql',
body: {
query: 'from .kibana_task_manager',
},
};
const logger = loggingSystemMock.create().get();
@ -36,180 +47,358 @@ describe('wrapScopedClusterClient', () => {
jest.resetAllMocks();
});
test('searches with asInternalUser when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
describe('search', () => {
test('uses asInternalUser when specified', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
const asInternalUserWrappedSearchFn = childClient.search;
const asInternalUserWrappedSearchFn = childClient.search;
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asInternalUser.search(esQuery);
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asInternalUser.search(esQuery);
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
test('searches with asCurrentUser when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
scopedClusterClient.asCurrentUser.child.mockReturnValue(childClient as unknown as Client);
const asCurrentUserWrappedSearchFn = childClient.search;
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asCurrentUser.search(esQuery);
expect(asCurrentUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
test('uses search options when specified', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
const asInternalUserWrappedSearchFn = childClient.search;
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asInternalUser.search(esQuery, { ignore: [404] });
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
ignore: [404],
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
test('re-throws error when search throws error', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
const asInternalUserWrappedSearchFn = childClient.search;
asInternalUserWrappedSearchFn.mockRejectedValueOnce(new Error('something went wrong!'));
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
wrappedSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});
test('handles empty search result object', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
const asInternalUserWrappedSearchFn = childClient.search;
// @ts-ignore incomplete return type
asInternalUserWrappedSearchFn.mockResolvedValue({});
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.search(esQuery);
test('uses asCurrentUser when specified', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients(true);
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledTimes(1);
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
const asCurrentUserWrappedSearchFn = childClient.search;
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(1);
expect(stats.esSearchDurationMs).toEqual(0);
});
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asCurrentUser.search(esQuery);
test('keeps track of number of queries', async () => {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
const asInternalUserWrappedSearchFn = childClient.search;
// @ts-ignore incomplete return type
asInternalUserWrappedSearchFn.mockResolvedValue({ took: 333 });
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
expect(asCurrentUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.search(esQuery);
await wrappedSearchClient.asInternalUser.search(esQuery);
await wrappedSearchClient.asInternalUser.search(esQuery);
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledTimes(3);
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
test('uses search options when specified', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.esSearchDurationMs).toEqual(999);
const asInternalUserWrappedSearchFn = childClient.search;
expect(logger.debug).toHaveBeenCalledWith(
`executing query for rule .test-rule-type:abcdefg in space my-space - {\"body\":{\"query\":{\"bool\":{\"filter\":{\"range\":{\"@timestamp\":{\"gte\":0}}}}}}} - with options {}`
);
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await wrappedSearchClient.asInternalUser.search(esQuery, { ignore: [404] });
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledWith(esQuery, {
ignore: [404],
signal: abortController.signal,
});
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
});
test('re-throws error when an error is thrown', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
childClient.search.mockRejectedValueOnce(new Error('something went wrong!'));
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
wrappedSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});
test('handles empty search result object', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
const asInternalUserWrappedSearchFn = childClient.search;
// @ts-ignore incomplete return type
asInternalUserWrappedSearchFn.mockResolvedValue({});
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.search(esQuery);
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledTimes(1);
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(1);
expect(stats.esSearchDurationMs).toEqual(0);
});
test('keeps track of number of queries', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
const asInternalUserWrappedSearchFn = childClient.search;
// @ts-ignore incomplete return type
asInternalUserWrappedSearchFn.mockResolvedValue({ took: 333 });
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.search(esQuery);
await wrappedSearchClient.asInternalUser.search(esQuery);
await wrappedSearchClient.asInternalUser.search(esQuery);
expect(asInternalUserWrappedSearchFn).toHaveBeenCalledTimes(3);
expect(scopedClusterClient.asInternalUser.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.search).not.toHaveBeenCalled();
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.esSearchDurationMs).toEqual(999);
expect(logger.debug).toHaveBeenCalledWith(
`executing query for rule .test-rule-type:abcdefg in space my-space - {\"body\":{\"query\":{\"bool\":{\"filter\":{\"range\":{\"@timestamp\":{\"gte\":0}}}}}}} - with options {}`
);
});
test('throws error when search throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.search.mockRejectedValueOnce(new Error('Request has been aborted by the user'));
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Search has been aborted due to cancelled execution"`
);
});
});
test('throws error when search throws abort error', async () => {
const abortController = new AbortController();
abortController.abort();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
describe('eql.search', () => {
test('re-throws error when an error is thrown', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
childClient.search.mockRejectedValueOnce(new Error('Request has been aborted by the user'));
childClient.eql.search.mockRejectedValueOnce(new Error('something went wrong!'));
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.search
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Search has been aborted due to cancelled execution"`
);
await expect(
wrappedSearchClient.asInternalUser.eql.search
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});
test('keeps track of number of queries', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
const asInternalUserWrappedEqlSearchFn = childClient.eql.search;
// @ts-ignore incomplete return type
asInternalUserWrappedEqlSearchFn.mockResolvedValue({ took: 333 });
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.eql.search(eqlQuery);
await wrappedSearchClient.asInternalUser.eql.search(eqlQuery);
await wrappedSearchClient.asInternalUser.eql.search(eqlQuery);
expect(asInternalUserWrappedEqlSearchFn).toHaveBeenCalledTimes(3);
expect(scopedClusterClient.asInternalUser.eql.search).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.eql.search).not.toHaveBeenCalled();
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.esSearchDurationMs).toEqual(999);
expect(logger.debug).toHaveBeenCalledWith(
`executing eql query for rule .test-rule-type:abcdefg in space my-space - {\"index\":\"foo\",\"query\":\"process where process.name == \\\"regsvr32.exe\\\"\"} - with options {}`
);
});
test('throws error when eql search throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.eql.search.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.eql.search
).rejects.toThrowErrorMatchingInlineSnapshot(
`"EQL search has been aborted due to cancelled execution"`
);
});
});
describe('transport.request', () => {
describe('ES|QL', () => {
test('re-throws error when an error is thrown', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
childClient.transport.request.mockRejectedValueOnce(new Error('something went wrong!'));
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
wrappedSearchClient.asInternalUser.transport.request({ method: 'POST', path: '/_esql' })
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});
test('keeps track of number of queries', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
const asInternalUserWrappedRequestFn = childClient.transport.request;
// @ts-ignore incomplete return type
asInternalUserWrappedRequestFn.mockResolvedValue({});
const wrappedSearchClientFactory = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
});
const wrappedSearchClient = wrappedSearchClientFactory.client();
await wrappedSearchClient.asInternalUser.transport.request(esqlQueryRequest);
await wrappedSearchClient.asInternalUser.transport.request(esqlQueryRequest);
await wrappedSearchClient.asInternalUser.transport.request(esqlQueryRequest);
expect(asInternalUserWrappedRequestFn).toHaveBeenCalledTimes(3);
expect(scopedClusterClient.asInternalUser.transport.request).not.toHaveBeenCalled();
expect(scopedClusterClient.asCurrentUser.transport.request).not.toHaveBeenCalled();
const stats = wrappedSearchClientFactory.getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.totalSearchDurationMs).toBeGreaterThan(-1);
expect(logger.debug).toHaveBeenCalledWith(
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"POST\",\"path\":\"/_esql\",\"body\":{\"query\":\"from .kibana_task_manager\"}} - with options {}`
);
});
test('throws error when es|ql search throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.transport.request.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.transport.request({ method: 'POST', path: '/_esql' })
).rejects.toThrowErrorMatchingInlineSnapshot(
`"ES|QL search has been aborted due to cancelled execution"`
);
});
});
test('re-throws error when an error is thrown', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
childClient.transport.request.mockRejectedValueOnce(new Error('something went wrong!'));
const wrappedSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
wrappedSearchClient.asInternalUser.transport.request({ method: '', path: '' })
).rejects.toThrowErrorMatchingInlineSnapshot(`"something went wrong!"`);
});
test(`doesn't throw error when non es|ql request throws an error`, async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();
abortController.abort();
childClient.transport.request.mockRejectedValueOnce(new Error('Some other error'));
const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();
await expect(
abortableSearchClient.asInternalUser.transport.request({
method: 'GET',
path: '/_cat/indices',
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Some other error"`);
});
});
});
function getMockClusterClients(asCurrentUser: boolean = false) {
const abortController = new AbortController();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const childClient = elasticsearchServiceMock.createElasticsearchClient();
if (asCurrentUser) {
scopedClusterClient.asCurrentUser.child.mockReturnValue(childClient as unknown as Client);
} else {
scopedClusterClient.asInternalUser.child.mockReturnValue(childClient as unknown as Client);
}
return { abortController, scopedClusterClient, childClient };
}


@ -10,15 +10,19 @@ import {
TransportResult,
TransportRequestOptionsWithMeta,
TransportRequestOptionsWithOutMeta,
TransportRequestParams,
} from '@elastic/elasticsearch';
import type {
SearchRequest,
SearchResponse,
AggregateName,
EqlSearchRequest,
EqlSearchResponse,
} from '@elastic/elasticsearch/lib/api/types';
import type {
SearchRequest as SearchRequestWithBody,
AggregationsAggregate,
EqlSearchRequest as EqlSearchRequestWithBody,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server';
import { SearchMetrics, RuleInfo } from './types';
@ -89,11 +93,133 @@ function wrapEsClient(opts: WrapEsClientOpts): ElasticsearchClient {
const wrappedClient = esClient.child({});
// Mutating the functions we want to wrap
wrappedClient.transport.request = getWrappedTransportRequestFn({
esClient: wrappedClient,
...rest,
});
wrappedClient.search = getWrappedSearchFn({ esClient: wrappedClient, ...rest });
wrappedClient.eql.search = getWrappedEqlSearchFn({ esClient: wrappedClient, ...rest });
return wrappedClient;
}
function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
const originalRequestFn = opts.esClient.transport.request;
// A bunch of overloads to make TypeScript happy
async function request<TResponse = unknown>(
params: TransportRequestParams,
options?: TransportRequestOptionsWithOutMeta
): Promise<TResponse>;
async function request<TResponse = unknown, TContext = unknown>(
params: TransportRequestParams,
options?: TransportRequestOptionsWithMeta
): Promise<TransportResult<TResponse, TContext>>;
async function request<TResponse = unknown>(
params: TransportRequestParams,
options?: TransportRequestOptions
): Promise<TResponse>;
async function request<TResponse = unknown, TContext = unknown>(
params: TransportRequestParams,
options?: TransportRequestOptions
): Promise<TResponse | TransportResult<TResponse, TContext>> {
// Wrap ES|QL requests with an abort signal
if (params.method === 'POST' && params.path === '/_esql') {
try {
const requestOptions = options ?? {};
const start = Date.now();
opts.logger.debug(
`executing ES|QL query for rule ${opts.rule.alertTypeId}:${opts.rule.id} in space ${
opts.rule.spaceId
} - ${JSON.stringify(params)} - with options ${JSON.stringify(requestOptions)}`
);
const result = (await originalRequestFn.call(opts.esClient.transport, params, {
...requestOptions,
signal: opts.abortController.signal,
})) as Promise<TResponse> | TransportResult<TResponse, TContext>;
const end = Date.now();
const durationMs = end - start;
opts.logMetricsFn({ esSearchDuration: 0, totalSearchDuration: durationMs });
return result;
} catch (e) {
if (opts.abortController.signal.aborted) {
throw new Error('ES|QL search has been aborted due to cancelled execution');
}
throw e;
}
}
// No wrap
return (await originalRequestFn.call(
opts.esClient.transport,
params,
options
)) as Promise<TResponse>;
}
return request;
}
function getWrappedEqlSearchFn(opts: WrapEsClientOpts) {
const originalEqlSearch = opts.esClient.eql.search;
// A bunch of overloads to make TypeScript happy
async function search<TEvent = unknown>(
params: EqlSearchRequest | EqlSearchRequestWithBody,
options?: TransportRequestOptionsWithOutMeta
): Promise<EqlSearchResponse<TEvent>>;
async function search<TEvent = unknown>(
params: EqlSearchRequest | EqlSearchRequestWithBody,
options?: TransportRequestOptionsWithMeta
): Promise<TransportResult<EqlSearchResponse<TEvent>, unknown>>;
async function search<TEvent = unknown>(
params: EqlSearchRequest | EqlSearchRequestWithBody,
options?: TransportRequestOptions
): Promise<EqlSearchResponse<TEvent>>;
async function search<TEvent = unknown>(
params: EqlSearchRequest | EqlSearchRequestWithBody,
options?: TransportRequestOptions
): Promise<EqlSearchResponse<TEvent> | TransportResult<EqlSearchResponse<TEvent>, unknown>> {
try {
const searchOptions = options ?? {};
const start = Date.now();
opts.logger.debug(
`executing eql query for rule ${opts.rule.alertTypeId}:${opts.rule.id} in space ${
opts.rule.spaceId
} - ${JSON.stringify(params)} - with options ${JSON.stringify(searchOptions)}`
);
const result = (await originalEqlSearch.call(opts.esClient, params, {
...searchOptions,
signal: opts.abortController.signal,
})) as TransportResult<EqlSearchResponse<TEvent>, unknown> | EqlSearchResponse<TEvent>;
const end = Date.now();
const durationMs = end - start;
let took: number | undefined = 0;
if (searchOptions.meta) {
// when meta: true, response is TransportResult<EqlSearchResponse<TEvent>, unknown>
took = (result as TransportResult<EqlSearchResponse<TEvent>, unknown>).body.took;
} else {
// when meta: false, response is EqlSearchResponse<TEvent>
took = (result as EqlSearchResponse<TEvent>).took;
}
opts.logMetricsFn({ esSearchDuration: took ?? 0, totalSearchDuration: durationMs });
return result;
} catch (e) {
if (opts.abortController.signal.aborted) {
throw new Error('EQL search has been aborted due to cancelled execution');
}
throw e;
}
}
return search;
}
function getWrappedSearchFn(opts: WrapEsClientOpts) {
const originalSearch = opts.esClient.search;