[Discover] Cancel long running requests in Discover alert (#130077)

* [Discover] improve long running requests for search source within alert rule

* [Discover] add tests

* [Discover] fix linting

* [Discover] fix unit test

* [Discover] add getMetrics test

* [Discover] fix unit test

* [Discover] merge search clients metrics

* [Discover] wrap searchSourceClient

* [Discover] add unit tests

* [Discover] replace searchSourceUtils with searchSourceClient in tests

* [Discover] apply suggestions
This commit is contained in:
Dmitry Tomashevich 2022-05-19 10:24:55 +03:00 committed by GitHub
parent 5ecde4b053
commit fdf2086eb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 622 additions and 42 deletions

View file

@ -7,6 +7,9 @@
import * as t from 'io-ts';
import { either } from 'fp-ts/lib/Either';
import { Rule } from '../types';
import { RuleRunMetrics } from './rule_run_metrics_store';
// represents a Date from an ISO string
export const DateFromString = new t.Type<Date, string, unknown>(
'DateFromString',
@ -24,3 +27,15 @@ export const DateFromString = new t.Type<Date, string, unknown>(
),
(valueToEncode) => valueToEncode.toISOString()
);
export type RuleInfo = Pick<Rule, 'name' | 'alertTypeId' | 'id'> & { spaceId: string };
export interface LogSearchMetricsOpts {
esSearchDuration: number;
totalSearchDuration: number;
}
export type SearchMetrics = Pick<
RuleRunMetrics,
'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs'
>;

View file

@ -20,15 +20,8 @@ import type {
SearchRequest as SearchRequestWithBody,
AggregationsAggregate,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server';
import { Rule } from '../types';
import { RuleRunMetrics } from './rule_run_metrics_store';
type RuleInfo = Pick<Rule, 'name' | 'alertTypeId' | 'id'> & { spaceId: string };
type SearchMetrics = Pick<
RuleRunMetrics,
'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs'
>;
import type { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server';
import { SearchMetrics, RuleInfo } from './types';
interface WrapScopedClusterClientFactoryOpts {
scopedClusterClient: IScopedClusterClient;

View file

@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { of, throwError } from 'rxjs';
import { wrapSearchSourceClient } from './wrap_search_source_client';
const logger = loggingSystemMock.create().get();
const rule = {
name: 'test-rule',
alertTypeId: '.test-rule-type',
id: 'abcdefg',
spaceId: 'my-space',
};
const createSearchSourceClientMock = () => {
const searchSourceMock = createSearchSourceMock();
searchSourceMock.fetch$ = jest.fn().mockImplementation(() => of({ rawResponse: { took: 5 } }));
return {
searchSourceMock,
searchSourceClientMock: {
create: jest.fn().mockReturnValue(searchSourceMock),
createEmpty: jest.fn().mockReturnValue(searchSourceMock),
} as unknown as ISearchStartSearchSource,
};
};
describe('wrapSearchSourceClient', () => {
beforeAll(() => {
jest.useFakeTimers();
});
afterAll(() => {
jest.useRealTimers();
});
afterEach(() => {
jest.resetAllMocks();
});
test('searches with provided abort controller', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.createEmpty();
await wrappedSearchSource.fetch();
expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
abortSignal: abortController.signal,
});
});
test('uses search options when specified', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
await wrappedSearchSource.fetch({ isStored: true });
expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
isStored: true,
abortSignal: abortController.signal,
});
});
test('keeps track of number of queries', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockImplementation(() => of({ rawResponse: { took: 333 } }));
const { searchSourceClient, getMetrics } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
await wrappedSearchSource.fetch();
await wrappedSearchSource.fetch();
await wrappedSearchSource.fetch();
expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
abortSignal: abortController.signal,
});
const stats = getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.esSearchDurationMs).toEqual(999);
expect(logger.debug).toHaveBeenCalledWith(
`executing query for rule .test-rule-type:abcdefg in space my-space - with options {}`
);
});
test('re-throws error when search throws error', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('something went wrong!')));
const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
const fetch = wrappedSearchSource.fetch();
await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot('"something went wrong!"');
});
test('throws error when search throws abort error', async () => {
const abortController = new AbortController();
abortController.abort();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('Request has been aborted by the user')));
const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
const fetch = wrappedSearchSource.fetch();
await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot(
'"Search has been aborted due to cancelled execution"'
);
});
});

View file

@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import {
ISearchOptions,
ISearchSource,
ISearchStartSearchSource,
SearchSource,
SerializedSearchSourceFields,
} from '@kbn/data-plugin/common';
import { catchError, tap, throwError } from 'rxjs';
import { LogSearchMetricsOpts, RuleInfo, SearchMetrics } from './types';
interface Props {
logger: Logger;
rule: RuleInfo;
abortController: AbortController;
searchSourceClient: ISearchStartSearchSource;
}
interface WrapParams<T extends ISearchSource | SearchSource> {
logger: Logger;
rule: RuleInfo;
abortController: AbortController;
pureSearchSource: T;
logMetrics: (metrics: LogSearchMetricsOpts) => void;
}
export function wrapSearchSourceClient({
logger,
rule,
abortController,
searchSourceClient: pureSearchSourceClient,
}: Props) {
let numSearches: number = 0;
let esSearchDurationMs: number = 0;
let totalSearchDurationMs: number = 0;
function logMetrics(metrics: LogSearchMetricsOpts) {
numSearches++;
esSearchDurationMs += metrics.esSearchDuration;
totalSearchDurationMs += metrics.totalSearchDuration;
}
const wrapParams = {
logMetrics,
logger,
rule,
abortController,
};
const wrappedSearchSourceClient: ISearchStartSearchSource = Object.create(pureSearchSourceClient);
wrappedSearchSourceClient.createEmpty = () => {
const pureSearchSource = pureSearchSourceClient.createEmpty();
return wrapSearchSource({
...wrapParams,
pureSearchSource,
});
};
wrappedSearchSourceClient.create = async (fields?: SerializedSearchSourceFields) => {
const pureSearchSource = await pureSearchSourceClient.create(fields);
return wrapSearchSource({
...wrapParams,
pureSearchSource,
});
};
return {
searchSourceClient: wrappedSearchSourceClient,
getMetrics: (): SearchMetrics => ({
esSearchDurationMs,
totalSearchDurationMs,
numSearches,
}),
};
}
function wrapSearchSource<T extends ISearchSource | SearchSource>({
pureSearchSource,
...wrapParams
}: WrapParams<T>): T {
const wrappedSearchSource = Object.create(pureSearchSource);
wrappedSearchSource.createChild = wrapCreateChild({ ...wrapParams, pureSearchSource });
wrappedSearchSource.createCopy = wrapCreateCopy({ ...wrapParams, pureSearchSource });
wrappedSearchSource.create = wrapCreate({ ...wrapParams, pureSearchSource });
wrappedSearchSource.fetch$ = wrapFetch$({ ...wrapParams, pureSearchSource });
return wrappedSearchSource;
}
function wrapCreate({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureCreatedSearchSource = pureSearchSource.create();
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureCreatedSearchSource,
});
};
}
function wrapCreateChild({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function (options?: {}) {
const pureSearchSourceChild = pureSearchSource.createChild(options);
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
});
};
}
function wrapCreateCopy({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureSearchSourceChild = pureSearchSource.createCopy();
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
}) as SearchSource;
};
}
function wrapFetch$({
logger,
rule,
abortController,
pureSearchSource,
logMetrics,
}: WrapParams<ISearchSource>) {
return (options?: ISearchOptions) => {
const searchOptions = options ?? {};
const start = Date.now();
logger.debug(
`executing query for rule ${rule.alertTypeId}:${rule.id} in space ${
rule.spaceId
} - with options ${JSON.stringify(searchOptions)}`
);
return pureSearchSource
.fetch$({
...searchOptions,
abortSignal: abortController.signal,
})
.pipe(
catchError((error) => {
if (abortController.signal.aborted) {
return throwError(
() => new Error('Search has been aborted due to cancelled execution')
);
}
return throwError(() => error);
}),
tap((result) => {
const durationMs = Date.now() - start;
logMetrics({
esSearchDuration: result.rawResponse.took ?? 0,
totalSearchDuration: durationMs,
});
})
);
};
}

View file

@ -9,9 +9,8 @@ import {
elasticsearchServiceMock,
savedObjectsClientMock,
uiSettingsServiceMock,
httpServerMock,
} from '@kbn/core/server/mocks';
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
import { searchSourceCommonMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { rulesClientMock } from './rules_client.mock';
import { PluginSetupContract, PluginStartContract } from './plugin';
import { Alert, AlertFactoryDoneUtils } from './alert';
@ -113,11 +112,7 @@ const createRuleExecutorServicesMock = <
shouldWriteAlerts: () => true,
shouldStopExecution: () => true,
search: createAbortableSearchServiceMock(),
searchSourceClient: Promise.resolve(
dataPluginMock
.createStartContract()
.search.searchSource.asScoped(httpServerMock.createKibanaRequest())
),
searchSourceClient: searchSourceCommonMock,
};
};
export type RuleExecutorServicesMock = ReturnType<typeof createRuleExecutorServicesMock>;

View file

@ -17,7 +17,6 @@ import { TaskRunnerContext } from './task_runner_factory';
import { createExecutionHandler, ExecutionHandler } from './create_execution_handler';
import { Alert, createAlertFactory } from '../alert';
import {
createWrappedScopedClusterClientFactory,
ElasticsearchError,
ErrorWithReason,
executionStatusFromError,
@ -69,9 +68,12 @@ import {
RuleRunResult,
RuleTaskStateAndMetrics,
} from './types';
import { createWrappedScopedClusterClientFactory } from '../lib/wrap_scoped_cluster_client';
import { IExecutionStatusAndMetrics } from '../lib/rule_execution_status';
import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store';
import { wrapSearchSourceClient } from '../lib/wrap_search_source_client';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
import { SearchMetrics } from '../lib/types';
const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
@ -337,9 +339,7 @@ export class TaskRunner<
const ruleLabel = `${this.ruleType.id}:${ruleId}: '${name}'`;
const scopedClusterClient = this.context.elasticsearch.client.asScoped(fakeRequest);
const wrappedScopedClusterClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
const wrappedClientOptions = {
rule: {
name: rule.name,
alertTypeId: rule.alertTypeId,
@ -348,6 +348,16 @@ export class TaskRunner<
},
logger: this.logger,
abortController: this.searchAbortController,
};
const scopedClusterClient = this.context.elasticsearch.client.asScoped(fakeRequest);
const wrappedScopedClusterClient = createWrappedScopedClusterClientFactory({
...wrappedClientOptions,
scopedClusterClient,
});
const searchSourceClient = await this.context.data.search.searchSource.asScoped(fakeRequest);
const wrappedSearchSourceClient = wrapSearchSourceClient({
...wrappedClientOptions,
searchSourceClient,
});
let updatedRuleTypeState: void | Record<string, unknown>;
@ -371,9 +381,9 @@ export class TaskRunner<
executionId: this.executionId,
services: {
savedObjectsClient,
searchSourceClient: wrappedSearchSourceClient.searchSourceClient,
uiSettingsClient: this.context.uiSettings.asScopedToClient(savedObjectsClient),
scopedClusterClient: wrappedScopedClusterClient.client(),
searchSourceClient: this.context.data.search.searchSource.asScoped(fakeRequest),
alertFactory: createAlertFactory<
InstanceState,
InstanceContext,
@ -426,9 +436,19 @@ export class TaskRunner<
this.alertingEventLogger.setExecutionSucceeded(`rule executed: ${ruleLabel}`);
const scopedClusterClientMetrics = wrappedScopedClusterClient.getMetrics();
const searchSourceClientMetrics = wrappedSearchSourceClient.getMetrics();
const searchMetrics: SearchMetrics = {
numSearches: scopedClusterClientMetrics.numSearches + searchSourceClientMetrics.numSearches,
totalSearchDurationMs:
scopedClusterClientMetrics.totalSearchDurationMs +
searchSourceClientMetrics.totalSearchDurationMs,
esSearchDurationMs:
scopedClusterClientMetrics.esSearchDurationMs +
searchSourceClientMetrics.esSearchDurationMs,
};
const ruleRunMetricsStore = new RuleRunMetricsStore();
const searchMetrics = wrappedScopedClusterClient.getMetrics();
ruleRunMetricsStore.setNumSearches(searchMetrics.numSearches);
ruleRunMetricsStore.setTotalSearchDurationMs(searchMetrics.totalSearchDurationMs);
ruleRunMetricsStore.setEsSearchDurationMs(searchMetrics.esSearchDurationMs);

View file

@ -10,13 +10,15 @@ import type {
CustomRequestHandlerContext,
SavedObjectReference,
IUiSettingsClient,
} from '@kbn/core/server';
import { ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { LicenseType } from '@kbn/licensing-plugin/server';
import {
IScopedClusterClient,
SavedObjectAttributes,
SavedObjectsClientContract,
} from '@kbn/core/server';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { LicenseType } from '@kbn/licensing-plugin/server';
import { AlertFactoryDoneUtils, PublicAlert } from './alert';
import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry';
import { PluginSetupContract, PluginStartContract } from './plugin';
@ -72,7 +74,7 @@ export interface RuleExecutorServices<
InstanceContext extends AlertInstanceContext = AlertInstanceContext,
ActionGroupIds extends string = never
> {
searchSourceClient: Promise<ISearchStartSearchSource>;
searchSourceClient: ISearchStartSearchSource;
savedObjectsClient: SavedObjectsClientContract;
uiSettingsClient: IUiSettingsClient;
scopedClusterClient: IScopedClusterClient;

View file

@ -118,7 +118,7 @@ function createRule(shouldWriteAlerts: boolean = true) {
shouldWriteAlerts: () => shouldWriteAlerts,
shouldStopExecution: () => false,
search: {} as any,
searchSourceClient: Promise.resolve({} as ISearchStartSearchSource),
searchSourceClient: {} as ISearchStartSearchSource,
},
spaceId: 'spaceId',
state,

View file

@ -7,7 +7,6 @@
import {
elasticsearchServiceMock,
savedObjectsClientMock,
httpServerMock,
uiSettingsServiceMock,
} from '@kbn/core/server/mocks';
import {
@ -18,7 +17,7 @@ import {
RuleTypeState,
} from '@kbn/alerting-plugin/server';
import { alertsMock } from '@kbn/alerting-plugin/server/mocks';
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
import { searchSourceCommonMock } from '@kbn/data-plugin/common/search/search_source/mocks';
export const createDefaultAlertExecutorOptions = <
Params extends RuleTypeParams = never,
@ -77,11 +76,7 @@ export const createDefaultAlertExecutorOptions = <
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
shouldWriteAlerts: () => shouldWriteAlerts,
shouldStopExecution: () => false,
searchSourceClient: Promise.resolve(
dataPluginMock
.createStartContract()
.search.searchSource.asScoped(httpServerMock.createKibanaRequest())
),
searchSourceClient: searchSourceCommonMock,
},
state,
updatedBy: null,

View file

@ -54,6 +54,7 @@ import {
import { createSecurityRuleTypeWrapper } from '../../rule_types/create_security_rule_type_wrapper';
import { RULE_PREVIEW_INVOCATION_COUNT } from '../../../../../common/detection_engine/constants';
import { RuleExecutionContext, StatusChangeArgs } from '../../rule_execution_log';
import { wrapSearchSourceClient } from './utils/wrap_search_source_client';
const PREVIEW_TIMEOUT_SECONDS = 60;
@ -86,7 +87,7 @@ export const previewRulesRoute = async (
}
try {
const [, { data, security: securityService }] = await getStartServices();
const searchSourceClient = data.search.searchSource.asScoped(request);
const searchSourceClient = await data.search.searchSource.asScoped(request);
const savedObjectsClient = coreContext.savedObjects.client;
const siemClient = (await context.securitySolution).getAppClient();
@ -242,7 +243,10 @@ export const previewRulesRoute = async (
abortController,
scopedClusterClient: coreContext.elasticsearch.client,
}),
searchSourceClient,
searchSourceClient: wrapSearchSourceClient({
abortController,
searchSourceClient,
}),
uiSettingsClient: coreContext.uiSettings.client,
},
spaceId,

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { of, throwError } from 'rxjs';
import { wrapSearchSourceClient } from './wrap_search_source_client';
const createSearchSourceClientMock = () => {
const searchSourceMock = createSearchSourceMock();
searchSourceMock.fetch$ = jest.fn().mockImplementation(() => of({}));
return {
searchSourceMock,
searchSourceClientMock: {
create: jest.fn().mockReturnValue(searchSourceMock),
createEmpty: jest.fn().mockReturnValue(searchSourceMock),
} as unknown as ISearchStartSearchSource,
};
};
describe('wrapSearchSourceClient', () => {
beforeAll(() => {
jest.useFakeTimers();
});
afterAll(() => {
jest.useRealTimers();
});
afterEach(() => {
jest.resetAllMocks();
});
test('searches with provided abort controller', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
const wrappedSearchClient = wrapSearchSourceClient({
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await wrappedSearchClient.createEmpty();
await wrappedSearchSource.fetch();
expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
abortSignal: abortController.signal,
});
});
test('uses search options when specified', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
const wrappedSearchClient = wrapSearchSourceClient({
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await wrappedSearchClient.create();
await wrappedSearchSource.fetch({ isStored: true });
expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
isStored: true,
abortSignal: abortController.signal,
});
});
test('re-throws error when search throws error', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('something went wrong!')));
const wrappedSearchClient = wrapSearchSourceClient({
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await wrappedSearchClient.create();
const fetch = wrappedSearchSource.fetch();
await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot('"something went wrong!"');
});
test('throws error when search throws abort error', async () => {
const abortController = new AbortController();
abortController.abort();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('Request has been aborted by the user')));
const wrappedSearchClient = wrapSearchSourceClient({
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await wrappedSearchClient.create();
const fetch = wrappedSearchSource.fetch();
await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot(
'"Search has been aborted due to cancelled execution"'
);
});
});

View file

@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ISearchOptions,
ISearchSource,
ISearchStartSearchSource,
SearchSource,
SerializedSearchSourceFields,
} from '@kbn/data-plugin/common';
import { catchError, throwError } from 'rxjs';
interface Props {
abortController: AbortController;
searchSourceClient: ISearchStartSearchSource;
}
interface WrapParams<T extends ISearchSource | SearchSource> {
abortController: AbortController;
pureSearchSource: T;
}
export function wrapSearchSourceClient({
abortController,
searchSourceClient: pureSearchSourceClient,
}: Props) {
const wrappedSearchSourceClient: ISearchStartSearchSource = Object.create(pureSearchSourceClient);
wrappedSearchSourceClient.createEmpty = () => {
const pureSearchSource = pureSearchSourceClient.createEmpty();
return wrapSearchSource({
abortController,
pureSearchSource,
});
};
wrappedSearchSourceClient.create = async (fields?: SerializedSearchSourceFields) => {
const pureSearchSource = await pureSearchSourceClient.create(fields);
return wrapSearchSource({
abortController,
pureSearchSource,
});
};
return wrappedSearchSourceClient;
}
function wrapSearchSource<T extends ISearchSource | SearchSource>({
pureSearchSource,
...wrapParams
}: WrapParams<T>): T {
const wrappedSearchSource = Object.create(pureSearchSource);
wrappedSearchSource.createChild = wrapCreateChild({ ...wrapParams, pureSearchSource });
wrappedSearchSource.createCopy = wrapCreateCopy({ ...wrapParams, pureSearchSource });
wrappedSearchSource.create = wrapCreate({ ...wrapParams, pureSearchSource });
wrappedSearchSource.fetch$ = wrapFetch$({ ...wrapParams, pureSearchSource });
return wrappedSearchSource;
}
function wrapCreate({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureCreatedSearchSource = pureSearchSource.create();
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureCreatedSearchSource,
});
};
}
function wrapCreateChild({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function (options?: {}) {
const pureSearchSourceChild = pureSearchSource.createChild(options);
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
});
};
}
function wrapCreateCopy({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureSearchSourceChild = pureSearchSource.createCopy();
return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
}) as SearchSource;
};
}
function wrapFetch$({ abortController, pureSearchSource }: WrapParams<ISearchSource>) {
return (options?: ISearchOptions) => {
const searchOptions = options ?? {};
return pureSearchSource
.fetch$({
...searchOptions,
abortSignal: abortController.signal,
})
.pipe(
catchError((error) => {
if (abortController.signal.aborted) {
return throwError(
() => new Error('Search has been aborted due to cancelled execution')
);
}
return throwError(() => error);
})
);
};
}

View file

@ -51,10 +51,7 @@ export async function executor(
alertId,
params as OnlySearchSourceAlertParams,
latestTimestamp,
{
searchSourceClient,
logger,
}
{ searchSourceClient, logger }
);
// apply the alert condition

View file

@ -20,12 +20,12 @@ export async function fetchSearchSourceQuery(
latestTimestamp: string | undefined,
services: {
logger: Logger;
searchSourceClient: Promise<ISearchStartSearchSource>;
searchSourceClient: ISearchStartSearchSource;
}
) {
const { logger, searchSourceClient } = services;
const client = await searchSourceClient;
const initialSearchSource = await client.create(params.searchConfiguration);
const initialSearchSource = await searchSourceClient.create(params.searchConfiguration);
const { searchSource, dateStart, dateEnd } = updateSearchSource(
initialSearchSource,