mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
[ResponseOps] Retry bulk update conflicts in task manager (#147808)
Resolves https://github.com/elastic/kibana/issues/145316, https://github.com/elastic/kibana/issues/141849, https://github.com/elastic/kibana/issues/141864 ## Summary Adds a retry on conflict error to the saved objects bulk update call made by task manager. Errors are returned by the saved object client inside an array (with a success response). Previously, we were not inspecting the response array, just returning the full data. With this PR, we are inspecting the response array specifically for conflict errors and retrying the update for just those tasks. This `bulkUpdate` function is used both internally by task manager and externally by the rules client. I default the number of retries to 0 for bulk updates from the task manager in order to preserve existing behavior (and in order not to increase the amount of time it takes for task manager to run) but use 3 retries when used externally. Also unskipped the two flaky disable tests and ran them through the flaky test runner 400 times with no failures. * https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1652 * https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1653 * https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1660 * https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1661 Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
f1de9a4bc0
commit
a6232c4835
8 changed files with 579 additions and 32 deletions
|
@ -0,0 +1,309 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { loggingSystemMock, savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
|
||||
import { SerializedConcreteTaskInstance, TaskStatus } from '../task';
|
||||
import { NUM_RETRIES, retryOnBulkUpdateConflict } from './retry_on_bulk_update_conflict';
|
||||
|
||||
const mockSavedObjectsRepository = savedObjectsRepositoryMock.create();
|
||||
const mockLogger = loggingSystemMock.create().get();
|
||||
const mockedDate = new Date('2019-02-12T21:01:22.479Z');
|
||||
|
||||
const task1 = {
|
||||
type: 'task',
|
||||
id: 'task:123456',
|
||||
attributes: {
|
||||
runAt: mockedDate.toISOString(),
|
||||
scheduledAt: mockedDate.toISOString(),
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
params: `{ "hello": "world" }`,
|
||||
state: `{ "id": "123456" }`,
|
||||
taskType: 'alert',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
ownerId: null,
|
||||
traceparent: '',
|
||||
},
|
||||
};
|
||||
|
||||
const task2 = {
|
||||
type: 'task',
|
||||
id: 'task:324242',
|
||||
attributes: {
|
||||
runAt: mockedDate.toISOString(),
|
||||
scheduledAt: mockedDate.toISOString(),
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
params: `{ "hello": "world" }`,
|
||||
state: `{ "foo": "bar" }`,
|
||||
taskType: 'report',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
ownerId: null,
|
||||
traceparent: '',
|
||||
},
|
||||
};
|
||||
|
||||
const task3 = {
|
||||
type: 'task',
|
||||
id: 'task:xyaaa',
|
||||
attributes: {
|
||||
runAt: mockedDate.toISOString(),
|
||||
scheduledAt: mockedDate.toISOString(),
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
params: `{ "goodbye": "world" }`,
|
||||
state: `{ "foo": "bar" }`,
|
||||
taskType: 'action',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
ownerId: null,
|
||||
traceparent: '',
|
||||
},
|
||||
};
|
||||
|
||||
describe('retryOnBulkUpdateConflict', () => {
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
test('should not retry when all updates are successful', async () => {
|
||||
const savedObjectResponse = [
|
||||
{
|
||||
id: task1.id,
|
||||
type: task1.type,
|
||||
attributes: task1.attributes,
|
||||
references: [],
|
||||
},
|
||||
];
|
||||
mockSavedObjectsRepository.bulkUpdate.mockResolvedValueOnce({
|
||||
saved_objects: savedObjectResponse,
|
||||
});
|
||||
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: mockLogger,
|
||||
savedObjectsRepository: mockSavedObjectsRepository,
|
||||
objects: [task1],
|
||||
});
|
||||
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(1);
|
||||
expect(savedObjects).toEqual(savedObjectResponse);
|
||||
});
|
||||
|
||||
test('should throw error when saved objects bulkUpdate throws an error', async () => {
|
||||
mockSavedObjectsRepository.bulkUpdate.mockImplementationOnce(() => {
|
||||
throw new Error('fail');
|
||||
});
|
||||
await expect(() =>
|
||||
retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: mockLogger,
|
||||
savedObjectsRepository: mockSavedObjectsRepository,
|
||||
objects: [task1],
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"fail"`);
|
||||
});
|
||||
|
||||
test('should not retry and return non-conflict errors', async () => {
|
||||
const savedObjectResponse = [
|
||||
{
|
||||
id: task1.id,
|
||||
type: task1.type,
|
||||
attributes: task1.attributes,
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Not a conflict`,
|
||||
message: `Some error that's not a conflict`,
|
||||
statusCode: 404,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
];
|
||||
mockSavedObjectsRepository.bulkUpdate.mockResolvedValueOnce({
|
||||
saved_objects: savedObjectResponse,
|
||||
});
|
||||
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: mockLogger,
|
||||
savedObjectsRepository: mockSavedObjectsRepository,
|
||||
objects: [task1, task2],
|
||||
});
|
||||
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(1);
|
||||
expect(savedObjects).toEqual(savedObjectResponse);
|
||||
});
|
||||
|
||||
test(`should return conflict errors when number of retries exceeds ${NUM_RETRIES}`, async () => {
|
||||
const savedObjectResponse = [
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `There was a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
];
|
||||
mockSavedObjectsRepository.bulkUpdate.mockResolvedValue({
|
||||
saved_objects: savedObjectResponse,
|
||||
});
|
||||
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: mockLogger,
|
||||
savedObjectsRepository: mockSavedObjectsRepository,
|
||||
objects: [task2],
|
||||
});
|
||||
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(NUM_RETRIES + 1);
|
||||
expect(savedObjects).toEqual(savedObjectResponse);
|
||||
|
||||
expect(mockLogger.warn).toBeCalledWith('Bulk update saved object conflicts, exceeded retries');
|
||||
});
|
||||
|
||||
test('should retry as expected when there are conflicts', async () => {
|
||||
mockSavedObjectsRepository.bulkUpdate
|
||||
.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task1.id,
|
||||
type: task1.type,
|
||||
attributes: task1.attributes,
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `This is a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task3.id,
|
||||
type: task3.type,
|
||||
attributes: task3.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `This is a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `This is a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task3.id,
|
||||
type: task3.type,
|
||||
attributes: task3.attributes,
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `This is a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
error: {
|
||||
error: `Conflict`,
|
||||
message: `This is a conflict`,
|
||||
statusCode: 409,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
const { savedObjects } = await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: mockLogger,
|
||||
savedObjectsRepository: mockSavedObjectsRepository,
|
||||
objects: [task1, task2, task3],
|
||||
retries: 5,
|
||||
});
|
||||
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenCalledTimes(5);
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
[task1, task2, task3],
|
||||
undefined
|
||||
);
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
[task2, task3],
|
||||
undefined
|
||||
);
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(3, [task2], undefined);
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(4, [task2], undefined);
|
||||
expect(mockSavedObjectsRepository.bulkUpdate).toHaveBeenNthCalledWith(5, [task2], undefined);
|
||||
expect(savedObjects).toEqual([
|
||||
{
|
||||
id: task1.id,
|
||||
type: task1.type,
|
||||
attributes: task1.attributes,
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task3.id,
|
||||
type: task3.type,
|
||||
attributes: task3.attributes,
|
||||
references: [],
|
||||
},
|
||||
{
|
||||
id: task2.id,
|
||||
type: task2.type,
|
||||
attributes: task2.attributes,
|
||||
references: [],
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
ISavedObjectsRepository,
|
||||
Logger,
|
||||
SavedObjectsBulkUpdateObject,
|
||||
SavedObjectsBulkUpdateOptions,
|
||||
SavedObjectsUpdateResponse,
|
||||
} from '@kbn/core/server';
|
||||
|
||||
export const NUM_RETRIES = 2;
|
||||
|
||||
interface RetryOnBulkUpdateConflictOpts<T> {
|
||||
logger: Logger;
|
||||
savedObjectsRepository: ISavedObjectsRepository;
|
||||
objects: Array<SavedObjectsBulkUpdateObject<T>>;
|
||||
options?: SavedObjectsBulkUpdateOptions;
|
||||
retries?: number;
|
||||
}
|
||||
|
||||
interface RetryOnBulkUpdateConflictResults<T> {
|
||||
savedObjects: Array<SavedObjectsUpdateResponse<T>>;
|
||||
}
|
||||
|
||||
export const retryOnBulkUpdateConflict = async <T>({
|
||||
logger,
|
||||
savedObjectsRepository,
|
||||
objects,
|
||||
options,
|
||||
retries = NUM_RETRIES,
|
||||
}: RetryOnBulkUpdateConflictOpts<T>): Promise<RetryOnBulkUpdateConflictResults<T>> => {
|
||||
return retryOnBulkUpdateConflictHelper({
|
||||
logger,
|
||||
savedObjectsRepository,
|
||||
objects,
|
||||
options,
|
||||
retries,
|
||||
});
|
||||
};
|
||||
|
||||
const retryOnBulkUpdateConflictHelper = async <T>({
|
||||
logger,
|
||||
savedObjectsRepository,
|
||||
objects,
|
||||
options,
|
||||
retries = NUM_RETRIES,
|
||||
accResults = [],
|
||||
}: RetryOnBulkUpdateConflictOpts<T> & {
|
||||
accResults?: Array<SavedObjectsUpdateResponse<T>>;
|
||||
}): Promise<RetryOnBulkUpdateConflictResults<T>> => {
|
||||
try {
|
||||
const { saved_objects: savedObjectsResults } = await savedObjectsRepository.bulkUpdate(
|
||||
objects,
|
||||
options
|
||||
);
|
||||
|
||||
const currResults: Array<SavedObjectsUpdateResponse<T>> = [];
|
||||
const currConflicts: Array<SavedObjectsUpdateResponse<T>> = [];
|
||||
const objectsToRetry: Array<SavedObjectsBulkUpdateObject<T>> = [];
|
||||
savedObjectsResults.forEach(
|
||||
(savedObjectResult: SavedObjectsUpdateResponse<T>, index: number) => {
|
||||
if (savedObjectResult.error && savedObjectResult.error.statusCode === 409) {
|
||||
// The SavedObjectsRepository maintains the order of the docs
|
||||
// so we can rely on the index in the `docs` to match an error
|
||||
// on the same index in the `bulkUpdate` result
|
||||
objectsToRetry.push(objects[index]);
|
||||
currConflicts.push(savedObjectResult);
|
||||
} else {
|
||||
// Save results, whether they are successful or non-conflict errors
|
||||
currResults.push(savedObjectResult);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
const results =
|
||||
retries <= 0
|
||||
? [...accResults, ...currResults, ...currConflicts]
|
||||
: [...accResults, ...currResults];
|
||||
|
||||
if (objectsToRetry.length === 0) {
|
||||
return {
|
||||
savedObjects: results,
|
||||
};
|
||||
}
|
||||
|
||||
if (retries <= 0) {
|
||||
logger.warn(`Bulk update saved object conflicts, exceeded retries`);
|
||||
|
||||
return {
|
||||
savedObjects: results,
|
||||
};
|
||||
}
|
||||
|
||||
await waitBeforeNextRetry(retries);
|
||||
|
||||
return retryOnBulkUpdateConflictHelper({
|
||||
logger,
|
||||
savedObjectsRepository,
|
||||
objects: objectsToRetry,
|
||||
options,
|
||||
retries: retries - 1,
|
||||
accResults: results,
|
||||
});
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
|
||||
export const randomDelayMs = Math.floor(Math.random() * 100);
|
||||
export const getExponentialDelayMultiplier = (retries: number) => 1 + (NUM_RETRIES - retries) ** 2;
|
||||
export const RETRY_IF_CONFLICTS_DELAY = 250;
|
||||
export const waitBeforeNextRetry = async (retries: number): Promise<void> => {
|
||||
const exponentialDelayMultiplier = getExponentialDelayMultiplier(retries);
|
||||
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, RETRY_IF_CONFLICTS_DELAY * exponentialDelayMultiplier + randomDelayMs)
|
||||
);
|
||||
};
|
|
@ -205,6 +205,7 @@ export class TaskManagerPlugin
|
|||
|
||||
const serializer = savedObjects.createSerializer();
|
||||
const taskStore = new TaskStore({
|
||||
logger: this.logger,
|
||||
serializer,
|
||||
savedObjectsRepository,
|
||||
esClient: elasticsearch.client.asInternalUser,
|
||||
|
|
|
@ -43,6 +43,7 @@ import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
|
|||
|
||||
const VERSION_CONFLICT_STATUS = 409;
|
||||
const BULK_ACTION_SIZE = 100;
|
||||
const BULK_UPDATE_NUM_RETRIES = 3;
|
||||
export interface TaskSchedulingOpts {
|
||||
logger: Logger;
|
||||
taskStore: TaskStore;
|
||||
|
@ -264,7 +265,10 @@ export class TaskScheduling {
|
|||
}
|
||||
|
||||
private async bulkUpdateTasksHelper(updatedTasks: ConcreteTaskInstance[]) {
|
||||
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
|
||||
// Performs bulk update with retries
|
||||
return (
|
||||
await this.store.bulkUpdate(updatedTasks, BULK_UPDATE_NUM_RETRIES)
|
||||
).reduce<BulkUpdateTaskResult>(
|
||||
(acc, task) => {
|
||||
if (task.tag === 'ok') {
|
||||
acc.tasks.push(task.value);
|
||||
|
|
|
@ -15,7 +15,11 @@ import {
|
|||
TaskLifecycleResult,
|
||||
SerializedConcreteTaskInstance,
|
||||
} from './task';
|
||||
import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks';
|
||||
import {
|
||||
elasticsearchServiceMock,
|
||||
savedObjectsServiceMock,
|
||||
loggingSystemMock,
|
||||
} from '@kbn/core/server/mocks';
|
||||
import { TaskStore, SearchOpts, AggregationOpts } from './task_store';
|
||||
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
|
||||
import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/server';
|
||||
|
@ -25,6 +29,7 @@ import { AdHocTaskCounter } from './lib/adhoc_task_counter';
|
|||
|
||||
const savedObjectsClient = savedObjectsRepositoryMock.create();
|
||||
const serializer = savedObjectsServiceMock.createSerializer();
|
||||
const logger = loggingSystemMock.create().get();
|
||||
const adHocTaskCounter = new AdHocTaskCounter();
|
||||
|
||||
const randomId = () => `id-${_.random(1, 20)}`;
|
||||
|
@ -67,6 +72,7 @@ describe('TaskStore', () => {
|
|||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
logger,
|
||||
serializer,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
|
@ -233,6 +239,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -302,6 +309,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -400,6 +408,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -503,6 +512,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -510,6 +520,98 @@ describe('TaskStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
test('correctly returns errors from saved object bulk update', async () => {
|
||||
const task = {
|
||||
runAt: mockedDate,
|
||||
scheduledAt: mockedDate,
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
id: 'task:324242',
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
version: '123',
|
||||
ownerId: null,
|
||||
traceparent: '',
|
||||
};
|
||||
|
||||
savedObjectsClient.bulkUpdate.mockResolvedValueOnce({
|
||||
saved_objects: [
|
||||
{
|
||||
id: task.id,
|
||||
type: task.taskType,
|
||||
attributes: {
|
||||
runAt: mockedDate.toISOString(),
|
||||
scheduledAt: mockedDate.toISOString(),
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
params: `{ "hello": "world" }`,
|
||||
state: `{ "foo": "bar" }`,
|
||||
taskType: 'report',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
ownerId: null,
|
||||
traceparent: '',
|
||||
},
|
||||
error: {
|
||||
error: `Not a conflict`,
|
||||
message: `Some error that's not a conflict`,
|
||||
statusCode: 404,
|
||||
},
|
||||
references: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
const result = await store.bulkUpdate([task]);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
error: {
|
||||
entity: {
|
||||
attempts: 3,
|
||||
id: 'task:324242',
|
||||
ownerId: null,
|
||||
params: { hello: 'world' },
|
||||
retryAt: null,
|
||||
runAt: mockedDate,
|
||||
scheduledAt: mockedDate,
|
||||
startedAt: null,
|
||||
state: { foo: 'bar' },
|
||||
status: 'idle',
|
||||
taskType: 'report',
|
||||
traceparent: '',
|
||||
version: '123',
|
||||
},
|
||||
error: {
|
||||
attributes: {
|
||||
attempts: 3,
|
||||
ownerId: null,
|
||||
params: '{ "hello": "world" }',
|
||||
retryAt: null,
|
||||
runAt: mockedDate.toISOString(),
|
||||
scheduledAt: mockedDate.toISOString(),
|
||||
startedAt: null,
|
||||
state: '{ "foo": "bar" }',
|
||||
status: 'idle',
|
||||
taskType: 'report',
|
||||
traceparent: '',
|
||||
},
|
||||
error: {
|
||||
error: 'Not a conflict',
|
||||
message: "Some error that's not a conflict",
|
||||
statusCode: 404,
|
||||
},
|
||||
id: 'task:324242',
|
||||
references: [],
|
||||
type: 'report',
|
||||
},
|
||||
},
|
||||
tag: 'err',
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const task = {
|
||||
runAt: mockedDate,
|
||||
|
@ -544,6 +646,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -578,6 +681,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -612,6 +716,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -697,6 +802,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -717,6 +823,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -735,6 +842,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
@ -753,6 +861,7 @@ describe('TaskStore', () => {
|
|||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
logger,
|
||||
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
|
|
|
@ -15,6 +15,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server';
|
||||
|
||||
import {
|
||||
Logger,
|
||||
SavedObject,
|
||||
ISavedObjectsSerializer,
|
||||
SavedObjectsRawDoc,
|
||||
|
@ -35,8 +36,10 @@ import {
|
|||
|
||||
import { TaskTypeDictionary } from './task_type_dictionary';
|
||||
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
|
||||
import { retryOnBulkUpdateConflict } from './lib/retry_on_bulk_update_conflict';
|
||||
|
||||
export interface StoreOpts {
|
||||
logger: Logger;
|
||||
esClient: ElasticsearchClient;
|
||||
index: string;
|
||||
taskManagerId: string;
|
||||
|
@ -94,6 +97,7 @@ export class TaskStore {
|
|||
public readonly errors$ = new Subject<Error>();
|
||||
|
||||
private esClient: ElasticsearchClient;
|
||||
private logger: Logger;
|
||||
private definitions: TaskTypeDictionary;
|
||||
private savedObjectsRepository: ISavedObjectsRepository;
|
||||
private serializer: ISavedObjectsSerializer;
|
||||
|
@ -111,6 +115,7 @@ export class TaskStore {
|
|||
constructor(opts: StoreOpts) {
|
||||
this.esClient = opts.esClient;
|
||||
this.index = opts.index;
|
||||
this.logger = opts.logger;
|
||||
this.taskManagerId = opts.taskManagerId;
|
||||
this.definitions = opts.definitions;
|
||||
this.serializer = opts.serializer;
|
||||
|
@ -246,34 +251,45 @@ export class TaskStore {
|
|||
* @param {Array<TaskDoc>} docs
|
||||
* @returns {Promise<Array<TaskDoc>>}
|
||||
*/
|
||||
public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise<BulkUpdateResult[]> {
|
||||
public async bulkUpdate(
|
||||
docs: ConcreteTaskInstance[],
|
||||
retries: number = 0
|
||||
): Promise<BulkUpdateResult[]> {
|
||||
const attributesByDocId = docs.reduce((attrsById, doc) => {
|
||||
attrsById.set(doc.id, taskInstanceToAttributes(doc));
|
||||
return attrsById;
|
||||
}, new Map());
|
||||
|
||||
let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
|
||||
let updatedSavedObjects: Array<SavedObjectsUpdateResponse<SerializedConcreteTaskInstance>>;
|
||||
try {
|
||||
({ saved_objects: updatedSavedObjects } =
|
||||
await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
|
||||
docs.map((doc) => ({
|
||||
({ savedObjects: updatedSavedObjects } =
|
||||
await retryOnBulkUpdateConflict<SerializedConcreteTaskInstance>({
|
||||
logger: this.logger,
|
||||
savedObjectsRepository: this.savedObjectsRepository,
|
||||
objects: docs.map((doc) => ({
|
||||
type: 'task',
|
||||
id: doc.id,
|
||||
options: { version: doc.version },
|
||||
attributes: attributesByDocId.get(doc.id)!,
|
||||
})),
|
||||
{
|
||||
options: {
|
||||
refresh: false,
|
||||
}
|
||||
));
|
||||
},
|
||||
retries,
|
||||
}));
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) =>
|
||||
isSavedObjectsUpdateResponse(updatedSavedObject)
|
||||
? asOk(
|
||||
return updatedSavedObjects.map((updatedSavedObject) => {
|
||||
const doc = docs.find((d) => d.id === updatedSavedObject.id);
|
||||
return updatedSavedObject.error !== undefined
|
||||
? asErr({
|
||||
entity: doc,
|
||||
error: updatedSavedObject,
|
||||
})
|
||||
: asOk(
|
||||
savedObjectToConcreteTaskInstance({
|
||||
...updatedSavedObject,
|
||||
attributes: defaults(
|
||||
|
@ -281,15 +297,8 @@ export class TaskStore {
|
|||
attributesByDocId.get(updatedSavedObject.id)!
|
||||
),
|
||||
})
|
||||
)
|
||||
: asErr({
|
||||
// The SavedObjectsRepository maintains the order of the docs
|
||||
// so we can rely on the index in the `docs` to match an error
|
||||
// on the same index in the `bulkUpdate` result
|
||||
entity: docs[index],
|
||||
error: updatedSavedObject,
|
||||
})
|
||||
);
|
||||
);
|
||||
}) as BulkUpdateResult[];
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -535,9 +544,3 @@ function ensureAggregationOnlyReturnsTaskObjects(opts: AggregationOpts): Aggrega
|
|||
query,
|
||||
};
|
||||
}
|
||||
|
||||
function isSavedObjectsUpdateResponse(
|
||||
result: SavedObjectsUpdateResponse | Error
|
||||
): result is SavedObjectsUpdateResponse {
|
||||
return result && typeof (result as SavedObjectsUpdateResponse).id === 'string';
|
||||
}
|
||||
|
|
|
@ -26,8 +26,7 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
|
|||
const supertest = getService('supertest');
|
||||
const supertestWithoutAuth = getService('supertestWithoutAuth');
|
||||
|
||||
// Failing: See https://github.com/elastic/kibana/issues/141849
|
||||
describe.skip('disable', () => {
|
||||
describe('disable', () => {
|
||||
const objectRemover = new ObjectRemover(supertest);
|
||||
|
||||
after(() => objectRemover.removeAll());
|
||||
|
|
|
@ -26,8 +26,7 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
|
|||
const retry = getService('retry');
|
||||
const supertest = getService('supertest');
|
||||
|
||||
// Failing: See https://github.com/elastic/kibana/issues/141864
|
||||
describe.skip('disable', () => {
|
||||
describe('disable', () => {
|
||||
const objectRemover = new ObjectRemover(supertestWithoutAuth);
|
||||
const ruleUtils = new RuleUtils({ space: Spaces.space1, supertestWithoutAuth });
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue