[ResponseOps] resolve conflicts when updating alert docs after rule execution (#166283)

resolves: #158403

When conflicts are detected while updating alert docs after a rule runs,
we'll try to resolve the conflict by `mget()`'ing the alert documents
again, to get the updated OCC info `_seq_no` and `_primary_term`. We'll
also get the current versions of "ad-hoc" updated fields (which caused
the conflict), like workflow status, case assignments, etc. And then
attempt to update the alert doc again, with that info, which should get
it back up-to-date.

Note that the rule registry was not touched here. During this PR's
development, I added the retry support to it, but then my function tests
were failing because there were never any conflicts happening. Turns out
rule registry mget's the alerts before it updates them, to get the
latest values. So they won't need this fix.

It's also not clear to me if this can be exercised in serverless, since
it requires the use of an alerting framework based AaD implementation
AND the ability to ad-hoc update alerts. I think this can only be done
with Elasticsearch Query and Index Threshold, and only when used in
metrics scope, so it will show up in the metrics UX, which is where you
can add the alerts to the case.

## manual testing

It's hard! I've seen the conflict messages before, but it's quite
difficult to get them to go off whenever you want. The basic idea is to
get a rule that uses alerting framework AAD (not rule registry, which is
not affected the same way with conflicts (they mget alerts right before
updating them), set it to run on a `1s` interval, and probably also
configure TM to run a `1s` interval, via the following configs:

```
xpack.alerting.rules.minimumScheduleInterval.value: "1s"
xpack.task_manager.poll_interval: 1000
```

You want to get the rule to execute often and generate a lot of alerts,
and run for as long as possible. Then while it's running, add the
generated alerts to cases. Here's the EQ rule definition I used:


![image](56c69d50-a76c-48d4-9a45-665a0008b248)

I selected the alerts from the o11y alerts page, since you can't add
alerts to cases from the stack page. Hmm. :-). Sort the alert list by
low-high duration, so the newest alerts will be at the top. Refresh,
select all the rules (set page to show 100), then add to case from the
`...` menu. If you force a conflict, you should see something like this
in the Kibana logs:

```
[ERROR] [plugins.alerting] Error writing alerts: 168 successful, 100 conflicts, 0 errors:
[INFO ] [plugins.alerting] Retrying bulk update of 100 conflicted alerts
[INFO ] [plugins.alerting] Retried bulk update of 100 conflicted alerts succeeded
```

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Patrick Mueller 2023-09-28 09:47:17 -04:00 committed by GitHub
parent 3ad5addd89
commit e6e3e2d188
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1039 additions and 11 deletions

View file

@ -1299,7 +1299,7 @@ describe('Alerts Client', () => {
expect(clusterClient.bulk).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
`Error writing 1 out of 2 alerts - [{\"type\":\"action_request_validation_exception\",\"reason\":\"Validation Failed: 1: index is missing;2: type is missing;\"}]`
`Error writing alerts: 1 successful, 0 conflicts, 1 errors: Validation Failed: 1: index is missing;2: type is missing;`
);
});

View file

@ -55,6 +55,7 @@ import {
getContinualAlertsQuery,
} from './lib';
import { isValidAlertIndexName } from '../alerts_service';
import { resolveAlertConflicts } from './lib/alert_conflict_resolver';
// Term queries can take up to 10,000 terms
const CHUNK_SIZE = 10000;
@ -467,15 +468,17 @@ export class AlertsClient<
// If there were individual indexing errors, they will be returned in the success response
if (response && response.errors) {
const errorsInResponse = (response.items ?? [])
.map((item) => item?.index?.error || item?.create?.error)
.filter((item) => item != null);
this.options.logger.error(
`Error writing ${errorsInResponse.length} out of ${
alertsToIndex.length
} alerts - ${JSON.stringify(errorsInResponse)}`
);
await resolveAlertConflicts({
logger: this.options.logger,
esClient,
bulkRequest: {
refresh: 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
operations: bulkBody,
},
bulkResponse: response,
});
}
} catch (err) {
this.options.logger.error(

View file

@ -0,0 +1,307 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import {
BulkRequest,
BulkResponse,
BulkResponseItem,
BulkOperationType,
} from '@elastic/elasticsearch/lib/api/types';
import { resolveAlertConflicts } from './alert_conflict_resolver';
const logger = loggingSystemMock.create().get();
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const alertDoc = {
event: { action: 'active' },
kibana: {
alert: {
status: 'untracked',
workflow_status: 'a-ok!',
workflow_tags: ['fee', 'fi', 'fo', 'fum'],
case_ids: ['123', '456', '789'],
},
},
};
describe('alert_conflict_resolver', () => {
beforeEach(() => {
jest.resetAllMocks();
});
describe('handles errors gracefully', () => {
test('when mget fails', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');
esClient.mget.mockRejectedValueOnce(new Error('mget failed'));
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).toHaveBeenNthCalledWith(
2,
'Error resolving alert conflicts: mget failed'
);
});
test('when bulk fails', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');
esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(0, alertDoc)],
});
esClient.bulk.mockRejectedValueOnce(new Error('bulk failed'));
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).toHaveBeenNthCalledWith(
2,
'Error resolving alert conflicts: bulk failed'
);
});
});
describe('is successful with', () => {
test('no bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('');
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).not.toHaveBeenCalled();
});
test('no errors in bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).not.toHaveBeenCalled();
});
test('one conflicted doc', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');
esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(0, alertDoc)],
});
esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(0)],
});
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 0 successful, 1 conflicts, 0 errors: `
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded`
);
});
test('one conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ie');
esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc)],
});
esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2)],
});
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 2 successful, 1 conflicts, 1 errors: hallo`
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded`
);
});
test('multiple conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic');
esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)],
});
esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2), getBulkResItem(3), getBulkResItem(5)],
});
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 2 successful, 3 conflicts, 1 errors: hallo`
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 3 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 3 conflicted alerts succeeded`
);
});
});
});
function getBulkResItem(id: number) {
return {
index: {
_index: `index-${id}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
status: 200,
},
};
}
function getMGetResDoc(id: number, doc: unknown) {
return {
_index: `index-${id}}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
found: true,
_source: doc,
};
}
interface GetReqResResult {
bulkRequest: BulkRequest<unknown, unknown>;
bulkResponse: BulkResponse;
}
/**
* takes as input a string of c, is, ic, ie tokens and builds appropriate
* bulk request and response objects to use in the tests:
* - c: create, ignored by the resolve logic
* - is: index with success
* - ic: index with conflict
* - ie: index with error but not conflict
*/
function getReqRes(bulkOps: string): GetReqResResult {
const ops = bulkOps.trim().split(/\s+/g);
const bulkRequest = getBulkRequest();
const bulkResponse = getBulkResponse();
bulkRequest.operations = [];
bulkResponse.items = [];
bulkResponse.errors = false;
if (ops[0] === '') return { bulkRequest, bulkResponse };
const createOp = { create: {} };
let id = 0;
for (const op of ops) {
id++;
switch (op) {
// create, ignored by the resolve logic
case 'c':
bulkRequest.operations.push(createOp, alertDoc);
bulkResponse.items.push(getResponseItem('create', id, false, 200));
break;
// index with success
case 'is':
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, false, 200));
break;
// index with conflict
case 'ic':
bulkResponse.errors = true;
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, true, 409));
break;
// index with error but not conflict
case 'ie':
bulkResponse.errors = true;
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot
break;
// developer error
default:
throw new Error('bad input');
}
}
return { bulkRequest, bulkResponse };
}
function getBulkRequest(): BulkRequest<unknown, unknown> {
return {
refresh: 'wait_for',
index: 'some-index',
require_alias: true,
operations: [],
};
}
function getIndexOp(id: number) {
return {
index: {
_id: `id-${id}`,
_index: `index-${id}`,
if_seq_no: 17,
if_primary_term: 1,
require_alias: false,
},
};
}
function getBulkResponse(): BulkResponse {
return {
errors: false,
took: 0,
items: [],
};
}
function getResponseItem(
type: BulkOperationType,
id: number,
error: boolean,
status: number
): Partial<Record<BulkOperationType, BulkResponseItem>> {
if (error) {
return {
[type]: {
_index: `index-${id}`,
_id: `id-${id}`,
error: { reason: 'hallo' },
status,
},
};
}
return {
[type]: {
_index: `index-${id}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
status: 200,
},
};
}

View file

@ -0,0 +1,288 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
BulkRequest,
BulkResponse,
BulkOperationContainer,
MgetResponseItem,
} from '@elastic/elasticsearch/lib/api/types';
import { Logger, ElasticsearchClient } from '@kbn/core/server';
import {
ALERT_STATUS,
ALERT_STATUS_ACTIVE,
ALERT_STATUS_RECOVERED,
ALERT_WORKFLOW_STATUS,
ALERT_WORKFLOW_TAGS,
ALERT_CASE_IDS,
} from '@kbn/rule-data-utils';
import { set } from '@kbn/safer-lodash-set';
import { zip, get } from 'lodash';
// these fields are the one's we'll refresh from the fresh mget'd docs
const REFRESH_FIELDS_ALWAYS = [ALERT_WORKFLOW_STATUS, ALERT_WORKFLOW_TAGS, ALERT_CASE_IDS];
const REFRESH_FIELDS_CONDITIONAL = [ALERT_STATUS];
const REFRESH_FIELDS_ALL = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL];
export interface ResolveAlertConflictsParams {
esClient: ElasticsearchClient;
logger: Logger;
bulkRequest: BulkRequest<unknown, unknown>;
bulkResponse: BulkResponse;
}
interface NormalizedBulkRequest {
op: BulkOperationContainer;
doc: unknown;
}
// wrapper to catch anything thrown; current usage of this function is
// to replace just logging that the error occurred, so we don't want
// to cause _more_ errors ...
export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise<void> {
const { logger } = params;
try {
await resolveAlertConflicts_(params);
} catch (err) {
logger.error(`Error resolving alert conflicts: ${err.message}`);
}
}
async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Promise<void> {
const { logger, esClient, bulkRequest, bulkResponse } = params;
if (bulkRequest.operations && bulkRequest.operations?.length === 0) return;
if (bulkResponse.items && bulkResponse.items?.length === 0) return;
// get numbers for a summary log message
const { success, errors, conflicts, messages } = getResponseStats(bulkResponse);
if (conflicts === 0 && errors === 0) return;
const allMessages = messages.join('; ');
logger.error(
`Error writing alerts: ${success} successful, ${conflicts} conflicts, ${errors} errors: ${allMessages}`
);
// get a new bulk request for just conflicted docs
const conflictRequest = getConflictRequest(bulkRequest, bulkResponse);
if (conflictRequest.length === 0) return;
// get the fresh versions of those docs
const freshDocs = await getFreshDocs(esClient, conflictRequest);
// update the OCC and refresh-able fields
await updateOCC(conflictRequest, freshDocs);
await refreshFieldsInDocs(conflictRequest, freshDocs);
logger.info(`Retrying bulk update of ${conflictRequest.length} conflicted alerts`);
const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, conflictRequest);
if (mbrResponse.bulkResponse?.items.length !== conflictRequest.length) {
const actual = mbrResponse.bulkResponse?.items.length;
const expected = conflictRequest.length;
logger.error(
`Unexpected number of bulk response items retried; expecting ${expected}, retried ${actual}`
);
return;
}
if (mbrResponse.error) {
const index = bulkRequest.index || 'unknown index';
logger.error(
`Error writing ${conflictRequest.length} alerts to ${index} - ${mbrResponse.error.message}`
);
return;
}
if (mbrResponse.errors === 0) {
logger.info(`Retried bulk update of ${conflictRequest.length} conflicted alerts succeeded`);
} else {
logger.error(
`Retried bulk update of ${conflictRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts`
);
}
}
interface MakeBulkRequestResponse {
bulkRequest: BulkRequest;
bulkResponse?: BulkResponse;
errors: number;
error?: Error;
}
// make the bulk request to fix conflicts
async function makeBulkRequest(
esClient: ElasticsearchClient,
bulkRequest: BulkRequest,
conflictRequest: NormalizedBulkRequest[]
): Promise<MakeBulkRequestResponse> {
const operations = conflictRequest.map((req) => [req.op, req.doc]).flat();
// just replace the operations from the original request
const updatedBulkRequest = { ...bulkRequest, operations };
const bulkResponse = await esClient.bulk(updatedBulkRequest);
const errors = bulkResponse.items.filter((item) => item.index?.error).length;
return { bulkRequest, bulkResponse, errors };
}
/** Update refreshable fields in the conflict requests. */
async function refreshFieldsInDocs(
conflictRequests: NormalizedBulkRequest[],
freshResponses: MgetResponseItem[]
) {
for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) {
if (!conflictRequest?.op.index || !freshResponse) continue;
// @ts-expect-error @elastic/elasticsearch _source is not in the type!
const freshDoc = freshResponse._source;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const conflictDoc = conflictRequest.doc as Record<string, any>;
if (!freshDoc || !conflictDoc) continue;
for (const refreshField of REFRESH_FIELDS_ALWAYS) {
const val = get(freshDoc, refreshField);
set(conflictDoc, refreshField, val);
}
// structured this way to make sure all conditional refresh
// fields are listed in REFRESH_FIELDS_CONDITIONAL when we mget
for (const refreshField of REFRESH_FIELDS_CONDITIONAL) {
switch (refreshField) {
// hamdling for kibana.alert.status: overwrite conflict doc
// with fresh version if it's not active or recovered (ie, untracked)
case ALERT_STATUS:
const freshStatus = get(freshDoc, ALERT_STATUS);
if (freshStatus !== ALERT_STATUS_ACTIVE && freshStatus !== ALERT_STATUS_RECOVERED) {
set(conflictDoc, ALERT_STATUS, freshStatus);
}
break;
}
}
}
}
/** Update the OCC info in the conflict request with the fresh info. */
async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) {
for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) {
if (!req?.op.index || !freshDoc) continue;
// @ts-expect-error @elastic/elasticsearch _seq_no is not in the type!
const seqNo: number | undefined = freshDoc._seq_no;
// @ts-expect-error @elastic/elasticsearch _primary_term is not in the type!
const primaryTerm: number | undefined = freshDoc._primary_term;
if (seqNo === undefined) throw new Error('Unexpected undefined seqNo');
if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm');
req.op.index.if_seq_no = seqNo;
req.op.index.if_primary_term = primaryTerm;
}
}
/** Get the latest version of the conflicted docs, with fields to refresh. */
async function getFreshDocs(
esClient: ElasticsearchClient,
conflictRequests: NormalizedBulkRequest[]
): Promise<MgetResponseItem[]> {
const docs: Array<{ _id: string; _index: string }> = [];
conflictRequests.forEach((req) => {
const [id, index] = [req.op.index?._id, req.op.index?._index];
if (!id || !index) return;
docs.push({ _id: id, _index: index });
});
const mgetRes = await esClient.mget<unknown>({ docs, _source_includes: REFRESH_FIELDS_ALL });
if (mgetRes.docs.length !== docs.length) {
throw new Error(
`Unexpected number of mget response docs; expected ${docs.length}, got ${mgetRes.docs.length}`
);
}
return mgetRes.docs;
}
/** Return the bulk request, filtered to those requests that had conflicts. */
function getConflictRequest(
bulkRequest: BulkRequest,
bulkResponse: BulkResponse
): NormalizedBulkRequest[] {
// first "normalize" the request from it's non-linear form
const request = normalizeRequest(bulkRequest);
// maybe we didn't unwind it right ...
if (request.length !== bulkResponse.items.length) {
throw new Error('Unexpected number of bulk response items');
}
if (request.length === 0) return [];
// we only want op: index where the status was 409 / conflict
const conflictRequest = zip(request, bulkResponse.items)
.filter(([_, res]) => res?.index?.status === 409)
.map(([req, _]) => req!);
return conflictRequest;
}
/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] */
function normalizeRequest(bulkRequest: BulkRequest) {
if (!bulkRequest.operations) return [];
const result: NormalizedBulkRequest[] = [];
let index = 0;
while (index < bulkRequest.operations.length) {
// the "op" data
const op = bulkRequest.operations[index] as BulkOperationContainer;
// now the "doc" data, if there is any (none for delete)
if (op.create || op.index || op.update) {
index++;
const doc = bulkRequest.operations[index];
result.push({ op, doc });
} else if (op.delete) {
// no doc for delete op
} else {
throw new Error(`Unsupported bulk operation: ${JSON.stringify(op)}`);
}
index++;
}
return result;
}
interface ResponseStatsResult {
success: number;
conflicts: number;
errors: number;
messages: string[];
}
// generate a summary of the original bulk request attempt, for logging
function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult {
const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] };
for (const item of bulkResponse.items) {
const op = item.create || item.index || item.update || item.delete;
if (op?.error) {
if (op?.status === 409 && op === item.index) {
stats.conflicts++;
} else {
stats.errors++;
stats.messages.push(op?.error?.reason || 'no bulk reason provided');
}
} else {
stats.success++;
}
}
return stats;
}

View file

@ -7,7 +7,7 @@
import { v4 as uuidv4 } from 'uuid';
import { Logger } from '@kbn/logging';
import { CoreSetup } from '@kbn/core/server';
import { CoreSetup, ElasticsearchClient } from '@kbn/core/server';
import { schema, TypeOf } from '@kbn/config-schema';
import { curry, range, times } from 'lodash';
import {
@ -941,6 +941,136 @@ function getAlwaysFiringAlertAsDataRuleType(
});
}
function getWaitingRuleType(logger: Logger) {
const ParamsType = schema.object({
source: schema.string(),
alerts: schema.number(),
});
type ParamsType = TypeOf<typeof ParamsType>;
interface State extends RuleTypeState {
runCount?: number;
}
const id = 'test.waitingRule';
const result: RuleType<
ParamsType,
never,
State,
{},
{},
'default',
'recovered',
{ runCount: number }
> = {
id,
name: 'Test: Rule that waits for a signal before finishing',
actionGroups: [{ id: 'default', name: 'Default' }],
producer: 'alertsFixture',
defaultActionGroupId: 'default',
minimumLicenseRequired: 'basic',
isExportable: true,
doesSetRecoveryContext: true,
validate: { params: ParamsType },
alerts: {
context: id.toLowerCase(),
shouldWrite: true,
mappings: {
fieldMap: {
runCount: { required: false, type: 'long' },
},
},
},
async executor(alertExecutorOptions) {
const { services, state, params } = alertExecutorOptions;
const { source, alerts } = params;
const alertsClient = services.alertsClient;
if (!alertsClient) throw new Error(`Expected alertsClient!`);
const runCount = (state.runCount || 0) + 1;
const es = services.scopedClusterClient.asInternalUser;
await sendSignal(logger, es, id, source, `rule-starting-${runCount}`);
await waitForSignal(logger, es, id, source, `rule-complete-${runCount}`);
for (let i = 0; i < alerts; i++) {
alertsClient.report({
id: `alert-${i}`,
actionGroup: 'default',
payload: { runCount },
});
}
return { state: { runCount } };
},
};
return result;
}
async function sendSignal(
logger: Logger,
es: ElasticsearchClient,
id: string,
source: string,
reference: string
) {
logger.info(`rule type ${id} sending signal ${reference}`);
await es.index({ index: ES_TEST_INDEX_NAME, refresh: 'true', body: { source, reference } });
}
async function waitForSignal(
logger: Logger,
es: ElasticsearchClient,
id: string,
source: string,
reference: string
) {
let docs: unknown[] = [];
for (let attempt = 0; attempt < 20; attempt++) {
docs = await getSignalDocs(es, source, reference);
if (docs.length > 0) {
logger.info(`rule type ${id} received signal ${reference}`);
break;
}
logger.info(`rule type ${id} waiting for signal ${reference}`);
await new Promise((resolve) => setTimeout(resolve, 1000));
}
if (docs.length === 0) {
throw new Error(`Expected to find docs with source ${source}`);
}
}
async function getSignalDocs(es: ElasticsearchClient, source: string, reference: string) {
const body = {
query: {
bool: {
must: [
{
term: {
source,
},
},
{
term: {
reference,
},
},
],
},
},
};
const params = {
index: ES_TEST_INDEX_NAME,
size: 1000,
_source: false,
body,
};
const result = await es.search(params, { meta: true });
return result?.body?.hits?.hits || [];
}
export function defineAlertTypes(
core: CoreSetup<FixtureStartDeps>,
{ alerting, ruleRegistry }: Pick<FixtureSetupDeps, 'alerting' | 'ruleRegistry'>,
@ -1162,4 +1292,5 @@ export function defineAlertTypes(
alerting.registerType(getAlwaysFiringAlertAsDataRuleType(logger, { ruleRegistry }));
alerting.registerType(getPatternFiringAutoRecoverFalseAlertType());
alerting.registerType(getPatternFiringAlertsAsDataRuleType());
alerting.registerType(getWaitingRuleType(logger));
}

View file

@ -88,6 +88,7 @@ export class FixturePlugin implements Plugin<void, void, FixtureSetupDeps, Fixtu
'test.exceedsAlertLimit',
'test.always-firing-alert-as-data',
'test.patternFiringAad',
'test.waitingRule',
],
privileges: {
all: {
@ -117,6 +118,7 @@ export class FixturePlugin implements Plugin<void, void, FixtureSetupDeps, Fixtu
'test.exceedsAlertLimit',
'test.always-firing-alert-as-data',
'test.patternFiringAad',
'test.waitingRule',
],
},
},
@ -149,6 +151,7 @@ export class FixturePlugin implements Plugin<void, void, FixtureSetupDeps, Fixtu
'test.exceedsAlertLimit',
'test.always-firing-alert-as-data',
'test.patternFiringAad',
'test.waitingRule',
],
},
},

View file

@ -68,6 +68,17 @@ export class ESTestIndexTool {
);
}
async indexDoc(source: string, reference?: string) {
return await this.es.index({
index: this.index,
document: {
source,
reference,
},
refresh: true,
});
}
async destroy() {
const indexExists = await this.es.indices.exists({ index: this.index });
if (indexExists) {

View file

@ -0,0 +1,284 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { Client } from '@elastic/elasticsearch';
import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Alert } from '@kbn/alerts-as-data-utils';
import { ESTestIndexTool } from '@kbn/alerting-api-integration-helpers';
import { basename } from 'node:path';
import { v4 as uuidv4 } from 'uuid';
import { get, omit } from 'lodash';
import { FtrProviderContext } from '../../../../../common/ftr_provider_context';
import { Spaces } from '../../../../scenarios';
import { getTestRuleData, getUrlPrefix, ObjectRemover } from '../../../../../common/lib';
type AlertDoc = Alert & { runCount: number };
// sort results of a search of alert docs by alert instance id
function sortAlertDocsByInstanceId(a: SearchHit<AlertDoc>, b: SearchHit<AlertDoc>) {
return a._source!.kibana.alert.instance.id.localeCompare(b._source!.kibana.alert.instance.id);
}
// eslint-disable-next-line import/no-default-export
export default function createAlertsAsDataInstallResourcesTest({ getService }: FtrProviderContext) {
const es = getService('es');
const retry = getService('retry');
const supertestWithoutAuth = getService('supertestWithoutAuth');
const objectRemover = new ObjectRemover(supertestWithoutAuth);
const esTestIndexTool = new ESTestIndexTool(es, retry);
describe('document conflicts during rule execution', () => {
before(async () => {
await esTestIndexTool.destroy();
await esTestIndexTool.setup();
});
after(async () => {
await objectRemover.removeAll();
await esTestIndexTool.destroy();
});
const ruleType = 'test.waitingRule';
const aadIndex = `.alerts-${ruleType.toLowerCase()}.alerts-default`;
describe(`should be handled for alerting framework based AaD`, () => {
it('for a single conflicted alert', async () => {
const source = uuidv4();
const count = 1;
const params = { source, alerts: count };
const createdRule = await supertestWithoutAuth
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
.set('kbn-xsrf', 'foo')
.send(
getTestRuleData({
name: `${basename(__filename)} ${ruleType} ${source}}`,
rule_type_id: ruleType,
schedule: { interval: '1s' },
throttle: null,
params,
actions: [],
})
);
if (createdRule.status !== 200) {
log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`);
}
expect(createdRule.status).to.eql(200);
const ruleId = createdRule.body.id;
objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting');
// this rule type uses esTextIndexTool documents to communicate
// with the rule executor. Once the rule starts executing, it
// "sends" `rule-starting-<n>`, which this code waits for. It
// then updates the alert doc, and "sends" `rule-complete-<n>`.
// which the rule executor is waiting on, to complete the rule
// execution.
log(`signal the rule to finish the first run`);
await esTestIndexTool.indexDoc(source, 'rule-complete-1');
log(`wait for the first alert doc to be created`);
const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count);
expect(initialDocs.length).to.eql(count);
log(`wait for the start of the next execution`);
await esTestIndexTool.waitForDocs(source, 'rule-starting-2');
log(`ad-hoc update the alert doc`);
await adHocUpdate(es, aadIndex, initialDocs[0]._id);
log(`signal the rule to finish`);
await esTestIndexTool.indexDoc(source, 'rule-complete-2');
log(`wait for the start of the next execution`);
await esTestIndexTool.waitForDocs(source, 'rule-starting-3');
log(`get the updated alert doc`);
const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count);
expect(updatedDocs.length).to.eql(1);
log(`signal the rule to finish, then delete it`);
await esTestIndexTool.indexDoc(source, 'rule-complete-3');
await objectRemover.removeAll();
// compare the initial and updated alert docs
compareAlertDocs(initialDocs[0], updatedDocs[0], true);
});
it('for a mix of successful and conflicted alerts', async () => {
const source = uuidv4();
const count = 5;
const params = { source, alerts: count };
const createdRule = await supertestWithoutAuth
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
.set('kbn-xsrf', 'foo')
.send(
getTestRuleData({
name: `${basename(__filename)} ${ruleType} ${source}}`,
rule_type_id: ruleType,
schedule: { interval: '1s' },
throttle: null,
params,
actions: [],
})
);
if (createdRule.status !== 200) {
log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`);
}
expect(createdRule.status).to.eql(200);
const ruleId = createdRule.body.id;
objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting');
log(`signal the rule to finish the first run`);
await esTestIndexTool.indexDoc(source, 'rule-complete-1');
log(`wait for the first alert doc to be created`);
const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count);
initialDocs.sort(sortAlertDocsByInstanceId);
expect(initialDocs.length).to.eql(5);
log(`wait for the start of the next execution`);
await esTestIndexTool.waitForDocs(source, 'rule-starting-2');
log(`ad-hoc update the 2nd and 4th alert docs`);
await adHocUpdate(es, aadIndex, initialDocs[1]._id);
await adHocUpdate(es, aadIndex, initialDocs[3]._id);
log(`signal the rule to finish`);
await esTestIndexTool.indexDoc(source, 'rule-complete-2');
log(`wait for the start of the next execution`);
await esTestIndexTool.waitForDocs(source, 'rule-starting-3');
log(`get the updated alert doc`);
const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count);
updatedDocs.sort(sortAlertDocsByInstanceId);
expect(updatedDocs.length).to.eql(5);
log(`signal the rule to finish, then delete it`);
await esTestIndexTool.indexDoc(source, 'rule-complete-3');
await objectRemover.removeAll();
// compare the initial and updated alert docs
compareAlertDocs(initialDocs[0], updatedDocs[0], false);
compareAlertDocs(initialDocs[1], updatedDocs[1], true);
compareAlertDocs(initialDocs[2], updatedDocs[2], false);
compareAlertDocs(initialDocs[3], updatedDocs[3], true);
compareAlertDocs(initialDocs[4], updatedDocs[4], false);
});
});
});
// waits for a specified number of alert documents
async function waitForAlertDocs(
index: string,
ruleId: string,
count: number = 1
): Promise<Array<SearchHit<AlertDoc>>> {
return await retry.try(async () => {
const searchResult = await es.search<AlertDoc>({
index,
size: count,
body: {
query: {
bool: {
must: [{ term: { 'kibana.alert.rule.uuid': ruleId } }],
},
},
},
});
const docs = searchResult.hits.hits as Array<SearchHit<AlertDoc>>;
if (docs.length < count) throw new Error(`only ${docs.length} out of ${count} docs found`);
return docs;
});
}
}
// general comparator for initial / updated alert documents
function compareAlertDocs(
initialDoc: SearchHit<AlertDoc>,
updatedDoc: SearchHit<AlertDoc>,
conflicted: boolean
) {
// ensure both rule run updates and other updates persisted
if (!initialDoc) throw new Error('not enough initial docs');
if (!updatedDoc) throw new Error('not enough updated docs');
const initialAlert = initialDoc._source!;
const updatedAlert = updatedDoc._source!;
expect(initialAlert.runCount).to.be.greaterThan(0);
expect(updatedAlert.runCount).not.to.eql(-1);
expect(updatedAlert.runCount).to.be.greaterThan(initialAlert.runCount);
if (conflicted) {
expect(get(updatedAlert, 'kibana.alert.case_ids')).to.eql(
get(DocUpdate, 'kibana.alert.case_ids')
);
expect(get(updatedAlert, 'kibana.alert.workflow_tags')).to.eql(
get(DocUpdate, 'kibana.alert.workflow_tags')
);
expect(get(updatedAlert, 'kibana.alert.workflow_status')).to.eql(
get(DocUpdate, 'kibana.alert.workflow_status')
);
expect(get(initialAlert, 'kibana.alert.status')).to.be('active');
expect(get(updatedAlert, 'kibana.alert.status')).to.be('untracked');
}
const initial = omit(initialAlert, SkipFields);
const updated = omit(updatedAlert, SkipFields);
expect(initial).to.eql(updated);
}
// perform an adhoc update to an alert doc
async function adHocUpdate(es: Client, index: string, id: string) {
const body = { doc: DocUpdate };
await es.update({ index, id, body, refresh: true });
}
// we'll do the adhoc updates with this data
const DocUpdate = {
runCount: -1, // rule-specific field, will be overwritten by rule execution
kibana: {
alert: {
action_group: 'not-the-default', // will be overwritten by rule execution
// below are all fields that will NOT be overwritten by rule execution
workflow_status: 'a-ok!',
workflow_tags: ['fee', 'fi', 'fo', 'fum'],
case_ids: ['123', '456', '789'],
status: 'untracked',
},
},
};
const SkipFields = [
// dynamically changing fields we have no control over
'@timestamp',
'event.action',
'kibana.alert.duration.us',
'kibana.alert.flapping_history',
'kibana.alert.rule.execution.uuid',
// fields under our control we test separately
'runCount',
'kibana.alert.status',
'kibana.alert.case_ids',
'kibana.alert.workflow_tags',
'kibana.alert.workflow_status',
];
function log(message: string) {
// eslint-disable-next-line no-console
console.log(`${new Date().toISOString()} ${message}`);
}

View file

@ -13,5 +13,6 @@ export default function alertsAsDataTests({ loadTestFile }: FtrProviderContext)
loadTestFile(require.resolve('./install_resources'));
loadTestFile(require.resolve('./alerts_as_data'));
loadTestFile(require.resolve('./alerts_as_data_flapping'));
loadTestFile(require.resolve('./alerts_as_data_conflicts'));
});
}