[9.0] Zdt retry on common failures (#213979) (#214074)

# Backport

This will backport the following commits from `main` to `9.0`:
- [Zdt retry on common failures
(#213979)](https://github.com/elastic/kibana/pull/213979)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Jesus
Wahrman","email":"41008968+jesuswr@users.noreply.github.com"},"sourceCommit":{"committedDate":"2025-03-12T09:10:47Z","message":"Zdt
retry on common failures (#213979)\n\n## Summary\n\nresolves
https://github.com/elastic/kibana/issues/207096\n\nAdded a new handler
to `readWithPit`, `pickupUpdatedMappings` and\n`checkForUnknownDocs`.
This handler retries when it receives an error\nresponse including
`type: search_phase_execution_exception`.\n\n\n### Checklist\n\nCheck
the PR satisfies following conditions. \n\nReviewers should verify this
PR satisfies this list as well.\n\n- [x] [Unit or
functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere
updated or added to match the most common scenarios\n- [x] The PR
description includes the appropriate Release Notes section,\nand the
correct `release_note:*` label is applied per
the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"68b2bde0b032efb2fab3e8a30a7d5a9e0b601f7e","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Feature:Saved
Objects","release_note:skip","backport:prev-major","Epic:ZDTmigrations","backport:current-major","v9.1.0"],"title":"Zdt
retry on common
failures","number":213979,"url":"https://github.com/elastic/kibana/pull/213979","mergeCommit":{"message":"Zdt
retry on common failures (#213979)\n\n## Summary\n\nresolves
https://github.com/elastic/kibana/issues/207096\n\nAdded a new handler
to `readWithPit`, `pickupUpdatedMappings` and\n`checkForUnknownDocs`.
This handler retries when it receives an error\nresponse including
`type: search_phase_execution_exception`.\n\n\n### Checklist\n\nCheck
the PR satisfies following conditions. \n\nReviewers should verify this
PR satisfies this list as well.\n\n- [x] [Unit or
functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere
updated or added to match the most common scenarios\n- [x] The PR
description includes the appropriate Release Notes section,\nand the
correct `release_note:*` label is applied per
the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"68b2bde0b032efb2fab3e8a30a7d5a9e0b601f7e"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/213979","number":213979,"mergeCommit":{"message":"Zdt
retry on common failures (#213979)\n\n## Summary\n\nresolves
https://github.com/elastic/kibana/issues/207096\n\nAdded a new handler
to `readWithPit`, `pickupUpdatedMappings` and\n`checkForUnknownDocs`.
This handler retries when it receives an error\nresponse including
`type: search_phase_execution_exception`.\n\n\n### Checklist\n\nCheck
the PR satisfies following conditions. \n\nReviewers should verify this
PR satisfies this list as well.\n\n- [x] [Unit or
functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere
updated or added to match the most common scenarios\n- [x] The PR
description includes the appropriate Release Notes section,\nand the
correct `release_note:*` label is applied per
the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"68b2bde0b032efb2fab3e8a30a7d5a9e0b601f7e"}}]}]
BACKPORT-->

Co-authored-by: Jesus Wahrman <41008968+jesuswr@users.noreply.github.com>
This commit is contained in:
Kibana Machine 2025-03-12 22:15:19 +11:00 committed by GitHub
parent 46ef0fd07b
commit 9414a3fe72
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 309 additions and 46 deletions

View file

@ -9,7 +9,10 @@
import { errors as esErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import {
catchRetryableEsClientErrors,
catchRetryableSearchPhaseExecutionException,
} from './catch_retryable_es_client_errors';
describe('catchRetryableEsClientErrors', () => {
it('rejects non-retryable response errors', async () => {
@ -94,3 +97,29 @@ describe('catchRetryableEsClientErrors', () => {
);
});
});
describe('catchRetryableSearchPhaseExecutionException', () => {
it('retries search phase execution exception ', async () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'search_phase_execution_exception' } },
})
);
expect(
((await Promise.reject(error).catch(catchRetryableSearchPhaseExecutionException)) as any).left
).toMatchObject({
message: 'search_phase_execution_exception',
type: 'retryable_es_client_error',
});
});
it('does not retry other errors', async () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'cluster_block_exception' } },
})
);
await expect(
Promise.reject(error).catch(catchRetryableSearchPhaseExecutionException)
).rejects.toBe(error);
});
});

View file

@ -30,3 +30,17 @@ export const catchRetryableEsClientErrors = (
throw e;
}
};
export const catchRetryableSearchPhaseExecutionException = (
e: EsErrors.ResponseError
): Either.Either<RetryableEsClientError, never> => {
if (e?.body?.error?.type === 'search_phase_execution_exception') {
return Either.left({
type: 'retryable_es_client_error' as const,
message: e?.message,
error: e,
});
} else {
throw e;
}
};

View file

@ -8,15 +8,13 @@
*/
import * as Either from 'fp-ts/lib/Either';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as errorHandlers from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { checkForUnknownDocs } from './check_for_unknown_docs';
import { createAggregateTypesSearchResponse } from './check_for_unknown_docs.mocks';
jest.mock('./catch_retryable_es_client_errors');
describe('checkForUnknownDocs', () => {
const excludeOnUpgradeQuery: QueryDslQueryContainer = {
bool: { must: [{ term: { hello: 'dolly' } }] },
@ -39,18 +37,94 @@ describe('checkForUnknownDocs', () => {
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
excludeOnUpgradeQuery,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
it('calls catchRetryableSearchPhaseExecutionException when the promise rejects', async () => {
// Create a mock client that rejects all methods with a search_phase_execution_exception response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'search_phase_execution_exception' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
excludeOnUpgradeQuery,
});
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
// the second handler shouldn't be called since the first one is not throwing
expect(catchClientErrorsSpy).not.toHaveBeenCalled();
});
it('should throw because neither handler can retry', async () => {
// Create a mock client that rejects all methods with a 502 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 502,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
excludeOnUpgradeQuery,
});
await expect(task()).rejects.toThrow();
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
it('calls `client.search` with the correct parameters', async () => {

View file

@ -20,6 +20,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsRawDocSource } from '@kbn/core-saved-objects-server';
import {
catchRetryableEsClientErrors,
catchRetryableSearchPhaseExecutionException,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { addExcludedTypesToBoolQuery } from '../model/helpers';
@ -127,5 +128,6 @@ export const checkForUnknownDocs =
return Either.right({});
})
.catch(catchRetryableSearchPhaseExecutionException)
.catch(catchRetryableEsClientErrors);
};

View file

@ -7,36 +7,103 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import * as Either from 'fp-ts/lib/Either';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as errorHandlers from './catch_retryable_es_client_errors';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { pickupUpdatedMappings } from './pickup_updated_mappings';
jest.mock('./catch_retryable_es_client_errors');
describe('pickupUpdatedMappings', () => {
beforeEach(() => {
jest.clearAllMocks();
});
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
it('calls both handlers when the promise rejection cannot be retried by either', async () => {
// Create a mock client that rejects all methods with a 502 status code
// response
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 502,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = pickupUpdatedMappings(client, 'my_index', 1000);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
// Should throw because both handlers can't retry 502 responses
await expect(task()).rejects.toThrow();
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
it('calls both handlers when the promise rejection cannot be handled by catchRetryableSearchPhaseExecutionException', async () => {
// Create a mock client that rejects all methods with a 503 status code
// response
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = pickupUpdatedMappings(client, 'my_index', 1000);
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
it('calls only the first handler when the promise rejection can be handled by it', async () => {
// Create a mock client that rejects all methods with search_phase_execution_exception
// response
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'search_phase_execution_exception' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = pickupUpdatedMappings(client, 'my_index', 1000);
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
// the second handler shouldn't be called since the first one is not throwing
expect(catchClientErrorsSpy).not.toBeCalled();
});
});

View file

@ -13,6 +13,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import {
catchRetryableEsClientErrors,
catchRetryableSearchPhaseExecutionException,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
@ -61,5 +62,6 @@ export const pickupUpdatedMappings =
.then(({ task: taskId }) => {
return Either.right({ taskId: String(taskId!) });
})
.catch(catchRetryableSearchPhaseExecutionException)
.catch(catchRetryableEsClientErrors);
};

View file

@ -7,12 +7,11 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as Either from 'fp-ts/lib/Either';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { readWithPit } from './read_with_pit';
jest.mock('./catch_retryable_es_client_errors');
import * as errorHandlers from './catch_retryable_es_client_errors';
describe('readWithPit', () => {
beforeEach(() => {
@ -20,12 +19,7 @@ describe('readWithPit', () => {
});
it('calls esClient.search with the appropriate params', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
total: 0,
hits: [],
},
})
Promise.resolve({ hits: { hits: [] } })
);
await readWithPit({
@ -97,17 +91,96 @@ describe('readWithPit', () => {
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = readWithPit({
client,
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
it('calls catchRetryableSearchPhaseExecutionException when the promise rejects', async () => {
// Create a mock client that rejects all methods with a search_phase_execution_exception
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'search_phase_execution_exception' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = readWithPit({
client,
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
});
const result = await task();
expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual(
expect.objectContaining({
type: 'retryable_es_client_error',
})
);
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
// the second handler shouldn't be called since the first one is not throwing
expect(catchClientErrorsSpy).not.toHaveBeenCalled();
});
it('throws if neither handler can retry', async () => {
// Create a mock client that rejects all methods with a 502 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 502,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
const catchSearchPhaseExceptionSpy = jest.spyOn(
errorHandlers,
'catchRetryableSearchPhaseExecutionException'
);
const task = readWithPit({
client,
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
});
// Should throw because both handlers can't retry 502 responses
await expect(task()).rejects.toThrow();
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
});
});

View file

@ -15,6 +15,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import {
catchRetryableEsClientErrors,
catchRetryableSearchPhaseExecutionException,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
@ -121,5 +122,6 @@ export const readWithPit =
throw e;
}
})
.catch(catchRetryableSearchPhaseExecutionException)
.catch(catchRetryableEsClientErrors);
};