Revert "[ResponseOps] Retry bulk update conflicts in task manager" (#149038)

Reverts elastic/kibana#147808
This commit is contained in:
Mike Côté 2023-01-17 10:18:31 -05:00 committed by GitHub
parent 1dfa106ae2
commit a4cf50cb42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 32 additions and 579 deletions

View file

@ -1,309 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggingSystemMock, savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SerializedConcreteTaskInstance, TaskStatus } from '../task';
import { NUM_RETRIES, retryOnBulkUpdateConflict } from './retry_on_bulk_update_conflict';
const mockSavedObjectsRepository = savedObjectsRepositoryMock.create();
const mockLogger = loggingSystemMock.create().get();
const mockedDate = new Date('2019-02-12T21:01:22.479Z');
/**
 * Builds a serialized task saved object for the tests below. Only the id, task
 * type and the JSON-encoded `params` / `state` strings differ between fixtures;
 * every other attribute is shared.
 */
const buildTaskFixture = (id: string, taskType: string, params: string, state: string) => {
  const timestamp = mockedDate.toISOString();
  return {
    type: 'task',
    id,
    attributes: {
      runAt: timestamp,
      scheduledAt: timestamp,
      startedAt: null,
      retryAt: null,
      params,
      state,
      taskType,
      attempts: 3,
      status: 'idle' as TaskStatus,
      ownerId: null,
      traceparent: '',
    },
  };
};

// An idle "alert" task.
const task1 = buildTaskFixture(
  'task:123456',
  'alert',
  `{ "hello": "world" }`,
  `{ "id": "123456" }`
);

// An idle "report" task.
const task2 = buildTaskFixture(
  'task:324242',
  'report',
  `{ "hello": "world" }`,
  `{ "foo": "bar" }`
);

// An idle "action" task.
const task3 = buildTaskFixture(
  'task:xyaaa',
  'action',
  `{ "goodbye": "world" }`,
  `{ "foo": "bar" }`
);
// Unit tests for retryOnBulkUpdateConflict. The saved objects repository is a
// mock; each test queues the bulkUpdate responses it wants the helper to see.
describe('retryOnBulkUpdateConflict', () => {
beforeEach(() => {
// Drop queued mock responses and recorded calls left over from the previous test.
jest.resetAllMocks();
});
// Happy path: a single bulkUpdate call, result passed through untouched.
test('should not retry when all updates are successful', async () => {
const savedObjectResponse = [
{
id: task1.id,
type: task1.type,
attributes: task1.attributes,
references: [],
},
];
mockSavedObjectsRepository.bulkUpdate.mockResolvedValueOnce({
saved_objects: savedObjectResponse,
});
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
objects: [task1],
});
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(1);
expect(savedObjects).toEqual(savedObjectResponse);
});
// A failure of the bulkUpdate call itself (as opposed to a per-object error in
// its response) must surface to the caller as a rejection.
test('should throw error when saved objects bulkUpdate throws an error', async () => {
mockSavedObjectsRepository.bulkUpdate.mockImplementationOnce(() => {
throw new Error('fail');
});
await expect(() =>
retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
objects: [task1],
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"fail"`);
});
// Per-object errors whose statusCode is not 409 are returned as-is and do not
// trigger another bulkUpdate call.
test('should not retry and return non-conflict errors', async () => {
const savedObjectResponse = [
{
id: task1.id,
type: task1.type,
attributes: task1.attributes,
references: [],
},
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Not a conflict`,
message: `Some error that's not a conflict`,
statusCode: 404,
},
references: [],
},
];
mockSavedObjectsRepository.bulkUpdate.mockResolvedValueOnce({
saved_objects: savedObjectResponse,
});
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
objects: [task1, task2],
});
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(1);
expect(savedObjects).toEqual(savedObjectResponse);
});
// Every call reports a 409 (mockResolvedValue, not ...Once), so the helper uses
// its whole default budget: the initial call plus NUM_RETRIES retries.
// NOTE(review): no fake timers are installed in this file, so the setTimeout-based
// wait between retries elapses in real time here.
test(`should return conflict errors when number of retries exceeds ${NUM_RETRIES}`, async () => {
const savedObjectResponse = [
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Conflict`,
message: `There was a conflict`,
statusCode: 409,
},
references: [],
},
];
mockSavedObjectsRepository.bulkUpdate.mockResolvedValue({
saved_objects: savedObjectResponse,
});
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
objects: [task2],
});
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(NUM_RETRIES + 1);
// The still-conflicting object is handed back to the caller, and a warning is logged.
expect(savedObjects).toEqual(savedObjectResponse);
expect(mockLogger.warn).toBeCalledWith('Bulk update saved object conflicts, exceeded retries');
});
// Five queued responses; only the objects that conflicted are re-sent each time.
// NOTE(review): four real waits happen between the five calls. With the 250 ms
// base delay and the multipliers used by the helper this adds up to roughly
// 4.5 s plus jitter, which looks close to Jest's default per-test timeout —
// confirm, and consider fake timers or a mocked wait.
test('should retry as expected when there are conflicts', async () => {
mockSavedObjectsRepository.bulkUpdate
// Call 1: task1 succeeds; task2 and task3 conflict.
.mockResolvedValueOnce({
saved_objects: [
{
id: task1.id,
type: task1.type,
attributes: task1.attributes,
references: [],
},
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Conflict`,
message: `This is a conflict`,
statusCode: 409,
},
references: [],
},
{
id: task3.id,
type: task3.type,
attributes: task3.attributes,
error: {
error: `Conflict`,
message: `This is a conflict`,
statusCode: 409,
},
references: [],
},
],
})
// Call 2: task2 conflicts again; task3 succeeds.
.mockResolvedValueOnce({
saved_objects: [
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Conflict`,
message: `This is a conflict`,
statusCode: 409,
},
references: [],
},
{
id: task3.id,
type: task3.type,
attributes: task3.attributes,
references: [],
},
],
})
// Call 3: task2 conflicts.
.mockResolvedValueOnce({
saved_objects: [
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Conflict`,
message: `This is a conflict`,
statusCode: 409,
},
references: [],
},
],
})
// Call 4: task2 conflicts.
.mockResolvedValueOnce({
saved_objects: [
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
error: {
error: `Conflict`,
message: `This is a conflict`,
statusCode: 409,
},
references: [],
},
],
})
// Call 5: task2 finally succeeds.
.mockResolvedValueOnce({
saved_objects: [
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
references: [],
},
],
});
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
objects: [task1, task2, task3],
retries: 5,
});
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(5);
// The second argument is the (unset) bulk update options.
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(
1,
[task1, task2, task3],
undefined
);
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(
2,
[task2, task3],
undefined
);
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(3, [task2], undefined);
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(4, [task2], undefined);
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(5, [task2], undefined);
// Results come back in the order they were settled, not in input order:
// task1 (call 1), task3 (call 2), task2 (call 5).
expect(savedObjects).toEqual([
{
id: task1.id,
type: task1.type,
attributes: task1.attributes,
references: [],
},
{
id: task3.id,
type: task3.type,
attributes: task3.attributes,
references: [],
},
{
id: task2.id,
type: task2.type,
attributes: task2.attributes,
references: [],
},
]);
});
});

View file

@ -1,123 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ISavedObjectsRepository,
Logger,
SavedObjectsBulkUpdateObject,
SavedObjectsBulkUpdateOptions,
SavedObjectsUpdateResponse,
} from '@kbn/core/server';
/**
 * Default number of retries attempted after the initial bulk update when the
 * caller does not pass `retries`.
 */
export const NUM_RETRIES = 2;
/** Arguments accepted by `retryOnBulkUpdateConflict`. */
interface RetryOnBulkUpdateConflictOpts<T> {
/** Receives a warning when conflicts remain after the last retry. */
logger: Logger;
/** Repository whose `bulkUpdate` is invoked, possibly several times. */
savedObjectsRepository: ISavedObjectsRepository;
/** Objects to update; on a retry only the ones that conflicted are re-sent. */
objects: Array<SavedObjectsBulkUpdateObject<T>>;
/** Forwarded unchanged to every `bulkUpdate` call. */
options?: SavedObjectsBulkUpdateOptions;
/** Retries allowed after the initial attempt; defaults to `NUM_RETRIES`. */
retries?: number;
}
/** Value resolved by `retryOnBulkUpdateConflict`. */
interface RetryOnBulkUpdateConflictResults<T> {
/**
 * Per-object results gathered across all attempts: successes, non-conflict
 * errors and, once retries are exhausted, the remaining conflict errors.
 */
savedObjects: Array<SavedObjectsUpdateResponse<T>>;
}
/**
 * Bulk-updates saved objects and re-sends the ones that came back with a
 * version conflict (HTTP 409), up to `retries` additional times.
 *
 * @param params logger, repository, objects to update, optional bulk update
 *   options and an optional retry budget (defaults to `NUM_RETRIES`).
 * @returns the per-object results collected across all attempts.
 */
export const retryOnBulkUpdateConflict = async <T>(
  params: RetryOnBulkUpdateConflictOpts<T>
): Promise<RetryOnBulkUpdateConflictResults<T>> => {
  const { logger, savedObjectsRepository, objects, options } = params;
  // Same rule as a destructuring default: only `undefined` selects the default.
  const retries = params.retries === undefined ? NUM_RETRIES : params.retries;

  const outcome = await retryOnBulkUpdateConflictHelper({
    logger,
    savedObjectsRepository,
    objects,
    options,
    retries,
  });
  return outcome;
};
/**
 * Recursive worker behind `retryOnBulkUpdateConflict`.
 *
 * Issues one `bulkUpdate`, splits the response into settled results (successes
 * and non-conflict errors) and 409 conflicts, then either returns or waits and
 * recurses with only the conflicting objects.
 *
 * @param retries retries still available for this invocation.
 * @param maxRetries retry budget of the outermost invocation. It defaults to
 *   `retries` on the first call and is passed along unchanged afterwards so the
 *   back-off can be derived from how many retries have already been used.
 * @param accResults results settled by earlier attempts.
 * @returns all settled results; when the budget is exhausted the remaining
 *   conflict errors are appended as well.
 * @throws whatever `savedObjectsRepository.bulkUpdate` throws.
 */
const retryOnBulkUpdateConflictHelper = async <T>({
  logger,
  savedObjectsRepository,
  objects,
  options,
  retries = NUM_RETRIES,
  maxRetries = retries,
  accResults = [],
}: RetryOnBulkUpdateConflictOpts<T> & {
  maxRetries?: number;
  accResults?: Array<SavedObjectsUpdateResponse<T>>;
}): Promise<RetryOnBulkUpdateConflictResults<T>> => {
  const { saved_objects: savedObjectsResults } = await savedObjectsRepository.bulkUpdate(
    objects,
    options
  );

  const currResults: Array<SavedObjectsUpdateResponse<T>> = [];
  const currConflicts: Array<SavedObjectsUpdateResponse<T>> = [];
  const objectsToRetry: Array<SavedObjectsBulkUpdateObject<T>> = [];

  savedObjectsResults.forEach(
    (savedObjectResult: SavedObjectsUpdateResponse<T>, index: number) => {
      if (savedObjectResult.error && savedObjectResult.error.statusCode === 409) {
        // The SavedObjectsRepository maintains the order of the docs
        // so we can rely on the index in the `docs` to match an error
        // on the same index in the `bulkUpdate` result
        objectsToRetry.push(objects[index]);
        currConflicts.push(savedObjectResult);
      } else {
        // Save results, whether they are successful or non-conflict errors
        currResults.push(savedObjectResult);
      }
    }
  );

  const outOfRetries = retries <= 0;
  // Conflicts are only reported to the caller once they can no longer be retried.
  const results = outOfRetries
    ? [...accResults, ...currResults, ...currConflicts]
    : [...accResults, ...currResults];

  if (objectsToRetry.length === 0) {
    return { savedObjects: results };
  }

  if (outOfRetries) {
    logger.warn(`Bulk update saved object conflicts, exceeded retries`);
    return { savedObjects: results };
  }

  await waitBeforeNextRetry(retries, maxRetries);

  return retryOnBulkUpdateConflictHelper({
    logger,
    savedObjectsRepository,
    objects: objectsToRetry,
    options,
    retries: retries - 1,
    maxRetries,
    accResults: results,
  });
};

/**
 * Jitter (0-99 ms) added to every retry delay.
 * NOTE(review): this is sampled once when the module is evaluated, so every
 * wait in the same process uses the same jitter value.
 */
export const randomDelayMs = Math.floor(Math.random() * 100);

/**
 * Multiplier applied to `RETRY_IF_CONFLICTS_DELAY` for the upcoming wait:
 * `1 + usedRetries²`, i.e. 1, 2, 5, 10, ... (quadratic growth despite the name).
 *
 * @param retries retries still available.
 * @param maxRetries total retry budget the countdown started from. Measuring
 *   against the real budget rather than the constant `NUM_RETRIES` keeps the
 *   delay growing when a caller asks for more than `NUM_RETRIES` retries.
 *   Defaults to `NUM_RETRIES`.
 * @returns the multiplier; never less than 1, also when `retries > maxRetries`.
 */
export const getExponentialDelayMultiplier = (
  retries: number,
  maxRetries: number = NUM_RETRIES
) => 1 + Math.max(0, maxRetries - retries) ** 2;

/** Base delay, in milliseconds, between retries. */
export const RETRY_IF_CONFLICTS_DELAY = 250;

/**
 * Resolves after the back-off delay for the upcoming retry has elapsed.
 *
 * @param retries retries still available.
 * @param maxRetries total retry budget; defaults to `NUM_RETRIES`.
 */
export const waitBeforeNextRetry = async (
  retries: number,
  maxRetries: number = NUM_RETRIES
): Promise<void> => {
  const exponentialDelayMultiplier = getExponentialDelayMultiplier(retries, maxRetries);
  await new Promise((resolve) =>
    setTimeout(resolve, RETRY_IF_CONFLICTS_DELAY * exponentialDelayMultiplier + randomDelayMs)
  );
};

View file

@ -207,7 +207,6 @@ export class TaskManagerPlugin
const serializer = savedObjects.createSerializer();
const taskStore = new TaskStore({
logger: this.logger,
serializer,
savedObjectsRepository,
esClient: elasticsearch.client.asInternalUser,

View file

@ -43,7 +43,6 @@ import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
const VERSION_CONFLICT_STATUS = 409;
const BULK_ACTION_SIZE = 100;
const BULK_UPDATE_NUM_RETRIES = 3;
export interface TaskSchedulingOpts {
logger: Logger;
taskStore: TaskStore;
@ -265,10 +264,7 @@ export class TaskScheduling {
}
private async bulkUpdateTasksHelper(updatedTasks: ConcreteTaskInstance[]) {
// Performs bulk update with retries
return (
await this.store.bulkUpdate(updatedTasks, BULK_UPDATE_NUM_RETRIES)
).reduce<BulkUpdateTaskResult>(
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {
acc.tasks.push(task.value);

View file

@ -15,11 +15,7 @@ import {
TaskLifecycleResult,
SerializedConcreteTaskInstance,
} from './task';
import {
elasticsearchServiceMock,
savedObjectsServiceMock,
loggingSystemMock,
} from '@kbn/core/server/mocks';
import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks';
import { TaskStore, SearchOpts, AggregationOpts } from './task_store';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/server';
@ -29,7 +25,6 @@ import { AdHocTaskCounter } from './lib/adhoc_task_counter';
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = savedObjectsServiceMock.createSerializer();
const logger = loggingSystemMock.create().get();
const adHocTaskCounter = new AdHocTaskCounter();
const randomId = () => `id-${_.random(1, 20)}`;
@ -72,7 +67,6 @@ describe('TaskStore', () => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
logger,
serializer,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
@ -239,7 +233,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -309,7 +302,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -408,7 +400,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -512,7 +503,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -520,98 +510,6 @@ describe('TaskStore', () => {
});
});
test('correctly returns errors from saved object bulk update', async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
traceparent: '',
};
savedObjectsClient.bulkUpdate.mockResolvedValueOnce({
saved_objects: [
{
id: task.id,
type: task.taskType,
attributes: {
runAt: mockedDate.toISOString(),
scheduledAt: mockedDate.toISOString(),
startedAt: null,
retryAt: null,
params: `{ "hello": "world" }`,
state: `{ "foo": "bar" }`,
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
ownerId: null,
traceparent: '',
},
error: {
error: `Not a conflict`,
message: `Some error that's not a conflict`,
statusCode: 404,
},
references: [],
},
],
});
const result = await store.bulkUpdate([task]);
expect(result).toEqual([
{
error: {
entity: {
attempts: 3,
id: 'task:324242',
ownerId: null,
params: { hello: 'world' },
retryAt: null,
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
state: { foo: 'bar' },
status: 'idle',
taskType: 'report',
traceparent: '',
version: '123',
},
error: {
attributes: {
attempts: 3,
ownerId: null,
params: '{ "hello": "world" }',
retryAt: null,
runAt: mockedDate.toISOString(),
scheduledAt: mockedDate.toISOString(),
startedAt: null,
state: '{ "foo": "bar" }',
status: 'idle',
taskType: 'report',
traceparent: '',
},
error: {
error: 'Not a conflict',
message: "Some error that's not a conflict",
statusCode: 404,
},
id: 'task:324242',
references: [],
type: 'report',
},
},
tag: 'err',
},
]);
});
test('pushes error from saved objects client to errors$', async () => {
const task = {
runAt: mockedDate,
@ -646,7 +544,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -681,7 +578,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -716,7 +612,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -802,7 +697,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -823,7 +717,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -842,7 +735,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -861,7 +753,6 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
logger,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,

View file

@ -15,7 +15,6 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server';
import {
Logger,
SavedObject,
ISavedObjectsSerializer,
SavedObjectsRawDoc,
@ -36,10 +35,8 @@ import {
import { TaskTypeDictionary } from './task_type_dictionary';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { retryOnBulkUpdateConflict } from './lib/retry_on_bulk_update_conflict';
export interface StoreOpts {
logger: Logger;
esClient: ElasticsearchClient;
index: string;
taskManagerId: string;
@ -97,7 +94,6 @@ export class TaskStore {
public readonly errors$ = new Subject<Error>();
private esClient: ElasticsearchClient;
private logger: Logger;
private definitions: TaskTypeDictionary;
private savedObjectsRepository: ISavedObjectsRepository;
private serializer: ISavedObjectsSerializer;
@ -115,7 +111,6 @@ export class TaskStore {
constructor(opts: StoreOpts) {
this.esClient = opts.esClient;
this.index = opts.index;
this.logger = opts.logger;
this.taskManagerId = opts.taskManagerId;
this.definitions = opts.definitions;
this.serializer = opts.serializer;
@ -251,45 +246,34 @@ export class TaskStore {
* @param {Array<TaskDoc>} docs
* @returns {Promise<Array<TaskDoc>>}
*/
public async bulkUpdate(
docs: ConcreteTaskInstance[],
retries: number = 0
): Promise<BulkUpdateResult[]> {
public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise<BulkUpdateResult[]> {
const attributesByDocId = docs.reduce((attrsById, doc) => {
attrsById.set(doc.id, taskInstanceToAttributes(doc));
return attrsById;
}, new Map());
let updatedSavedObjects: Array<SavedObjectsUpdateResponse<SerializedConcreteTaskInstance>>;
let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
try {
({ savedObjects: updatedSavedObjects } =
await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
logger: this.logger,
savedObjectsRepository: this.savedObjectsRepository,
objects: docs.map((doc) => ({
({ saved_objects: updatedSavedObjects } =
await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
docs.map((doc) => ({
type: 'task',
id: doc.id,
options: { version: doc.version },
attributes: attributesByDocId.get(doc.id)!,
})),
options: {
{
refresh: false,
},
retries,
}));
}
));
} catch (e) {
this.errors$.next(e);
throw e;
}
return updatedSavedObjects.map((updatedSavedObject) => {
const doc = docs.find((d) => d.id === updatedSavedObject.id);
return updatedSavedObject.error !== undefined
? asErr({
entity: doc,
error: updatedSavedObject,
})
: asOk(
return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) =>
isSavedObjectsUpdateResponse(updatedSavedObject)
? asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
@ -297,8 +281,15 @@ export class TaskStore {
attributesByDocId.get(updatedSavedObject.id)!
),
})
);
}) as BulkUpdateResult[];
)
: asErr({
// The SavedObjectsRepository maintains the order of the docs
// so we can rely on the index in the `docs` to match an error
// on the same index in the `bulkUpdate` result
entity: docs[index],
error: updatedSavedObject,
})
);
}
/**
@ -544,3 +535,9 @@ function ensureAggregationOnlyReturnsTaskObjects(opts: AggregationOpts): Aggrega
query,
};
}
/**
 * Tells a successful per-object bulk update result apart from a failed one.
 *
 * A result counts as successful only when it carries a string `id` and no
 * `error` property. Failed objects are reported by `bulkUpdate` as entries
 * that still have an `id` but also an `error` (for example a 409 conflict or a
 * 404), so checking the `id` alone would classify them as successes.
 *
 * @param result one entry of the `bulkUpdate` response.
 * @returns `true` for a successful update; `false` for an `Error`, a nullish
 *   value, or a response entry that carries an `error`.
 */
function isSavedObjectsUpdateResponse(
  result: SavedObjectsUpdateResponse | Error
): result is SavedObjectsUpdateResponse {
  if (!result) {
    // Always hand back a real boolean rather than the falsy operand itself.
    return false;
  }
  const candidate = result as SavedObjectsUpdateResponse;
  return typeof candidate.id === 'string' && candidate.error === undefined;
}

View file

@ -26,7 +26,8 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
const supertest = getService('supertest');
const supertestWithoutAuth = getService('supertestWithoutAuth');
describe('disable', () => {
// Failing: See https://github.com/elastic/kibana/issues/141849
describe.skip('disable', () => {
const objectRemover = new ObjectRemover(supertest);
after(() => objectRemover.removeAll());

View file

@ -26,7 +26,8 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
const retry = getService('retry');
const supertest = getService('supertest');
describe('disable', () => {
// Failing: See https://github.com/elastic/kibana/issues/141864
describe.skip('disable', () => {
const objectRemover = new ObjectRemover(supertestWithoutAuth);
const ruleUtils = new RuleUtils({ space: Spaces.space1, supertestWithoutAuth });