mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
Zdt retry on common failures (#213979)
## Summary resolves https://github.com/elastic/kibana/issues/207096 Added a new handler to `readWithPit`, `pickupUpdatedMappings` and `checkForUnknownDocs`. This handler retries when it receives an error response including `type: search_phase_execution_exception`. ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [x] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
This commit is contained in:
parent
14b0c611b1
commit
68b2bde0b0
8 changed files with 309 additions and 46 deletions
|
@ -9,7 +9,10 @@
|
|||
|
||||
import { errors as esErrors } from '@elastic/elasticsearch';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
|
||||
import {
|
||||
catchRetryableEsClientErrors,
|
||||
catchRetryableSearchPhaseExecutionException,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
|
||||
describe('catchRetryableEsClientErrors', () => {
|
||||
it('rejects non-retryable response errors', async () => {
|
||||
|
@ -94,3 +97,29 @@ describe('catchRetryableEsClientErrors', () => {
|
|||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('catchRetryableSearchPhaseExecutionException', () => {
|
||||
it('retries search phase execution exception ', async () => {
|
||||
const error = new esErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
body: { error: { type: 'search_phase_execution_exception' } },
|
||||
})
|
||||
);
|
||||
expect(
|
||||
((await Promise.reject(error).catch(catchRetryableSearchPhaseExecutionException)) as any).left
|
||||
).toMatchObject({
|
||||
message: 'search_phase_execution_exception',
|
||||
type: 'retryable_es_client_error',
|
||||
});
|
||||
});
|
||||
it('does not retry other errors', async () => {
|
||||
const error = new esErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
body: { error: { type: 'cluster_block_exception' } },
|
||||
})
|
||||
);
|
||||
await expect(
|
||||
Promise.reject(error).catch(catchRetryableSearchPhaseExecutionException)
|
||||
).rejects.toBe(error);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -30,3 +30,17 @@ export const catchRetryableEsClientErrors = (
|
|||
throw e;
|
||||
}
|
||||
};
|
||||
|
||||
export const catchRetryableSearchPhaseExecutionException = (
|
||||
e: EsErrors.ResponseError
|
||||
): Either.Either<RetryableEsClientError, never> => {
|
||||
if (e?.body?.error?.type === 'search_phase_execution_exception') {
|
||||
return Either.left({
|
||||
type: 'retryable_es_client_error' as const,
|
||||
message: e?.message,
|
||||
error: e,
|
||||
});
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -8,15 +8,13 @@
|
|||
*/
|
||||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
|
||||
import * as errorHandlers from './catch_retryable_es_client_errors';
|
||||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { checkForUnknownDocs } from './check_for_unknown_docs';
|
||||
import { createAggregateTypesSearchResponse } from './check_for_unknown_docs.mocks';
|
||||
|
||||
jest.mock('./catch_retryable_es_client_errors');
|
||||
|
||||
describe('checkForUnknownDocs', () => {
|
||||
const excludeOnUpgradeQuery: QueryDslQueryContainer = {
|
||||
bool: { must: [{ term: { hello: 'dolly' } }] },
|
||||
|
@ -39,18 +37,94 @@ describe('checkForUnknownDocs', () => {
|
|||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = checkForUnknownDocs({
|
||||
client,
|
||||
indexName: '.kibana_8.0.0',
|
||||
knownTypes,
|
||||
excludeOnUpgradeQuery,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
|
||||
|
||||
const result = await task();
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
|
||||
it('calls catchRetryableSearchPhaseExecutionException when the promise rejects', async () => {
|
||||
// Create a mock client that rejects all methods with a search_phase_execution_exception response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
body: { error: { type: 'search_phase_execution_exception' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = checkForUnknownDocs({
|
||||
client,
|
||||
indexName: '.kibana_8.0.0',
|
||||
knownTypes,
|
||||
excludeOnUpgradeQuery,
|
||||
});
|
||||
|
||||
const result = await task();
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
// the second handler shouldn't be called since the first one is not throwing
|
||||
expect(catchClientErrorsSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should throw because neither handler can retry', async () => {
|
||||
// Create a mock client that rejects all methods with a 502 status code response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 502,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = checkForUnknownDocs({
|
||||
client,
|
||||
indexName: '.kibana_8.0.0',
|
||||
knownTypes,
|
||||
excludeOnUpgradeQuery,
|
||||
});
|
||||
|
||||
await expect(task()).rejects.toThrow();
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
|
||||
it('calls `client.search` with the correct parameters', async () => {
|
||||
|
|
|
@ -20,6 +20,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
|||
import type { SavedObjectsRawDocSource } from '@kbn/core-saved-objects-server';
|
||||
import {
|
||||
catchRetryableEsClientErrors,
|
||||
catchRetryableSearchPhaseExecutionException,
|
||||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
import { addExcludedTypesToBoolQuery } from '../model/helpers';
|
||||
|
@ -127,5 +128,6 @@ export const checkForUnknownDocs =
|
|||
|
||||
return Either.right({});
|
||||
})
|
||||
.catch(catchRetryableSearchPhaseExecutionException)
|
||||
.catch(catchRetryableEsClientErrors);
|
||||
};
|
||||
|
|
|
@ -7,36 +7,103 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
|
||||
import * as errorHandlers from './catch_retryable_es_client_errors';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { pickupUpdatedMappings } from './pickup_updated_mappings';
|
||||
|
||||
jest.mock('./catch_retryable_es_client_errors');
|
||||
|
||||
describe('pickupUpdatedMappings', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
// Create a mock client that rejects all methods with a 503 status code
|
||||
// response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 503,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
|
||||
it('calls both handlers when the promise rejection cannot be retried by either', async () => {
|
||||
// Create a mock client that rejects all methods with a 502 status code
|
||||
// response
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 502,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = pickupUpdatedMappings(client, 'my_index', 1000);
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
|
||||
// Should throw because both handlers can't retry 502 responses
|
||||
await expect(task()).rejects.toThrow();
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
it('calls both handlers when the promise rejection cannot be handled by catchRetryableSearchPhaseExecutionException', async () => {
|
||||
// Create a mock client that rejects all methods with a 503 status code
|
||||
// response
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 503,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = pickupUpdatedMappings(client, 'my_index', 1000);
|
||||
const result = await task();
|
||||
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
it('calls only the first handler when the promise rejection can be handled by it', async () => {
|
||||
// Create a mock client that rejects all methods with search_phase_execution_exception
|
||||
// response
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 503,
|
||||
body: { error: { type: 'search_phase_execution_exception' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = pickupUpdatedMappings(client, 'my_index', 1000);
|
||||
const result = await task();
|
||||
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
// the second handler shouldn't be called since the first one is not throwing
|
||||
expect(catchClientErrorsSpy).not.toBeCalled();
|
||||
});
|
||||
});
|
||||
|
|
|
@ -13,6 +13,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
|||
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
|
||||
import {
|
||||
catchRetryableEsClientErrors,
|
||||
catchRetryableSearchPhaseExecutionException,
|
||||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
|
||||
|
@ -61,5 +62,6 @@ export const pickupUpdatedMappings =
|
|||
.then(({ task: taskId }) => {
|
||||
return Either.right({ taskId: String(taskId!) });
|
||||
})
|
||||
.catch(catchRetryableSearchPhaseExecutionException)
|
||||
.catch(catchRetryableEsClientErrors);
|
||||
};
|
||||
|
|
|
@ -7,12 +7,11 @@
|
|||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { readWithPit } from './read_with_pit';
|
||||
|
||||
jest.mock('./catch_retryable_es_client_errors');
|
||||
import * as errorHandlers from './catch_retryable_es_client_errors';
|
||||
|
||||
describe('readWithPit', () => {
|
||||
beforeEach(() => {
|
||||
|
@ -20,12 +19,7 @@ describe('readWithPit', () => {
|
|||
});
|
||||
it('calls esClient.search with the appropriate params', async () => {
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createSuccessTransportRequestPromise({
|
||||
hits: {
|
||||
total: 0,
|
||||
hits: [],
|
||||
},
|
||||
})
|
||||
Promise.resolve({ hits: { hits: [] } })
|
||||
);
|
||||
|
||||
await readWithPit({
|
||||
|
@ -97,17 +91,96 @@ describe('readWithPit', () => {
|
|||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = readWithPit({
|
||||
client,
|
||||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
});
|
||||
try {
|
||||
await task();
|
||||
} catch (e) {
|
||||
/** ignore */
|
||||
}
|
||||
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
|
||||
const result = await task();
|
||||
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
|
||||
it('calls catchRetryableSearchPhaseExecutionException when the promise rejects', async () => {
|
||||
// Create a mock client that rejects all methods with a search_phase_execution_exception
|
||||
// response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
body: { error: { type: 'search_phase_execution_exception' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = readWithPit({
|
||||
client,
|
||||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
});
|
||||
const result = await task();
|
||||
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<any>).left).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'retryable_es_client_error',
|
||||
})
|
||||
);
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
// the second handler shouldn't be called since the first one is not throwing
|
||||
expect(catchClientErrorsSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('throws if neither handler can retry', async () => {
|
||||
// Create a mock client that rejects all methods with a 502 status code
|
||||
// response.
|
||||
const retryableError = new EsErrors.ResponseError(
|
||||
elasticsearchClientMock.createApiResponse({
|
||||
statusCode: 502,
|
||||
body: { error: { type: 'es_type', reason: 'es_reason' } },
|
||||
})
|
||||
);
|
||||
const client = elasticsearchClientMock.createInternalClient(
|
||||
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
|
||||
);
|
||||
|
||||
const catchClientErrorsSpy = jest.spyOn(errorHandlers, 'catchRetryableEsClientErrors');
|
||||
const catchSearchPhaseExceptionSpy = jest.spyOn(
|
||||
errorHandlers,
|
||||
'catchRetryableSearchPhaseExecutionException'
|
||||
);
|
||||
|
||||
const task = readWithPit({
|
||||
client,
|
||||
pitId: 'pitId',
|
||||
query: { match_all: {} },
|
||||
batchSize: 10_000,
|
||||
});
|
||||
|
||||
// Should throw because both handlers can't retry 502 responses
|
||||
await expect(task()).rejects.toThrow();
|
||||
expect(catchSearchPhaseExceptionSpy).toHaveBeenCalledWith(retryableError);
|
||||
expect(catchClientErrorsSpy).toHaveBeenCalledWith(retryableError);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -15,6 +15,7 @@ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
|||
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
|
||||
import {
|
||||
catchRetryableEsClientErrors,
|
||||
catchRetryableSearchPhaseExecutionException,
|
||||
type RetryableEsClientError,
|
||||
} from './catch_retryable_es_client_errors';
|
||||
import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
|
||||
|
@ -121,5 +122,6 @@ export const readWithPit =
|
|||
throw e;
|
||||
}
|
||||
})
|
||||
.catch(catchRetryableSearchPhaseExecutionException)
|
||||
.catch(catchRetryableEsClientErrors);
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue