mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Alerting] retry internal OCC calls within alertsClient (#77838)
During development of https://github.com/elastic/kibana/pull/75553, some issues came up with the optimistic concurrency control (OCC) we were using internally within the alertsClient, via the `version` option/property of the saved object. The referenced PR updates new fields in the alert from the taskManager task after the alertType executor runs. In some alertsClient methods, OCC is used to update the alert which are requested via user requests. And so in some cases, version conflict errors were coming up when the alert was updated by task manager, in the middle of one of these methods. Note: the SIEM function test cases stress test this REALLY well. In this PR, we wrap all the methods using OCC with a function that will retry them, a short number of times, with a short delay in between. If the original method STILL has a conflict error, it will get thrown after the retry limit. In practice, this eliminated the version conflict calls that were occurring with the SIEM tests, once we started updating the saved object in the executor. For cases where we know only attributes not contributing to AAD are being updated, a new function is provided that does a partial update on just those attributes, making partial updates for those attributes a bit safer. That will be also used by PR #75553.
This commit is contained in:
parent
f5ca7d82d5
commit
feab3e3b02
8 changed files with 801 additions and 33 deletions
|
@ -1696,14 +1696,22 @@ describe('muteAll()', () => {
|
|||
muteAll: false,
|
||||
},
|
||||
references: [],
|
||||
version: '123',
|
||||
});
|
||||
|
||||
await alertsClient.muteAll({ id: '1' });
|
||||
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
|
||||
muteAll: true,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: 'elastic',
|
||||
});
|
||||
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
|
||||
'alert',
|
||||
'1',
|
||||
{
|
||||
muteAll: true,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: 'elastic',
|
||||
},
|
||||
{
|
||||
version: '123',
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
describe('authorization', () => {
|
||||
|
@ -1785,11 +1793,18 @@ describe('unmuteAll()', () => {
|
|||
});
|
||||
|
||||
await alertsClient.unmuteAll({ id: '1' });
|
||||
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
|
||||
muteAll: false,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: 'elastic',
|
||||
});
|
||||
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
|
||||
'alert',
|
||||
'1',
|
||||
{
|
||||
muteAll: false,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: 'elastic',
|
||||
},
|
||||
{
|
||||
version: '123',
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
describe('authorization', () => {
|
||||
|
|
|
@ -45,6 +45,8 @@ import { parseIsoOrRelativeDate } from './lib/iso_or_relative_date';
|
|||
import { alertInstanceSummaryFromEventLog } from './lib/alert_instance_summary_from_event_log';
|
||||
import { IEvent } from '../../event_log/server';
|
||||
import { parseDuration } from '../common/parse_duration';
|
||||
import { retryIfConflicts } from './lib/retry_if_conflicts';
|
||||
import { partiallyUpdateAlert } from './saved_objects';
|
||||
|
||||
export interface RegistryAlertTypeWithAuth extends RegistryAlertType {
|
||||
authorizedConsumers: string[];
|
||||
|
@ -421,6 +423,14 @@ export class AlertsClient {
|
|||
}
|
||||
|
||||
public async update({ id, data }: UpdateOptions): Promise<PartialAlert> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.update('${id}')`,
|
||||
async () => await this.updateWithOCC({ id, data })
|
||||
);
|
||||
}
|
||||
|
||||
private async updateWithOCC({ id, data }: UpdateOptions): Promise<PartialAlert> {
|
||||
let alertSavedObject: SavedObject<RawAlert>;
|
||||
|
||||
try {
|
||||
|
@ -529,7 +539,15 @@ export class AlertsClient {
|
|||
};
|
||||
}
|
||||
|
||||
public async updateApiKey({ id }: { id: string }) {
|
||||
public async updateApiKey({ id }: { id: string }): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.updateApiKey('${id}')`,
|
||||
async () => await this.updateApiKeyWithOCC({ id })
|
||||
);
|
||||
}
|
||||
|
||||
private async updateApiKeyWithOCC({ id }: { id: string }) {
|
||||
let apiKeyToInvalidate: string | null = null;
|
||||
let attributes: RawAlert;
|
||||
let version: string | undefined;
|
||||
|
@ -597,7 +615,15 @@ export class AlertsClient {
|
|||
}
|
||||
}
|
||||
|
||||
public async enable({ id }: { id: string }) {
|
||||
public async enable({ id }: { id: string }): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.enable('${id}')`,
|
||||
async () => await this.enableWithOCC({ id })
|
||||
);
|
||||
}
|
||||
|
||||
private async enableWithOCC({ id }: { id: string }) {
|
||||
let apiKeyToInvalidate: string | null = null;
|
||||
let attributes: RawAlert;
|
||||
let version: string | undefined;
|
||||
|
@ -658,7 +684,15 @@ export class AlertsClient {
|
|||
}
|
||||
}
|
||||
|
||||
public async disable({ id }: { id: string }) {
|
||||
public async disable({ id }: { id: string }): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.disable('${id}')`,
|
||||
async () => await this.disableWithOCC({ id })
|
||||
);
|
||||
}
|
||||
|
||||
private async disableWithOCC({ id }: { id: string }) {
|
||||
let apiKeyToInvalidate: string | null = null;
|
||||
let attributes: RawAlert;
|
||||
let version: string | undefined;
|
||||
|
@ -711,8 +745,19 @@ export class AlertsClient {
|
|||
}
|
||||
}
|
||||
|
||||
public async muteAll({ id }: { id: string }) {
|
||||
const { attributes } = await this.unsecuredSavedObjectsClient.get<RawAlert>('alert', id);
|
||||
public async muteAll({ id }: { id: string }): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.muteAll('${id}')`,
|
||||
async () => await this.muteAllWithOCC({ id })
|
||||
);
|
||||
}
|
||||
|
||||
private async muteAllWithOCC({ id }: { id: string }) {
|
||||
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<RawAlert>(
|
||||
'alert',
|
||||
id
|
||||
);
|
||||
await this.authorization.ensureAuthorized(
|
||||
attributes.alertTypeId,
|
||||
attributes.consumer,
|
||||
|
@ -723,19 +768,34 @@ export class AlertsClient {
|
|||
await this.actionsAuthorization.ensureAuthorized('execute');
|
||||
}
|
||||
|
||||
await this.unsecuredSavedObjectsClient.update(
|
||||
'alert',
|
||||
const updateAttributes = this.updateMeta({
|
||||
muteAll: true,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: await this.getUserName(),
|
||||
});
|
||||
const updateOptions = { version };
|
||||
|
||||
await partiallyUpdateAlert(
|
||||
this.unsecuredSavedObjectsClient,
|
||||
id,
|
||||
this.updateMeta({
|
||||
muteAll: true,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: await this.getUserName(),
|
||||
})
|
||||
updateAttributes,
|
||||
updateOptions
|
||||
);
|
||||
}
|
||||
|
||||
public async unmuteAll({ id }: { id: string }) {
|
||||
const { attributes } = await this.unsecuredSavedObjectsClient.get<RawAlert>('alert', id);
|
||||
public async unmuteAll({ id }: { id: string }): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.unmuteAll('${id}')`,
|
||||
async () => await this.unmuteAllWithOCC({ id })
|
||||
);
|
||||
}
|
||||
|
||||
private async unmuteAllWithOCC({ id }: { id: string }) {
|
||||
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<RawAlert>(
|
||||
'alert',
|
||||
id
|
||||
);
|
||||
await this.authorization.ensureAuthorized(
|
||||
attributes.alertTypeId,
|
||||
attributes.consumer,
|
||||
|
@ -746,18 +806,30 @@ export class AlertsClient {
|
|||
await this.actionsAuthorization.ensureAuthorized('execute');
|
||||
}
|
||||
|
||||
await this.unsecuredSavedObjectsClient.update(
|
||||
'alert',
|
||||
const updateAttributes = this.updateMeta({
|
||||
muteAll: false,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: await this.getUserName(),
|
||||
});
|
||||
const updateOptions = { version };
|
||||
|
||||
await partiallyUpdateAlert(
|
||||
this.unsecuredSavedObjectsClient,
|
||||
id,
|
||||
this.updateMeta({
|
||||
muteAll: false,
|
||||
mutedInstanceIds: [],
|
||||
updatedBy: await this.getUserName(),
|
||||
})
|
||||
updateAttributes,
|
||||
updateOptions
|
||||
);
|
||||
}
|
||||
|
||||
public async muteInstance({ alertId, alertInstanceId }: MuteOptions) {
|
||||
public async muteInstance({ alertId, alertInstanceId }: MuteOptions): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.muteInstance('${alertId}')`,
|
||||
async () => await this.muteInstanceWithOCC({ alertId, alertInstanceId })
|
||||
);
|
||||
}
|
||||
|
||||
private async muteInstanceWithOCC({ alertId, alertInstanceId }: MuteOptions) {
|
||||
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<Alert>(
|
||||
'alert',
|
||||
alertId
|
||||
|
@ -788,7 +860,15 @@ export class AlertsClient {
|
|||
}
|
||||
}
|
||||
|
||||
public async unmuteInstance({
|
||||
public async unmuteInstance({ alertId, alertInstanceId }: MuteOptions): Promise<void> {
|
||||
return await retryIfConflicts(
|
||||
this.logger,
|
||||
`alertsClient.unmuteInstance('${alertId}')`,
|
||||
async () => await this.unmuteInstanceWithOCC({ alertId, alertInstanceId })
|
||||
);
|
||||
}
|
||||
|
||||
private async unmuteInstanceWithOCC({
|
||||
alertId,
|
||||
alertInstanceId,
|
||||
}: {
|
||||
|
|
|
@ -0,0 +1,359 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { cloneDeep } from 'lodash';
|
||||
|
||||
import { AlertsClient, ConstructorOptions } from './alerts_client';
|
||||
import { savedObjectsClientMock, loggingSystemMock } from '../../../../src/core/server/mocks';
|
||||
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
|
||||
import { alertTypeRegistryMock } from './alert_type_registry.mock';
|
||||
import { alertsAuthorizationMock } from './authorization/alerts_authorization.mock';
|
||||
import { encryptedSavedObjectsMock } from '../../encrypted_saved_objects/server/mocks';
|
||||
import { actionsClientMock, actionsAuthorizationMock } from '../../actions/server/mocks';
|
||||
import { AlertsAuthorization } from './authorization/alerts_authorization';
|
||||
import { ActionsAuthorization } from '../../actions/server';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
|
||||
import { RetryForConflictsAttempts } from './lib/retry_if_conflicts';
|
||||
import { TaskStatus } from '../../../plugins/task_manager/server/task';
|
||||
|
||||
let alertsClient: AlertsClient;
|
||||
|
||||
const MockAlertId = 'alert-id';
|
||||
|
||||
const ConflictAfterRetries = RetryForConflictsAttempts + 1;
|
||||
|
||||
const taskManager = taskManagerMock.start();
|
||||
const alertTypeRegistry = alertTypeRegistryMock.create();
|
||||
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
|
||||
|
||||
const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();
|
||||
const authorization = alertsAuthorizationMock.create();
|
||||
const actionsAuthorization = actionsAuthorizationMock.create();
|
||||
|
||||
const kibanaVersion = 'v7.10.0';
|
||||
const logger = loggingSystemMock.create().get();
|
||||
const alertsClientParams: jest.Mocked<ConstructorOptions> = {
|
||||
taskManager,
|
||||
alertTypeRegistry,
|
||||
unsecuredSavedObjectsClient,
|
||||
authorization: (authorization as unknown) as AlertsAuthorization,
|
||||
actionsAuthorization: (actionsAuthorization as unknown) as ActionsAuthorization,
|
||||
spaceId: 'default',
|
||||
namespace: 'default',
|
||||
getUserName: jest.fn(),
|
||||
createAPIKey: jest.fn(),
|
||||
invalidateAPIKey: jest.fn(),
|
||||
logger,
|
||||
encryptedSavedObjectsClient: encryptedSavedObjects,
|
||||
getActionsClient: jest.fn(),
|
||||
getEventLogClient: jest.fn(),
|
||||
kibanaVersion,
|
||||
};
|
||||
|
||||
// this suite consists of two suites running tests against mutable alertsClient APIs:
|
||||
// - one to run tests where an SO update conflicts once
|
||||
// - one to run tests where an SO update conflicts too many times
|
||||
describe('alerts_client_conflict_retries', () => {
|
||||
// tests that mutable operations work if only one SO conflict occurs
|
||||
describe(`1 retry works for method`, () => {
|
||||
beforeEach(() => {
|
||||
mockSavedObjectUpdateConflictErrorTimes(1);
|
||||
});
|
||||
|
||||
testFn(update, true);
|
||||
testFn(updateApiKey, true);
|
||||
testFn(enable, true);
|
||||
testFn(disable, true);
|
||||
testFn(muteAll, true);
|
||||
testFn(unmuteAll, true);
|
||||
testFn(muteInstance, true);
|
||||
testFn(unmuteInstance, true);
|
||||
});
|
||||
|
||||
// tests that mutable operations fail if too many SO conflicts occurs
|
||||
describe(`${ConflictAfterRetries} retries fails with conflict error`, () => {
|
||||
beforeEach(() => {
|
||||
mockSavedObjectUpdateConflictErrorTimes(ConflictAfterRetries);
|
||||
});
|
||||
|
||||
testFn(update, false);
|
||||
testFn(updateApiKey, false);
|
||||
testFn(enable, false);
|
||||
testFn(disable, false);
|
||||
testFn(muteAll, false);
|
||||
testFn(unmuteAll, false);
|
||||
testFn(muteInstance, false);
|
||||
testFn(unmuteInstance, false);
|
||||
});
|
||||
});
|
||||
|
||||
// alertsClients methods being tested
|
||||
// - success is passed as an indication if the alertsClient method
|
||||
// is expected to succeed or not, based on the number of conflicts
|
||||
// set up in the `beforeEach()` method
|
||||
|
||||
async function update(success: boolean) {
|
||||
try {
|
||||
await alertsClient.update({
|
||||
id: MockAlertId,
|
||||
data: {
|
||||
schedule: { interval: '5s' },
|
||||
name: 'cba',
|
||||
tags: ['bar'],
|
||||
params: { bar: true },
|
||||
throttle: '10s',
|
||||
actions: [],
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
// only checking the warn messages in this test
|
||||
expect(logger.warn).lastCalledWith(
|
||||
`alertsClient.update('alert-id') conflict, exceeded retries`
|
||||
);
|
||||
return expectConflict(success, err, 'create');
|
||||
}
|
||||
expectSuccess(success, 2, 'create');
|
||||
|
||||
// only checking the debug messages in this test
|
||||
expect(logger.debug).nthCalledWith(1, `alertsClient.update('alert-id') conflict, retrying ...`);
|
||||
}
|
||||
|
||||
async function updateApiKey(success: boolean) {
|
||||
try {
|
||||
await alertsClient.updateApiKey({ id: MockAlertId });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
async function enable(success: boolean) {
|
||||
setupRawAlertMocks({}, { enabled: false });
|
||||
|
||||
try {
|
||||
await alertsClient.enable({ id: MockAlertId });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
// a successful enable call makes 2 calls to update, so that's 3 total,
|
||||
// 1 with conflict + 2 on success
|
||||
expectSuccess(success, 3);
|
||||
}
|
||||
|
||||
async function disable(success: boolean) {
|
||||
try {
|
||||
await alertsClient.disable({ id: MockAlertId });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
async function muteAll(success: boolean) {
|
||||
try {
|
||||
await alertsClient.muteAll({ id: MockAlertId });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
async function unmuteAll(success: boolean) {
|
||||
try {
|
||||
await alertsClient.unmuteAll({ id: MockAlertId });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
async function muteInstance(success: boolean) {
|
||||
try {
|
||||
await alertsClient.muteInstance({ alertId: MockAlertId, alertInstanceId: 'instance-id' });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
async function unmuteInstance(success: boolean) {
|
||||
setupRawAlertMocks({}, { mutedInstanceIds: ['instance-id'] });
|
||||
try {
|
||||
await alertsClient.unmuteInstance({ alertId: MockAlertId, alertInstanceId: 'instance-id' });
|
||||
} catch (err) {
|
||||
return expectConflict(success, err);
|
||||
}
|
||||
|
||||
expectSuccess(success);
|
||||
}
|
||||
|
||||
// tests to run when the method is expected to succeed
|
||||
function expectSuccess(
|
||||
success: boolean,
|
||||
count: number = 2,
|
||||
method: 'update' | 'create' = 'update'
|
||||
) {
|
||||
expect(success).toBe(true);
|
||||
expect(unsecuredSavedObjectsClient[method]).toHaveBeenCalledTimes(count);
|
||||
// message content checked in the update test
|
||||
expect(logger.debug).toHaveBeenCalled();
|
||||
}
|
||||
|
||||
// tests to run when the method is expected to fail
|
||||
function expectConflict(success: boolean, err: Error, method: 'update' | 'create' = 'update') {
|
||||
const conflictErrorMessage = SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId)
|
||||
.message;
|
||||
|
||||
expect(`${err}`).toBe(`Error: ${conflictErrorMessage}`);
|
||||
expect(success).toBe(false);
|
||||
expect(unsecuredSavedObjectsClient[method]).toHaveBeenCalledTimes(ConflictAfterRetries);
|
||||
// message content checked in the update test
|
||||
expect(logger.debug).toBeCalledTimes(RetryForConflictsAttempts);
|
||||
expect(logger.warn).toBeCalledTimes(1);
|
||||
}
|
||||
|
||||
// wrapper to call the test function with a it's own name
|
||||
function testFn(fn: (success: boolean) => unknown, success: boolean) {
|
||||
test(`${fn.name}`, async () => await fn(success));
|
||||
}
|
||||
|
||||
// set up mocks for update or create (the update() method uses create!)
|
||||
function mockSavedObjectUpdateConflictErrorTimes(times: number) {
|
||||
// default success value
|
||||
const mockUpdateValue = {
|
||||
id: MockAlertId,
|
||||
type: 'alert',
|
||||
attributes: {
|
||||
actions: [],
|
||||
scheduledTaskId: 'scheduled-task-id',
|
||||
},
|
||||
references: [],
|
||||
};
|
||||
|
||||
unsecuredSavedObjectsClient.update.mockResolvedValue(mockUpdateValue);
|
||||
unsecuredSavedObjectsClient.create.mockResolvedValue(mockUpdateValue);
|
||||
|
||||
// queue up specified number of errors before a success call
|
||||
for (let i = 0; i < times; i++) {
|
||||
unsecuredSavedObjectsClient.update.mockRejectedValueOnce(
|
||||
SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId)
|
||||
);
|
||||
unsecuredSavedObjectsClient.create.mockRejectedValueOnce(
|
||||
SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// set up mocks needed to get the tested methods to run
|
||||
function setupRawAlertMocks(
|
||||
overrides: Record<string, unknown> = {},
|
||||
attributeOverrides: Record<string, unknown> = {}
|
||||
) {
|
||||
const rawAlert = {
|
||||
id: MockAlertId,
|
||||
type: 'alert',
|
||||
attributes: {
|
||||
enabled: true,
|
||||
tags: ['foo'],
|
||||
alertTypeId: 'myType',
|
||||
schedule: { interval: '10s' },
|
||||
consumer: 'myApp',
|
||||
scheduledTaskId: 'task-123',
|
||||
params: {},
|
||||
throttle: null,
|
||||
actions: [],
|
||||
muteAll: false,
|
||||
mutedInstanceIds: [],
|
||||
...attributeOverrides,
|
||||
},
|
||||
references: [],
|
||||
version: '123',
|
||||
...overrides,
|
||||
};
|
||||
const decryptedRawAlert = {
|
||||
...rawAlert,
|
||||
attributes: {
|
||||
...rawAlert.attributes,
|
||||
apiKey: Buffer.from('123:abc').toString('base64'),
|
||||
},
|
||||
};
|
||||
|
||||
unsecuredSavedObjectsClient.get.mockReset();
|
||||
encryptedSavedObjects.getDecryptedAsInternalUser.mockReset();
|
||||
|
||||
// splitting this out as it's easier to set a breakpoint :-)
|
||||
// eslint-disable-next-line prettier/prettier
|
||||
unsecuredSavedObjectsClient.get.mockImplementation(async () =>
|
||||
cloneDeep(rawAlert)
|
||||
);
|
||||
|
||||
encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(async () =>
|
||||
cloneDeep(decryptedRawAlert)
|
||||
);
|
||||
}
|
||||
|
||||
// setup for each test
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
|
||||
alertsClientParams.createAPIKey.mockResolvedValue({ apiKeysEnabled: false });
|
||||
alertsClientParams.invalidateAPIKey.mockResolvedValue({
|
||||
apiKeysEnabled: true,
|
||||
result: {
|
||||
invalidated_api_keys: [],
|
||||
previously_invalidated_api_keys: [],
|
||||
error_count: 0,
|
||||
},
|
||||
});
|
||||
alertsClientParams.getUserName.mockResolvedValue('elastic');
|
||||
|
||||
taskManager.runNow.mockResolvedValue({ id: '' });
|
||||
taskManager.schedule.mockResolvedValue({
|
||||
id: 'scheduled-task-id',
|
||||
scheduledAt: new Date(),
|
||||
attempts: 0,
|
||||
status: TaskStatus.Idle,
|
||||
runAt: new Date(),
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
state: {},
|
||||
ownerId: null,
|
||||
taskType: 'task-type',
|
||||
params: {},
|
||||
});
|
||||
|
||||
const actionsClient = actionsClientMock.create();
|
||||
actionsClient.getBulk.mockResolvedValue([]);
|
||||
alertsClientParams.getActionsClient.mockResolvedValue(actionsClient);
|
||||
|
||||
alertTypeRegistry.get.mockImplementation((id) => ({
|
||||
id: '123',
|
||||
name: 'Test',
|
||||
actionGroups: [{ id: 'default', name: 'Default' }],
|
||||
defaultActionGroupId: 'default',
|
||||
async executor() {},
|
||||
producer: 'alerts',
|
||||
}));
|
||||
|
||||
alertTypeRegistry.get.mockReturnValue({
|
||||
id: 'myType',
|
||||
name: 'Test',
|
||||
actionGroups: [{ id: 'default', name: 'Default' }],
|
||||
defaultActionGroupId: 'default',
|
||||
async executor() {},
|
||||
producer: 'alerts',
|
||||
});
|
||||
|
||||
alertsClient = new AlertsClient(alertsClientParams);
|
||||
|
||||
setupRawAlertMocks();
|
||||
});
|
78
x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts
Normal file
78
x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts
Normal file
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
import { retryIfConflicts, RetryForConflictsAttempts } from './retry_if_conflicts';
|
||||
import { loggingSystemMock } from '../../../../../src/core/server/mocks';
|
||||
|
||||
describe('retry_if_conflicts', () => {
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
test('should work when operation is a success', async () => {
|
||||
const result = await retryIfConflicts(MockLogger, MockOperationName, OperationSuccessful);
|
||||
expect(result).toBe(MockResult);
|
||||
});
|
||||
|
||||
test('should throw error if not a conflict error', async () => {
|
||||
await expect(
|
||||
retryIfConflicts(MockLogger, MockOperationName, OperationFailure)
|
||||
).rejects.toThrowError('wops');
|
||||
});
|
||||
|
||||
for (let i = 1; i <= RetryForConflictsAttempts; i++) {
|
||||
test(`should work when operation conflicts ${i} times`, async () => {
|
||||
const result = await retryIfConflicts(
|
||||
MockLogger,
|
||||
MockOperationName,
|
||||
getOperationConflictsTimes(i)
|
||||
);
|
||||
expect(result).toBe(MockResult);
|
||||
expect(MockLogger.debug).toBeCalledTimes(i);
|
||||
for (let j = 0; j < i; j++) {
|
||||
expect(MockLogger.debug).nthCalledWith(i, `${MockOperationName} conflict, retrying ...`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
test(`should throw conflict error when conflicts > ${RetryForConflictsAttempts} times`, async () => {
|
||||
await expect(
|
||||
retryIfConflicts(
|
||||
MockLogger,
|
||||
MockOperationName,
|
||||
getOperationConflictsTimes(RetryForConflictsAttempts + 1)
|
||||
)
|
||||
).rejects.toThrowError(SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId));
|
||||
expect(MockLogger.debug).toBeCalledTimes(RetryForConflictsAttempts);
|
||||
expect(MockLogger.warn).toBeCalledTimes(1);
|
||||
expect(MockLogger.warn).toBeCalledWith(`${MockOperationName} conflict, exceeded retries`);
|
||||
});
|
||||
});
|
||||
|
||||
const MockAlertId = 'alert-id';
|
||||
const MockOperationName = 'conflict-retryable-operation';
|
||||
const MockLogger = loggingSystemMock.create().get();
|
||||
const MockResult = 42;
|
||||
|
||||
async function OperationSuccessful() {
|
||||
return MockResult;
|
||||
}
|
||||
|
||||
async function OperationFailure() {
|
||||
throw new Error('wops');
|
||||
}
|
||||
|
||||
function getOperationConflictsTimes(times: number) {
|
||||
return async function OperationConflictsTimes() {
|
||||
times--;
|
||||
if (times >= 0) {
|
||||
throw SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId);
|
||||
}
|
||||
|
||||
return MockResult;
|
||||
};
|
||||
}
|
58
x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts
Normal file
58
x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts
Normal file
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
// This module provides a helper to perform retries on a function if the
|
||||
// function ends up throwing a SavedObject 409 conflict. This can happen
|
||||
// when alert SO's are updated in the background, and will avoid having to
|
||||
// have the caller make explicit conflict checks, where the conflict was
|
||||
// caused by a background update.
|
||||
|
||||
import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
|
||||
type RetryableForConflicts<T> = () => Promise<T>;
|
||||
|
||||
// number of times to retry when conflicts occur
|
||||
// note: it seems unlikely that we'd need more than one retry, but leaving
|
||||
// this statically configurable in case we DO need > 1
|
||||
export const RetryForConflictsAttempts = 1;
|
||||
|
||||
// milliseconds to wait before retrying when conflicts occur
|
||||
// note: we considered making this random, to help avoid a stampede, but
|
||||
// with 1 retry it probably doesn't matter, and adding randomness could
|
||||
// make it harder to diagnose issues
|
||||
const RetryForConflictsDelay = 250;
|
||||
|
||||
// retry an operation if it runs into 409 Conflict's, up to a limit
|
||||
export async function retryIfConflicts<T>(
|
||||
logger: Logger,
|
||||
name: string,
|
||||
operation: RetryableForConflicts<T>,
|
||||
retries: number = RetryForConflictsAttempts
|
||||
): Promise<T> {
|
||||
// run the operation, return if no errors or throw if not a conflict error
|
||||
try {
|
||||
return await operation();
|
||||
} catch (err) {
|
||||
if (!SavedObjectsErrorHelpers.isConflictError(err)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
// must be a conflict; if no retries left, throw it
|
||||
if (retries <= 0) {
|
||||
logger.warn(`${name} conflict, exceeded retries`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
// delay a bit before retrying
|
||||
logger.debug(`${name} conflict, retrying ...`);
|
||||
await waitBeforeNextRetry();
|
||||
return await retryIfConflicts(logger, name, operation, retries - 1);
|
||||
}
|
||||
}
|
||||
|
||||
async function waitBeforeNextRetry(): Promise<void> {
|
||||
await new Promise((resolve) => setTimeout(resolve, RetryForConflictsDelay));
|
||||
}
|
|
@ -9,6 +9,23 @@ import mappings from './mappings.json';
|
|||
import { getMigrations } from './migrations';
|
||||
import { EncryptedSavedObjectsPluginSetup } from '../../../encrypted_saved_objects/server';
|
||||
|
||||
export { partiallyUpdateAlert } from './partially_update_alert';
|
||||
|
||||
export const AlertAttributesExcludedFromAAD = [
|
||||
'scheduledTaskId',
|
||||
'muteAll',
|
||||
'mutedInstanceIds',
|
||||
'updatedBy',
|
||||
];
|
||||
|
||||
// useful for Pick<RawAlert, AlertAttributesExcludedFromAADType> which is a
|
||||
// type which is a subset of RawAlert with just attributes excluded from AAD
|
||||
export type AlertAttributesExcludedFromAADType =
|
||||
| 'scheduledTaskId'
|
||||
| 'muteAll'
|
||||
| 'mutedInstanceIds'
|
||||
| 'updatedBy';
|
||||
|
||||
export function setupSavedObjects(
|
||||
savedObjects: SavedObjectsServiceSetup,
|
||||
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import {
|
||||
SavedObjectsClientContract,
|
||||
ISavedObjectsRepository,
|
||||
SavedObjectsErrorHelpers,
|
||||
} from '../../../../../src/core/server';
|
||||
|
||||
import { partiallyUpdateAlert, PartiallyUpdateableAlertAttributes } from './partially_update_alert';
|
||||
import { savedObjectsClientMock } from '../../../../../src/core/server/mocks';
|
||||
|
||||
const MockSavedObjectsClientContract = savedObjectsClientMock.create();
|
||||
const MockISavedObjectsRepository = (MockSavedObjectsClientContract as unknown) as jest.Mocked<
|
||||
ISavedObjectsRepository
|
||||
>;
|
||||
|
||||
describe('partially_update_alert', () => {
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
for (const [soClientName, soClient] of Object.entries(getMockSavedObjectClients()))
|
||||
describe(`using ${soClientName}`, () => {
|
||||
test('should work with no options', async () => {
|
||||
soClient.update.mockResolvedValueOnce(MockUpdateValue);
|
||||
|
||||
await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes);
|
||||
expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {});
|
||||
});
|
||||
|
||||
test('should work with extraneous attributes ', async () => {
|
||||
const attributes = (InvalidAttributes as unknown) as PartiallyUpdateableAlertAttributes;
|
||||
soClient.update.mockResolvedValueOnce(MockUpdateValue);
|
||||
|
||||
await partiallyUpdateAlert(soClient, MockAlertId, attributes);
|
||||
expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {});
|
||||
});
|
||||
|
||||
test('should handle SO errors', async () => {
|
||||
soClient.update.mockRejectedValueOnce(new Error('wops'));
|
||||
|
||||
await expect(
|
||||
partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes)
|
||||
).rejects.toThrowError('wops');
|
||||
});
|
||||
|
||||
test('should handle the version option', async () => {
|
||||
soClient.update.mockResolvedValueOnce(MockUpdateValue);
|
||||
|
||||
await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, { version: '1.2.3' });
|
||||
expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {
|
||||
version: '1.2.3',
|
||||
});
|
||||
});
|
||||
|
||||
test('should handle the ignore404 option', async () => {
|
||||
const err = SavedObjectsErrorHelpers.createGenericNotFoundError();
|
||||
soClient.update.mockRejectedValueOnce(err);
|
||||
|
||||
await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, { ignore404: true });
|
||||
expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {});
|
||||
});
|
||||
|
||||
test('should handle the namespace option', async () => {
|
||||
soClient.update.mockResolvedValueOnce(MockUpdateValue);
|
||||
|
||||
await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, {
|
||||
namespace: 'bat.cave',
|
||||
});
|
||||
expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {
|
||||
namespace: 'bat.cave',
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function getMockSavedObjectClients(): Record<
|
||||
string,
|
||||
jest.Mocked<SavedObjectsClientContract | ISavedObjectsRepository>
|
||||
> {
|
||||
return {
|
||||
SavedObjectsClientContract: MockSavedObjectsClientContract,
|
||||
// doesn't appear to be a mock for this, but it's basically the same as the above,
|
||||
// so just cast it to make sure we catch any type errors
|
||||
ISavedObjectsRepository: MockISavedObjectsRepository,
|
||||
};
|
||||
}
|
||||
|
||||
const DefaultAttributes = {
|
||||
scheduledTaskId: 'scheduled-task-id',
|
||||
muteAll: true,
|
||||
mutedInstanceIds: ['muted-instance-id-1', 'muted-instance-id-2'],
|
||||
updatedBy: 'someone',
|
||||
};
|
||||
|
||||
const InvalidAttributes = { ...DefaultAttributes, foo: 'bar' };
|
||||
|
||||
const MockAlertId = 'alert-id';
|
||||
|
||||
const MockUpdateValue = {
|
||||
id: MockAlertId,
|
||||
type: 'alert',
|
||||
attributes: {
|
||||
actions: [],
|
||||
scheduledTaskId: 'scheduled-task-id',
|
||||
},
|
||||
references: [],
|
||||
};
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { pick } from 'lodash';
|
||||
import { RawAlert } from '../types';
|
||||
|
||||
import {
|
||||
SavedObjectsClient,
|
||||
SavedObjectsErrorHelpers,
|
||||
SavedObjectsUpdateOptions,
|
||||
} from '../../../../../src/core/server';
|
||||
|
||||
import { AlertAttributesExcludedFromAAD, AlertAttributesExcludedFromAADType } from './index';
|
||||
|
||||
export type PartiallyUpdateableAlertAttributes = Pick<RawAlert, AlertAttributesExcludedFromAADType>;
|
||||
|
||||
export interface PartiallyUpdateAlertSavedObjectOptions {
|
||||
version?: string;
|
||||
ignore404?: boolean;
|
||||
namespace?: string; // only should be used with ISavedObjectsRepository
|
||||
}
|
||||
|
||||
// typed this way so we can send a SavedObjectClient or SavedObjectRepository
|
||||
type SavedObjectClientForUpdate = Pick<SavedObjectsClient, 'update'>;
|
||||
|
||||
// direct, partial update to an alert saved object via scoped SavedObjectsClient
|
||||
// using namespace set in the client
|
||||
export async function partiallyUpdateAlert(
|
||||
savedObjectsClient: SavedObjectClientForUpdate,
|
||||
id: string,
|
||||
attributes: PartiallyUpdateableAlertAttributes,
|
||||
options: PartiallyUpdateAlertSavedObjectOptions = {}
|
||||
): Promise<void> {
|
||||
// ensure we only have the valid attributes excluded from AAD
|
||||
const attributeUpdates = pick(attributes, AlertAttributesExcludedFromAAD);
|
||||
const updateOptions: SavedObjectsUpdateOptions = pick(options, 'namespace', 'version');
|
||||
|
||||
try {
|
||||
await savedObjectsClient.update<RawAlert>('alert', id, attributeUpdates, updateOptions);
|
||||
} catch (err) {
|
||||
if (options?.ignore404 && SavedObjectsErrorHelpers.isNotFoundError(err)) {
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue