[8.x] Pass bulkPartialUpdate errors to taskStore.errors (#198606) (#199205)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Pass bulkPartialUpdate errors to taskStore.errors
(#198606)](https://github.com/elastic/kibana/pull/198606)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ersin
Erdal","email":"92688503+ersin-erdal@users.noreply.github.com"},"sourceCommit":{"committedDate":"2024-11-06T18:09:00Z","message":"Pass
bulkPartialUpdate errors to taskStore.errors (#198606)\n\nResolves:
#198428","sha":"9f4d88e4b5e491e53aa4cc0b2f6394f5a26e8d9c","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:ResponseOps","v9.0.0","backport:prev-minor"],"title":"Pass
bulkPartialUpdate errors to
taskStore.errors","number":198606,"url":"https://github.com/elastic/kibana/pull/198606","mergeCommit":{"message":"Pass
bulkPartialUpdate errors to taskStore.errors (#198606)\n\nResolves:
#198428","sha":"9f4d88e4b5e491e53aa4cc0b2f6394f5a26e8d9c"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/198606","number":198606,"mergeCommit":{"message":"Pass
bulkPartialUpdate errors to taskStore.errors (#198606)\n\nResolves:
#198428","sha":"9f4d88e4b5e491e53aa4cc0b2f6394f5a26e8d9c"}}]}]
BACKPORT-->

Co-authored-by: Ersin Erdal <92688503+ersin-erdal@users.noreply.github.com>
This commit is contained in:
Kibana Machine 2024-11-07 07:08:36 +11:00 committed by GitHub
parent 59749ddc96
commit 0432f750ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 162 additions and 2 deletions

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class BulkUpdateError extends Error {
private _statusCode: number;
private _type: string;
constructor({
statusCode,
message = 'Bulk update failed with unknown reason',
type,
}: {
statusCode: number;
message?: string;
type: string;
}) {
super(message);
this._statusCode = statusCode;
this._type = type;
}
public get statusCode() {
return this._statusCode;
}
public get type() {
return this._type;
}
}
export function getBulkUpdateStatusCode(error: Error | BulkUpdateError): number | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).statusCode;
}
}
export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).type;
}
}

View file

@ -15,6 +15,7 @@ import {
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { MsearchError } from './msearch_error';
import { BulkUpdateError } from './bulk_update_error';
describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
@ -280,6 +281,45 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should decrease configuration at the next interval when a bulkPartialUpdate 429 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 429, message: 'test', type: 'too_many_requests' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should decrease configuration at the next interval when a bulkPartialUpdate 500 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 500, message: 'test', type: 'server_error' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should decrease configuration at the next interval when a bulkPartialUpdate 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 503, message: 'test', type: 'unavailable' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should not change configuration at the next interval when other msearch error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError(404));

View file

@ -13,6 +13,7 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';
import { getBulkUpdateStatusCode } from './bulk_update_error';
const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
@ -169,7 +170,10 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
isEsCannotExecuteScriptError(e) ||
getMsearchStatusCode(e) === 429 ||
getMsearchStatusCode(e) === 500 ||
getMsearchStatusCode(e) === 503
getMsearchStatusCode(e) === 503 ||
getBulkUpdateStatusCode(e) === 429 ||
getBulkUpdateStatusCode(e) === 500 ||
getBulkUpdateStatusCode(e) === 503
)
)
).pipe(

View file

@ -1290,6 +1290,62 @@ describe('TaskStore', () => {
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
test('pushes errors returned by the saved objects client to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
esClient.bulk.mockResolvedValue({
errors: true,
items: [
{
update: {
_id: '1',
_index: 'test-index',
status: 403,
error: { reason: 'Error reason', type: 'cluster_block_exception' },
},
},
],
took: 10,
});
await store.bulkPartialUpdate([task]);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Error reason]`);
});
test('pushes errors for the malformed responses to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
esClient.bulk.mockResolvedValue({
errors: false,
items: [
{
update: {
_index: 'test-index',
status: 200,
},
},
],
took: 10,
});
await store.bulkPartialUpdate([task]);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: malformed response]`);
});
});
describe('remove', () => {

View file

@ -48,6 +48,7 @@ import { claimSort } from './queries/mark_available_tasks_as_claimed';
import { MAX_PARTITIONS } from './lib/task_partitioner';
import { ErrorOutput } from './lib/bulk_operation_buffer';
import { MsearchError } from './lib/msearch_error';
import { BulkUpdateError } from './lib/bulk_update_error';
export interface StoreOpts {
esClient: ElasticsearchClient;
@ -386,11 +387,19 @@ export class TaskStore {
}
return result.items.map((item) => {
const malformedResponseType = 'malformed response';
if (!item.update || !item.update._id) {
const err = new BulkUpdateError({
message: malformedResponseType,
type: malformedResponseType,
statusCode: 500,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: 'unknown',
error: { type: 'malformed response' },
error: { type: malformedResponseType },
});
}
@ -399,6 +408,12 @@ export class TaskStore {
: item.update._id;
if (item.update?.error) {
const err = new BulkUpdateError({
message: item.update.error.reason,
type: item.update.error.type,
statusCode: item.update.status,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: docId,