[Response Ops][Task Manager] Add bulk update function that directly updates using the esClient (#191760)

Resolves https://github.com/elastic/kibana/issues/187704

## Summary

Creates a new `bulkPartialUpdate` function in the `task_store` class
that uses the ES client to perform bulk partial updates instead of the
Saved Objects client, and switches the update call in the `mget` task
claimer over to this new function.
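
For illustration, a minimal sketch of how a caller might use the new
function, assuming hypothetical `taskStore`, `task`, `logger`, and
`conflicts` bindings (the doc and result shapes follow the
`PartialConcreteTaskInstance` and `PartialBulkUpdateResult` types in the
diff below):

```ts
// Sketch only: send just the changed fields plus the required `id`.
// When `version` is present it is decoded into if_seq_no/if_primary_term,
// so a concurrent write surfaces as a per-doc 409 instead of a lost update.
const results = await taskStore.bulkPartialUpdate([
  { id: task.id, version: task.version, status: TaskStatus.Running },
]);

for (const result of results) {
  if (isOk(result)) {
    // result.value echoes the partial doc that was applied
  } else if (result.error.status === 409) {
    conflicts++; // another Kibana node updated this task first
  } else {
    logger.error(
      `Error updating task ${result.error.id}: ${JSON.stringify(result.error.error)}`
    );
  }
}
```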

## To verify

Run this branch with the `xpack.task_manager.claim_strategy: 'mget'`
setting and verify that all tasks run as expected.
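
For example, in `kibana.yml`:

```yaml
xpack.task_manager.claim_strategy: 'mget'
```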

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Ying Mao committed 2024-09-10 18:22:18 -04:00 (via GitHub)
commit a141818c4f, parent 20566d0262
7 changed files with 1112 additions and 645 deletions

x-pack/plugins/task_manager/server/task.ts

@@ -456,6 +456,10 @@ export interface ConcreteTaskInstance extends TaskInstance {
partition?: number;
}
export type PartialConcreteTaskInstance = Partial<ConcreteTaskInstance> & {
id: ConcreteTaskInstance['id'];
};
export interface ConcreteTaskInstanceVersion {
/** The _id of the document (not the SO id) */
esId: string;
@@ -490,3 +494,7 @@ export type SerializedConcreteTaskInstance = Omit<
runAt: string;
partition?: number;
};
export type PartialSerializedConcreteTaskInstance = Partial<SerializedConcreteTaskInstance> & {
id: SerializedConcreteTaskInstance['id'];
};

x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts

@@ -16,7 +16,6 @@
import apm, { Logger } from 'elastic-apm-node';
import { Subject, Observable } from 'rxjs';
import { omit } from 'lodash';
import { TaskTypeDictionary } from '../task_type_dictionary';
import {
TaskClaimerOpts,
@@ -24,7 +23,13 @@ import {
getEmptyClaimOwnershipResult,
getExcludedTaskTypes,
} from '.';
import { ConcreteTaskInstance, TaskStatus, ConcreteTaskInstanceVersion, TaskCost } from '../task';
import {
ConcreteTaskInstance,
TaskStatus,
ConcreteTaskInstanceVersion,
TaskCost,
PartialConcreteTaskInstance,
} from '../task';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import { TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming';
import { TaskClaim, asTaskClaimEvent, startTaskTimer } from '../task_events';
@@ -188,12 +193,11 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
// build the updated task objects we'll claim
const now = new Date();
const taskUpdates: ConcreteTaskInstance[] = [];
const taskUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRun) {
taskUpdates.push({
// omits "enabled" field from task updates so we don't overwrite
// any user initiated changes to "enabled" while the task was running
...omit(task, 'enabled'),
id: task.id,
version: task.version,
scheduledAt:
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
? task.retryAt
@@ -207,65 +211,74 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}
// perform the task object updates, deal with errors
const updatedTasks: ConcreteTaskInstance[] = [];
const updatedTaskIds: string[] = [];
let conflicts = staleTasks.length;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;
const updateResults = await taskStore.bulkUpdate(taskUpdates, {
validate: false,
excludeLargeFields: true,
});
const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTasks.push(updateResult.value);
updatedTaskIds.push(updateResult.value.id);
} else {
const { id, type, error } = updateResult.error;
if (error.statusCode === 409) {
const { id, type, error, status } = updateResult.error;
// check for 409 conflict errors
if (status === 409) {
conflicts++;
} else {
logger.error(`Error updating task ${id}:${type} during claim: ${error.message}`, logMeta);
logger.error(
`Error updating task ${id}:${type} during claim: ${JSON.stringify(error)}`,
logMeta
);
bulkUpdateErrors++;
}
}
}
// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTasks.map((task) => task.id))).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`, logMeta);
bulkGetErrors++;
}
return acc;
}, []);
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(
`Error getting full task ${id}:${type} during claim: ${error.message}`,
logMeta
);
bulkGetErrors++;
}
return acc;
},
[]
);
// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
let removedCount = 0;
if (removedTasks.length > 0) {
const tasksToRemove = Array.from(removedTasks);
const tasksToRemoveUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRemove) {
task.status = TaskStatus.Unrecognized;
tasksToRemoveUpdates.push({
id: task.id,
status: TaskStatus.Unrecognized,
});
}
// don't worry too much about errors, we'll get them next time
try {
const removeResults = await taskStore.bulkUpdate(tasksToRemove, {
validate: false,
excludeLargeFields: true,
});
const removeResults = await taskStore.bulkPartialUpdate(tasksToRemoveUpdates);
for (const removeResult of removeResults) {
if (isOk(removeResult)) {
removedCount++;
} else {
const { id, type, error } = removeResult.error;
logger.warn(
`Error updating task ${id}:${type} to mark as unrecognized during claim: ${error.message}`,
`Error updating task ${id}:${type} to mark as unrecognized during claim: ${JSON.stringify(
error
)}`,
logMeta
);
}

x-pack/plugins/task_manager/server/task_store.mock.ts

@@ -24,6 +24,7 @@ export const taskStoreMock = {
schedule: jest.fn(),
bulkSchedule: jest.fn(),
bulkUpdate: jest.fn(),
bulkPartialUpdate: jest.fn(),
bulkRemove: jest.fn(),
get: jest.fn(),
getLifecycle: jest.fn(),

x-pack/plugins/task_manager/server/task_store.test.ts

@@ -8,7 +8,7 @@
import { schema } from '@kbn/config-schema';
import { Client } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import _, { omit } from 'lodash';
import _ from 'lodash';
import { first } from 'rxjs';
import {
@@ -17,14 +17,18 @@ import {
TaskLifecycleResult,
SerializedConcreteTaskInstance,
} from './task';
import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks';
import {
ElasticsearchClientMock,
elasticsearchServiceMock,
savedObjectsServiceMock,
} from '@kbn/core/server/mocks';
import { TaskStore, SearchOpts, AggregationOpts, taskInstanceToAttributes } from './task_store';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/server';
import { TaskTypeDictionary } from './task_type_dictionary';
import { mockLogger } from './test_utils';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { asErr } from './lib/result_type';
import { asErr, asOk } from './lib/result_type';
import { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types';
const mockGetValidatedTaskInstanceFromReading = jest.fn();
@@ -347,7 +351,7 @@ describe('TaskStore', () => {
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
test('excludes state and params from source when excludeState is true', async () => {
test('excludes state and params from source when limitResponse is true', async () => {
const { args } = await testFetch({}, [], true);
expect(args).toMatchObject({
index: 'tasky',
@@ -889,61 +893,6 @@ describe('TaskStore', () => {
);
});
test(`logs warning and doesn't validate whenever excludeLargeFields option is passed-in`, async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
traceparent: '',
};
savedObjectsClient.bulkUpdate.mockResolvedValue({
saved_objects: [
{
id: '324242',
type: 'task',
attributes: {
...task,
state: '{"foo":"bar"}',
params: '{"hello":"world"}',
},
references: [],
version: '123',
},
],
});
await store.bulkUpdate([task], { validate: true, excludeLargeFields: true });
expect(logger.warn).toHaveBeenCalledWith(
`Skipping validation for bulk update because excludeLargeFields=true.`
);
expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, {
validate: false,
});
expect(savedObjectsClient.bulkUpdate).toHaveBeenCalledWith(
[
{
id: task.id,
type: 'task',
version: task.version,
attributes: omit(taskInstanceToAttributes(task, task.id), ['state', 'params']),
},
],
{ refresh: false }
);
});
test('pushes error from saved objects client to errors$', async () => {
const task = {
runAt: mockedDate,
@@ -970,6 +919,373 @@ describe('TaskStore', () => {
});
});
describe('bulkPartialUpdate', () => {
let store: TaskStore;
let esClient: ElasticsearchClientMock;
const logger = mockLogger();
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
logger,
index: 'tasky',
taskManagerId: '',
serializer,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
requestTimeouts: {
update_by_query: 1000,
},
});
});
test(`should return immediately if no docs to update`, async () => {
await store.bulkPartialUpdate([]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).not.toHaveBeenCalled();
});
test(`should perform partial update using esClient`, async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
ownerId: 'testtest',
traceparent: '',
};
esClient.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:324242',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
],
});
const result = await store.bulkPartialUpdate([task]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } },
{
doc: {
task: {
attempts: 3,
ownerId: 'testtest',
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
status: 'idle',
taskType: 'report',
traceparent: '',
},
},
},
],
index: 'tasky',
refresh: false,
});
expect(result).toEqual([asOk(task)]);
});
test(`should perform partial update with minimal fields`, async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
esClient.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:324242',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
],
});
const result = await store.bulkPartialUpdate([task]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { attempts: 3 } } },
],
index: 'tasky',
refresh: false,
});
expect(result).toEqual([asOk(task)]);
});
test(`should perform partial update with no version`, async () => {
const task = {
id: '324242',
attempts: 3,
};
esClient.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:324242',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
],
});
const result = await store.bulkPartialUpdate([task]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).toHaveBeenCalledWith({
body: [{ update: { _id: 'task:324242' } }, { doc: { task: { attempts: 3 } } }],
index: 'tasky',
refresh: false,
});
expect(result).toEqual([asOk(task)]);
});
test(`should gracefully handle errors within the response`, async () => {
const task1 = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
const task2 = {
id: '45343254',
version: 'WzQsMV0=',
status: 'running' as TaskStatus,
};
const task3 = {
id: '7845',
version: 'WzQsMV0=',
runAt: mockedDate,
};
esClient.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:324242',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:45343254',
_version: 2,
error: {
type: 'document_missing_exception',
reason: '[5]: document missing',
index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA',
shard: '0',
index: '.kibana_task_manager_8.16.0_001',
},
status: 404,
},
},
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:7845',
_version: 2,
status: 409,
error: { type: 'anything', reason: 'some-reason', index: 'some-index' },
},
},
],
});
const result = await store.bulkPartialUpdate([task1, task2, task3]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { attempts: 3 } } },
{ update: { _id: 'task:45343254', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { status: 'running' } } },
{ update: { _id: 'task:7845', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { runAt: mockedDate.toISOString() } } },
],
index: 'tasky',
refresh: false,
});
expect(result).toEqual([
asOk(task1),
asErr({
type: 'task',
id: '45343254',
status: 404,
error: {
type: 'document_missing_exception',
reason: '[5]: document missing',
index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA',
shard: '0',
index: '.kibana_task_manager_8.16.0_001',
},
}),
asErr({
type: 'task',
id: '7845',
status: 409,
error: { type: 'anything', reason: 'some-reason', index: 'some-index' },
}),
]);
});
test(`should gracefully handle malformed errors within the response`, async () => {
const task1 = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
const task2 = {
id: '45343254',
version: 'WzQsMV0=',
status: 'running' as TaskStatus,
};
esClient.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_id: 'task:324242',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_8.16.0_001',
_version: 2,
error: {
type: 'document_missing_exception',
reason: '[5]: document missing',
index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA',
shard: '0',
index: '.kibana_task_manager_8.16.0_001',
},
status: 404,
},
},
],
});
const result = await store.bulkPartialUpdate([task1, task2]);
expect(mockGetValidatedTaskInstanceForUpdating).not.toHaveBeenCalled();
expect(esClient.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:324242', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { attempts: 3 } } },
{ update: { _id: 'task:45343254', if_primary_term: 1, if_seq_no: 4 } },
{ doc: { task: { status: 'running' } } },
],
index: 'tasky',
refresh: false,
});
expect(result).toEqual([
asOk(task1),
asErr({
type: 'task',
id: 'unknown',
error: { type: 'malformed response' },
}),
]);
});
test('pushes error from saved objects client to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
esClient.bulk.mockRejectedValue(new Error('Failure'));
await expect(store.bulkPartialUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
describe('remove', () => {
let store: TaskStore;

x-pack/plugins/task_manager/server/task_store.ts

@@ -26,6 +26,7 @@ import {
ElasticsearchClient,
} from '@kbn/core/server';
import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal';
import { RequestTimeoutsConfig } from './config';
import { asOk, asErr, Result } from './lib/result_type';
@@ -36,6 +37,8 @@ import {
TaskLifecycle,
TaskLifecycleResult,
SerializedConcreteTaskInstance,
PartialConcreteTaskInstance,
PartialSerializedConcreteTaskInstance,
} from './task';
import { TaskTypeDictionary } from './task_type_dictionary';
@@ -87,7 +90,6 @@ export interface FetchResult {
export interface BulkUpdateOpts {
validate: boolean;
excludeLargeFields?: boolean;
}
export type BulkUpdateResult = Result<
@@ -95,6 +97,11 @@ export type BulkUpdateResult = Result<
{ type: string; id: string; error: SavedObjectError }
>;
export type PartialBulkUpdateResult = Result<
PartialConcreteTaskInstance,
{ type: string; id: string; status?: number; error: estypes.ErrorCause }
>;
export type BulkGetResult = Array<
Result<ConcreteTaskInstance, { type: string; id: string; error: SavedObjectError }>
>;
@@ -114,7 +121,6 @@ export class TaskStore {
public readonly taskManagerId: string;
public readonly errors$ = new Subject<Error>();
public readonly taskValidator: TaskValidator;
private readonly logger: Logger;
private esClient: ElasticsearchClient;
private esClientWithoutRetries: ElasticsearchClient;
@@ -141,7 +147,6 @@
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.adHocTaskCounter = opts.adHocTaskCounter;
this.logger = opts.logger;
this.taskValidator = new TaskValidator({
logger: opts.logger,
definitions: opts.definitions,
@@ -302,23 +307,13 @@
*/
public async bulkUpdate(
docs: ConcreteTaskInstance[],
{ validate, excludeLargeFields = false }: BulkUpdateOpts
{ validate }: BulkUpdateOpts
): Promise<BulkUpdateResult[]> {
// if we're excluding large fields (state and params), we cannot apply validation so log a warning
if (validate && excludeLargeFields) {
validate = false;
this.logger.warn(`Skipping validation for bulk update because excludeLargeFields=true.`);
}
const attributesByDocId = docs.reduce((attrsById, doc) => {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate,
});
const taskAttributes = taskInstanceToAttributes(taskInstance, doc.id);
attrsById.set(
doc.id,
excludeLargeFields ? omit(taskAttributes, 'state', 'params') : taskAttributes
);
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id));
return attrsById;
}, new Map());
@@ -364,6 +359,67 @@
});
}
public async bulkPartialUpdate(
docs: PartialConcreteTaskInstance[]
): Promise<PartialBulkUpdateResult[]> {
if (docs.length === 0) {
return [];
}
const bulkBody = [];
for (const doc of docs) {
bulkBody.push({
update: {
_id: `task:${doc.id}`,
...(doc.version ? decodeRequestVersion(doc.version) : {}),
},
});
bulkBody.push({
doc: {
task: partialTaskInstanceToAttributes(doc),
},
});
}
let result: estypes.BulkResponse;
try {
result = await this.esClient.bulk({ index: this.index, refresh: false, body: bulkBody });
} catch (e) {
this.errors$.next(e);
throw e;
}
return result.items.map((item) => {
if (!item.update || !item.update._id) {
return asErr({
type: 'task',
id: 'unknown',
error: { type: 'malformed response' },
});
}
const docId = item.update._id.startsWith('task:')
? item.update._id.slice(5)
: item.update._id;
if (item.update?.error) {
return asErr({
type: 'task',
id: docId,
status: item.update.status,
error: item.update.error,
});
}
const doc = docs.find((d) => d.id === docId);
return asOk({
...doc,
id: docId,
});
});
}
/**
* Removes the specified task from the index.
*
@@ -719,6 +775,20 @@ export function taskInstanceToAttributes(
} as SerializedConcreteTaskInstance;
}
export function partialTaskInstanceToAttributes(
doc: PartialConcreteTaskInstance
): PartialSerializedConcreteTaskInstance {
return {
...omit(doc, 'id', 'version'),
...(doc.params ? { params: JSON.stringify(doc.params) } : {}),
...(doc.state ? { state: JSON.stringify(doc.state) } : {}),
...(doc.scheduledAt ? { scheduledAt: doc.scheduledAt.toISOString() } : {}),
...(doc.startedAt ? { startedAt: doc.startedAt.toISOString() } : {}),
...(doc.retryAt ? { retryAt: doc.retryAt.toISOString() } : {}),
...(doc.runAt ? { runAt: doc.runAt.toISOString() } : {}),
} as PartialSerializedConcreteTaskInstance;
}
export function savedObjectToConcreteTaskInstance(
savedObject: Omit<SavedObject<SerializedConcreteTaskInstance>, 'references'>
): ConcreteTaskInstance {

x-pack/plugins/task_manager/tsconfig.json

@@ -26,7 +26,8 @@
"@kbn/core-saved-objects-api-server",
"@kbn/logging",
"@kbn/core-lifecycle-server",
"@kbn/cloud-plugin"
"@kbn/cloud-plugin",
"@kbn/core-saved-objects-base-server-internal"
],
"exclude": ["target/**/*"]
}