mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[8.18] [SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception (#216693) (#217192)
# Backport This will backport the following commits from `main` to `8.18`: - [[SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception (#216693)](https://github.com/elastic/kibana/pull/216693) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Jesus Wahrman","email":"41008968+jesuswr@users.noreply.github.com"},"sourceCommit":{"committedDate":"2025-04-04T13:37:06Z","message":"[SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception (#216693)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/207096\n\nThis continues the work in https://github.com/elastic/kibana/pull/213979\n\nSometimes ES returns a 200 response containing an error field when we\nwait for the update mappings task. This case wasn't being handled. This\nPR handles that case, when we find a `search_phase_execution_exception`\nin the ES response we return a retryable error that sends us back to the\nupdate mappings state. It does it for both migration algorithms, the\npriority is ZDT but seemed like a nice to have in both.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"49380143432b654cf1b849d8c77b3abc2fb3aeb4","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","release_note:skip","backport:prev-major","backport:current-major","v9.1.0"],"title":"[SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception","number":216693,"url":"https://github.com/elastic/kibana/pull/216693","mergeCommit":{"message":"[SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception (#216693)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/207096\n\nThis continues the work in https://github.com/elastic/kibana/pull/213979\n\nSometimes ES returns a 200 response containing an error field when we\nwait for the update mappings task. This case wasn't being handled. This\nPR handles that case, when we find a `search_phase_execution_exception`\nin the ES response we return a retryable error that sends us back to the\nupdate mappings state. It does it for both migration algorithms, the\npriority is ZDT but seemed like a nice to have in both.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"49380143432b654cf1b849d8c77b3abc2fb3aeb4"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/216693","number":216693,"mergeCommit":{"message":"[SO migration] Move to previous step in update mappings wait when it fails with search_phase_execution_exception (#216693)\n\n## Summary\n\nResolves https://github.com/elastic/kibana/issues/207096\n\nThis continues the work in https://github.com/elastic/kibana/pull/213979\n\nSometimes ES returns a 200 response containing an error field when we\nwait for the update mappings task. This case wasn't being handled. This\nPR handles that case, when we find a `search_phase_execution_exception`\nin the ES response we return a retryable error that sends us back to the\nupdate mappings state. It does it for both migration algorithms, the\npriority is ZDT but seemed like a nice to have in both.\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] The PR description includes the appropriate Release Notes section,\nand the correct `release_note:*` label is applied per the\n[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)","sha":"49380143432b654cf1b849d8c77b3abc2fb3aeb4"}}]}] BACKPORT--> Co-authored-by: Jesus Wahrman <41008968+jesuswr@users.noreply.github.com>
This commit is contained in:
parent
9112209a0e
commit
bedbfb7b16
19 changed files with 398 additions and 8 deletions
|
@ -201,6 +201,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
@ -427,6 +428,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
@ -657,6 +659,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
@ -891,6 +894,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
@ -1150,6 +1154,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
@ -1387,6 +1392,7 @@ Object {
|
|||
"retryAttempts": 5,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"properties": Object {},
|
||||
},
|
||||
|
|
|
@ -49,7 +49,11 @@ import type { IndexNotGreenTimeout, IndexNotYellowTimeout } from './wait_for_ind
|
|||
import { waitForIndexStatus } from './wait_for_index_status';
|
||||
|
||||
export type { WaitForTaskResponse, WaitForTaskCompletionTimeout } from './wait_for_task';
|
||||
import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task';
|
||||
import {
|
||||
waitForTask,
|
||||
TaskCompletedWithRetriableError,
|
||||
WaitForTaskCompletionTimeout,
|
||||
} from './wait_for_task';
|
||||
|
||||
export type { UpdateByQueryResponse } from './pickup_updated_mappings';
|
||||
import { pickupUpdatedMappings } from './pickup_updated_mappings';
|
||||
|
@ -176,6 +180,7 @@ export interface AcknowledgeResponse {
|
|||
// Map of left response 'type' string -> response interface
|
||||
export interface ActionErrorTypeMap {
|
||||
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
|
||||
task_completed_with_retriable_error: TaskCompletedWithRetriableError;
|
||||
retryable_es_client_error: RetryableEsClientError;
|
||||
index_not_found_exception: IndexNotFound;
|
||||
target_index_had_write_block: TargetIndexHadWriteBlock;
|
||||
|
|
|
@ -11,6 +11,8 @@ import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors
|
|||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { waitForPickupUpdatedMappingsTask } from './wait_for_pickup_updated_mappings_task';
|
||||
import * as Either from 'fp-ts/Either';
|
||||
import { TaskCompletedWithRetriableError } from './wait_for_task';
|
||||
|
||||
jest.mock('./catch_retryable_es_client_errors', () => {
|
||||
const { catchRetryableEsClientErrors: actualImplementation } = jest.requireActual(
|
||||
|
@ -64,4 +66,32 @@ describe('waitForPickupUpdatedMappingsTask', () => {
|
|||
});
|
||||
expect(task()).rejects.toThrowError(nonRetryableError);
|
||||
});
|
||||
|
||||
it('returns task_completed_with_retriable_error when the client returns a search_phase_execution_exception', async () => {
|
||||
const client = elasticsearchClientMock.createInternalClient();
|
||||
client.tasks.get.mockResponse({
|
||||
completed: true,
|
||||
task: {
|
||||
action: 'any action',
|
||||
cancellable: false,
|
||||
headers: {},
|
||||
id: 4273,
|
||||
node: 'any node',
|
||||
running_time_in_nanos: 123,
|
||||
start_time_in_millis: 312,
|
||||
type: 'any type',
|
||||
},
|
||||
error: { type: 'search_phase_execution_exception' },
|
||||
});
|
||||
|
||||
const task = waitForPickupUpdatedMappingsTask({ client, taskId: '4273', timeout: '2m' });
|
||||
const result = await task();
|
||||
expect(Either.isLeft(result)).toBe(true);
|
||||
expect((result as Either.Left<TaskCompletedWithRetriableError>).left).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"message": "The task being waited on encountered a search_phase_execution_exception error",
|
||||
"type": "task_completed_with_retriable_error",
|
||||
}
|
||||
`);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -10,7 +10,11 @@
|
|||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import * as Option from 'fp-ts/lib/Option';
|
||||
import { flow } from 'fp-ts/lib/function';
|
||||
import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task';
|
||||
import {
|
||||
waitForTask,
|
||||
WaitForTaskCompletionTimeout,
|
||||
TaskCompletedWithRetriableError,
|
||||
} from './wait_for_task';
|
||||
import { RetryableEsClientError } from './catch_retryable_es_client_errors';
|
||||
|
||||
export const waitForPickupUpdatedMappingsTask = flow(
|
||||
|
@ -19,7 +23,7 @@ export const waitForPickupUpdatedMappingsTask = flow(
|
|||
(
|
||||
res
|
||||
): TaskEither.TaskEither<
|
||||
RetryableEsClientError | WaitForTaskCompletionTimeout,
|
||||
RetryableEsClientError | WaitForTaskCompletionTimeout | TaskCompletedWithRetriableError,
|
||||
'pickup_updated_mappings_succeeded'
|
||||
> => {
|
||||
// We don't catch or type failures/errors because they should never
|
||||
|
@ -32,6 +36,15 @@ export const waitForPickupUpdatedMappingsTask = flow(
|
|||
JSON.stringify(res.failures.value)
|
||||
);
|
||||
} else if (Option.isSome(res.error)) {
|
||||
if (res.error.value.type === 'search_phase_execution_exception') {
|
||||
// This error is normally fixed in the next try, so let's retry
|
||||
// the update mappings task instead of throwing
|
||||
return TaskEither.left({
|
||||
type: 'task_completed_with_retriable_error' as const,
|
||||
message: `The task being waited on encountered a ${res.error.value.type} error`,
|
||||
});
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
'pickupUpdatedMappings task failed with the following error:\n' +
|
||||
JSON.stringify(res.error.value)
|
||||
|
|
|
@ -42,6 +42,19 @@ export interface WaitForTaskCompletionTimeout {
|
|||
readonly error?: Error;
|
||||
}
|
||||
|
||||
/**
|
||||
* When we use `wait_for_completion=false`, we won't get the errors right away, we'll get a
|
||||
* task id. Then we have to query the tasks API with that id and Elasticsearch will tell us
|
||||
* if there was any error in the original task inside a 200 response. In some cases we might
|
||||
* want to retry the original task.
|
||||
*/
|
||||
export interface TaskCompletedWithRetriableError {
|
||||
/** While waiting, the original task encountered an error. It might need to be retried. */
|
||||
readonly type: 'task_completed_with_retriable_error';
|
||||
readonly message: string;
|
||||
readonly error?: Error;
|
||||
}
|
||||
|
||||
const catchWaitForTaskCompletionTimeout = (
|
||||
e: EsErrors.ResponseError
|
||||
): Either.Either<WaitForTaskCompletionTimeout, never> => {
|
||||
|
|
|
@ -304,6 +304,7 @@ describe('createInitialState', () => {
|
|||
"retryAttempts": 15,
|
||||
"retryCount": 0,
|
||||
"retryDelay": 0,
|
||||
"skipRetryReset": false,
|
||||
"targetIndexMappings": Object {
|
||||
"_meta": Object {
|
||||
"indexTypesMap": Object {
|
||||
|
|
|
@ -127,6 +127,7 @@ export const createInitialState = ({
|
|||
tempIndexMappings: TEMP_INDEX_MAPPINGS,
|
||||
outdatedDocumentsQuery,
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
retryAttempts: migrationsConfig.retryAttempts,
|
||||
batchSize: migrationsConfig.batchSize,
|
||||
|
|
|
@ -66,6 +66,7 @@ import type { ResponseType } from '../next';
|
|||
import { createInitialProgress } from './progress';
|
||||
import { model } from './model';
|
||||
import type { BulkIndexOperationTuple, BulkOperation } from './create_batches';
|
||||
import { TaskCompletedWithRetriableError } from '../actions/wait_for_task';
|
||||
|
||||
describe('migrations v2 model', () => {
|
||||
const indexMapping: IndexMapping = {
|
||||
|
@ -88,6 +89,7 @@ describe('migrations v2 model', () => {
|
|||
kibanaVersion: '7.11.0',
|
||||
logs: [],
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
retryAttempts: 15,
|
||||
batchSize: 1000,
|
||||
|
@ -2973,6 +2975,9 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.updateTargetMappingsTaskId).toEqual('update target mappings task');
|
||||
expect(newState.retryCount).toEqual(0);
|
||||
expect(newState.retryDelay).toEqual(0);
|
||||
// make sure the updated types query is still in the new state since
|
||||
// we might want to come back if the wait for task state fails
|
||||
expect(newState.updatedTypesQuery).toEqual(updateTargetMappingsState.updatedTypesQuery);
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -2984,6 +2989,17 @@ describe('migrations v2 model', () => {
|
|||
sourceIndex: Option.some('.kibana') as Option.Some<string>,
|
||||
targetIndex: '.kibana_7.11.0_001',
|
||||
updateTargetMappingsTaskId: 'update target mappings task',
|
||||
updatedTypesQuery: Option.fromNullable({
|
||||
bool: {
|
||||
should: [
|
||||
{
|
||||
term: {
|
||||
type: 'type123',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
}),
|
||||
};
|
||||
|
||||
test('UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META if response is right', () => {
|
||||
|
@ -3022,6 +3038,105 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.retryCount).toEqual(2);
|
||||
expect(newState.retryDelay).toEqual(4000);
|
||||
});
|
||||
|
||||
test('UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_PROPERTIES when there is an error that makes us want to retry the original task', () => {
|
||||
const res: ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK'> = Either.left({
|
||||
message: 'Some error happened that makes us want to retry the original task',
|
||||
type: 'task_completed_with_retriable_error',
|
||||
});
|
||||
const newState = model(
|
||||
updateTargetMappingsWaitForTaskState,
|
||||
res
|
||||
) as UpdateTargetMappingsPropertiesWaitForTaskState;
|
||||
expect(newState.controlState).toEqual('UPDATE_TARGET_MAPPINGS_PROPERTIES');
|
||||
expect(newState.retryCount).toEqual(1);
|
||||
expect(newState.updatedTypesQuery).toEqual(
|
||||
updateTargetMappingsWaitForTaskState.updatedTypesQuery
|
||||
);
|
||||
});
|
||||
|
||||
test('UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_PROPERTIES updates correcly the retry number', () => {
|
||||
const state = Object.assign({}, updateTargetMappingsWaitForTaskState, { retryCount: 3 });
|
||||
const res: ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK'> = Either.left({
|
||||
message: 'Some error happened that makes us want to retry the original task',
|
||||
type: 'task_completed_with_retriable_error',
|
||||
});
|
||||
const newState = model(state, res) as UpdateTargetMappingsPropertiesWaitForTaskState;
|
||||
expect(newState.controlState).toEqual('UPDATE_TARGET_MAPPINGS_PROPERTIES');
|
||||
expect(newState.retryCount).toEqual(4);
|
||||
expect(newState.updatedTypesQuery).toEqual(
|
||||
updateTargetMappingsWaitForTaskState.updatedTypesQuery
|
||||
);
|
||||
});
|
||||
|
||||
test('UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK updates retry attributes correctly', () => {
|
||||
const initialRetryCount = 3;
|
||||
|
||||
// First, we are in UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK
|
||||
const initialWaitState = Object.assign({}, updateTargetMappingsWaitForTaskState, {
|
||||
retryCount: initialRetryCount,
|
||||
});
|
||||
expect(initialWaitState.retryCount).toBe(initialRetryCount);
|
||||
expect(initialWaitState.skipRetryReset).toBeFalsy();
|
||||
|
||||
// Move to UPDATE_TARGET_MAPPINGS_PROPERTIES and retry it (+1 retry)
|
||||
const retryingMappingsUpdate = model(
|
||||
initialWaitState,
|
||||
Either.left({
|
||||
message: 'Some error happened that makes us want to retry the original task',
|
||||
type: 'task_completed_with_retriable_error',
|
||||
} as TaskCompletedWithRetriableError)
|
||||
) as UpdateTargetMappingsPropertiesWaitForTaskState;
|
||||
expect(retryingMappingsUpdate.retryCount).toBe(initialRetryCount + 1);
|
||||
expect(retryingMappingsUpdate.skipRetryReset).toBe(true);
|
||||
|
||||
// Retry UPDATE_TARGET_MAPPINGS_PROPERTIES again (+1 retry)
|
||||
const retryingMappingsUpdateAgain = model(
|
||||
retryingMappingsUpdate,
|
||||
Either.left({
|
||||
type: 'retryable_es_client_error',
|
||||
message: 'random retryable error',
|
||||
} as RetryableEsClientError)
|
||||
) as UpdateTargetMappingsPropertiesWaitForTaskState;
|
||||
expect(retryingMappingsUpdateAgain.retryCount).toBe(initialRetryCount + 2);
|
||||
expect(retryingMappingsUpdateAgain.skipRetryReset).toBe(true);
|
||||
|
||||
// Go again to the wait state, so retryCount should remain the same
|
||||
const finalWaitStateBeforeCompletion = model(
|
||||
retryingMappingsUpdateAgain,
|
||||
Either.right({
|
||||
taskId: 'update target mappings task',
|
||||
}) as ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES'>
|
||||
) as UpdateTargetMappingsPropertiesWaitForTaskState;
|
||||
expect(finalWaitStateBeforeCompletion.retryCount).toBe(initialRetryCount + 2);
|
||||
expect(finalWaitStateBeforeCompletion.skipRetryReset).toBeFalsy();
|
||||
|
||||
// The wait state completes successfully, so retryCount should reset
|
||||
const postUpdateState = model(
|
||||
finalWaitStateBeforeCompletion,
|
||||
Either.right(
|
||||
'pickup_updated_mappings_succeeded'
|
||||
) as ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK'>
|
||||
) as UpdateTargetMappingsMeta;
|
||||
expect(postUpdateState.retryCount).toBe(0);
|
||||
expect(postUpdateState.skipRetryReset).toBeFalsy();
|
||||
});
|
||||
|
||||
test('UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> FATAL if task_completed_with_retriable_error has no more retry attemps', () => {
|
||||
const state = Object.assign({}, updateTargetMappingsWaitForTaskState, {
|
||||
retryCount: 8,
|
||||
retryAttempts: 8,
|
||||
});
|
||||
const res: ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK'> = Either.left({
|
||||
message: 'some_retryable_error_during_update',
|
||||
type: 'task_completed_with_retriable_error',
|
||||
});
|
||||
const newState = model(state, res) as FatalState;
|
||||
expect(newState.controlState).toEqual('FATAL');
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"Unable to complete the UPDATE_TARGET_MAPPINGS_PROPERTIES step after 8 attempts, terminating. The last failure message was: some_retryable_error_during_update"`
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('UPDATE_TARGET_MAPPINGS_META', () => {
|
||||
|
|
|
@ -1558,6 +1558,16 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
// exponential delay. We will basically keep polling forever until the
|
||||
// Elasticsearch task succeeds or fails.
|
||||
return delayRetryState(stateP, res.left.message, Number.MAX_SAFE_INTEGER);
|
||||
} else if (isTypeof(left, 'task_completed_with_retriable_error')) {
|
||||
return delayRetryState(
|
||||
{
|
||||
...stateP,
|
||||
controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES',
|
||||
skipRetryReset: true,
|
||||
},
|
||||
left.message,
|
||||
stateP.retryAttempts
|
||||
);
|
||||
} else {
|
||||
throwBadResponse(stateP, left);
|
||||
}
|
||||
|
|
|
@ -121,16 +121,35 @@ describe('resetRetryState', () => {
|
|||
hello: 'dolly',
|
||||
foo: 42,
|
||||
retryCount: 5,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 1000,
|
||||
});
|
||||
|
||||
expect(resetRetryState(state)).toEqual({
|
||||
...state,
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
});
|
||||
});
|
||||
|
||||
it('doesnt reset retry attributes if skipRetryReset is true and resets skipRetryReset', () => {
|
||||
const state = createState({
|
||||
hello: 'dolly',
|
||||
foo: 42,
|
||||
retryCount: 5,
|
||||
skipRetryReset: true,
|
||||
retryDelay: 1000,
|
||||
});
|
||||
|
||||
expect(resetRetryState(state)).toEqual({
|
||||
...state,
|
||||
retryCount: 5,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 1000,
|
||||
});
|
||||
});
|
||||
|
||||
it('works when the retry attributes are not yet present on the state', () => {
|
||||
const state = createState({
|
||||
hello: 'dolly',
|
||||
|
@ -140,6 +159,7 @@ describe('resetRetryState', () => {
|
|||
expect(resetRetryState(state)).toEqual({
|
||||
...state,
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
});
|
||||
});
|
||||
|
|
|
@ -12,6 +12,7 @@ import type { MigrationLog } from '../types';
|
|||
export interface RetryableState {
|
||||
controlState: string;
|
||||
retryCount: number;
|
||||
skipRetryReset: boolean;
|
||||
retryDelay: number;
|
||||
logs: MigrationLog[];
|
||||
}
|
||||
|
@ -50,7 +51,8 @@ export const delayRetryState = <S extends RetryableState>(
|
|||
export const resetRetryState = <S extends RetryableState>(state: S): S => {
|
||||
return {
|
||||
...state,
|
||||
retryCount: 0,
|
||||
retryDelay: 0,
|
||||
retryCount: state.skipRetryReset ? state.retryCount : 0,
|
||||
retryDelay: state.skipRetryReset ? state.retryDelay : 0,
|
||||
skipRetryReset: false,
|
||||
};
|
||||
};
|
||||
|
|
|
@ -51,6 +51,7 @@ export interface BaseState extends ControlState {
|
|||
readonly preMigrationScript: Option.Option<string>;
|
||||
readonly outdatedDocumentsQuery: QueryDslQueryContainer;
|
||||
readonly retryCount: number;
|
||||
readonly skipRetryReset: boolean;
|
||||
readonly retryDelay: number;
|
||||
/**
|
||||
* How many times to retry a step that fails with retryable_es_client_error
|
||||
|
@ -403,6 +404,7 @@ export interface UpdateTargetMappingsPropertiesWaitForTaskState extends PostInit
|
|||
/** Update the mappings of the target index */
|
||||
readonly controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK';
|
||||
readonly updateTargetMappingsTaskId: string;
|
||||
readonly updatedTypesQuery: Option.Option<QueryDslQueryContainer>;
|
||||
}
|
||||
|
||||
export interface UpdateTargetMappingsMeta extends PostInitState {
|
||||
|
|
|
@ -29,6 +29,7 @@ describe('model', () => {
|
|||
const baseState: BaseState = {
|
||||
controlState: '42',
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
logs: [],
|
||||
skipDocumentMigration: false,
|
||||
|
|
|
@ -32,6 +32,7 @@ describe('Stage: init', () => {
|
|||
const createState = (parts: Partial<InitState> = {}): InitState => ({
|
||||
controlState: 'INIT',
|
||||
retryDelay: 0,
|
||||
skipRetryReset: false,
|
||||
retryCount: 0,
|
||||
logs: [],
|
||||
skipDocumentMigration: false,
|
||||
|
|
|
@ -14,19 +14,54 @@ import {
|
|||
type MockedMigratorContext,
|
||||
} from '../../test_helpers';
|
||||
import type { ResponseType } from '../../next';
|
||||
import type { UpdateIndexMappingsState } from '../../state';
|
||||
import type {
|
||||
State,
|
||||
UpdateIndexMappingsState,
|
||||
UpdateIndexMappingsWaitForTaskState,
|
||||
} from '../../state';
|
||||
import type { StateActionResponse } from '../types';
|
||||
import { updateIndexMappings } from './update_index_mappings';
|
||||
import { updateIndexMappingsWaitForTask } from './update_index_mappings_wait_for_task';
|
||||
import { model } from '../model';
|
||||
import { RetryableEsClientError } from '../../actions';
|
||||
import { TaskCompletedWithRetriableError } from '../../../actions/wait_for_task';
|
||||
import { FatalState } from '../../../state';
|
||||
|
||||
describe('Stage: updateIndexMappings', () => {
|
||||
let context: MockedMigratorContext;
|
||||
|
||||
const retryableError: RetryableEsClientError = {
|
||||
type: 'retryable_es_client_error',
|
||||
message: 'snapshot_in_progress_exception',
|
||||
};
|
||||
|
||||
const taskCompletedWithRetriableError: TaskCompletedWithRetriableError = {
|
||||
type: 'task_completed_with_retriable_error' as const,
|
||||
message: 'search_phase_execution_exception',
|
||||
};
|
||||
|
||||
const createState = (
|
||||
parts: Partial<UpdateIndexMappingsState> = {}
|
||||
): UpdateIndexMappingsState => ({
|
||||
...createPostInitState(),
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS',
|
||||
additiveMappingChanges: {},
|
||||
additiveMappingChanges: {
|
||||
someToken: { type: 'keyword' },
|
||||
anotherField: { type: 'text', analyzer: 'standard' },
|
||||
},
|
||||
...parts,
|
||||
});
|
||||
|
||||
const createWaitState = (
|
||||
parts: Partial<UpdateIndexMappingsWaitForTaskState> = {}
|
||||
): UpdateIndexMappingsWaitForTaskState => ({
|
||||
...createPostInitState(),
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK',
|
||||
additiveMappingChanges: {
|
||||
someOtherToken: { type: 'keyword' },
|
||||
anotherAwesomeField: { type: 'text', analyzer: 'standard' },
|
||||
},
|
||||
updateTargetMappingsTaskId: '73',
|
||||
...parts,
|
||||
});
|
||||
|
||||
|
@ -45,10 +80,131 @@ describe('Stage: updateIndexMappings', () => {
|
|||
res as StateActionResponse<'UPDATE_INDEX_MAPPINGS'>,
|
||||
context
|
||||
);
|
||||
|
||||
// it's important that newState contains additiveMappingChanges in case we want to go back
|
||||
expect(newState).toEqual({
|
||||
...state,
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK',
|
||||
updateTargetMappingsTaskId: '42',
|
||||
});
|
||||
});
|
||||
|
||||
it('UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK -> UPDATE_INDEX_MAPPINGS in case of task_completed_with_retriable_error', () => {
|
||||
const state = createWaitState();
|
||||
const res: StateActionResponse<'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK'> = Either.left(
|
||||
taskCompletedWithRetriableError
|
||||
);
|
||||
|
||||
const newState = updateIndexMappingsWaitForTask(state, res, context);
|
||||
|
||||
expect(newState).toEqual({
|
||||
...state,
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS',
|
||||
retryCount: 1,
|
||||
skipRetryReset: true,
|
||||
retryDelay: expect.any(Number),
|
||||
logs: expect.any(Array),
|
||||
});
|
||||
});
|
||||
|
||||
it('UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK -> UPDATE_INDEX_MAPPINGS after some retries', () => {
|
||||
const state = createWaitState({
|
||||
retryCount: 12,
|
||||
});
|
||||
const res: StateActionResponse<'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK'> = Either.left(
|
||||
taskCompletedWithRetriableError
|
||||
);
|
||||
|
||||
const newState = updateIndexMappingsWaitForTask(state, res, context);
|
||||
|
||||
expect(newState).toEqual({
|
||||
...state,
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS',
|
||||
retryCount: 13,
|
||||
skipRetryReset: true,
|
||||
retryDelay: expect.any(Number),
|
||||
logs: expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
level: 'error',
|
||||
message: `Action failed with 'search_phase_execution_exception'. Retrying attempt 13 in 64 seconds.`,
|
||||
}),
|
||||
]),
|
||||
});
|
||||
});
|
||||
|
||||
it('UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK -> FATAL in case of task_completed_with_retriable_error when exceeding retry count', () => {
|
||||
const state = createWaitState({
|
||||
retryCount: context.maxRetryAttempts,
|
||||
});
|
||||
const res: StateActionResponse<'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK'> = Either.left(
|
||||
taskCompletedWithRetriableError
|
||||
);
|
||||
|
||||
const newState = updateIndexMappingsWaitForTask(state, res, context) as unknown as FatalState;
|
||||
|
||||
expect(newState.controlState).toEqual('FATAL');
|
||||
expect(newState.reason).toMatchInlineSnapshot(
|
||||
`"Unable to complete the UPDATE_INDEX_MAPPINGS step after 15 attempts, terminating. The last failure message was: search_phase_execution_exception"`
|
||||
);
|
||||
});
|
||||
|
||||
describe('skipRetryReset', () => {
|
||||
test('UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK updates retry attributes correctly', () => {
|
||||
const initialRetryCount = 3;
|
||||
|
||||
// First, we are in UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK state
|
||||
const initialWaitState: State = {
|
||||
...createPostInitState(),
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK',
|
||||
additiveMappingChanges: {
|
||||
someOtherToken: { type: 'keyword' },
|
||||
anotherAwesomeField: { type: 'text', analyzer: 'standard' },
|
||||
},
|
||||
updateTargetMappingsTaskId: '73',
|
||||
retryCount: initialRetryCount,
|
||||
};
|
||||
expect(initialWaitState.retryCount).toBe(initialRetryCount);
|
||||
expect(initialWaitState.skipRetryReset).toBeFalsy();
|
||||
|
||||
// Now we move to UPDATE_TARGET_MAPPINGS_PROPERTIES and retry it (+1 retry)
|
||||
const retryingMappingsUpdate = model(
|
||||
initialWaitState,
|
||||
Either.left(taskCompletedWithRetriableError),
|
||||
context
|
||||
);
|
||||
expect(retryingMappingsUpdate.retryCount).toBe(initialRetryCount + 1);
|
||||
expect(retryingMappingsUpdate.skipRetryReset).toBe(true);
|
||||
|
||||
// We retry UPDATE_TARGET_MAPPINGS_PROPERTIES again (+1 retry)
|
||||
const retryingMappingsUpdateAgain = model(
|
||||
retryingMappingsUpdate,
|
||||
Either.left(retryableError),
|
||||
context
|
||||
);
|
||||
expect(retryingMappingsUpdateAgain.retryCount).toBe(initialRetryCount + 2);
|
||||
expect(retryingMappingsUpdateAgain.skipRetryReset).toBe(true);
|
||||
|
||||
// Now we go back to the wait state, so retryCount should remain the same
|
||||
const finalWaitStateBeforeCompletion = model(
|
||||
retryingMappingsUpdateAgain,
|
||||
Either.right({
|
||||
taskId: 'update target mappings task',
|
||||
}) as StateActionResponse<'UPDATE_INDEX_MAPPINGS'>,
|
||||
context
|
||||
);
|
||||
expect(finalWaitStateBeforeCompletion.retryCount).toBe(initialRetryCount + 2);
|
||||
expect(finalWaitStateBeforeCompletion.skipRetryReset).toBeFalsy();
|
||||
|
||||
// The wait state completes successfully, so retryCount should reset
|
||||
const postUpdateState = model(
|
||||
finalWaitStateBeforeCompletion,
|
||||
Either.right(
|
||||
'pickup_updated_mappings_succeeded'
|
||||
) as StateActionResponse<'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK'>,
|
||||
context
|
||||
);
|
||||
expect(postUpdateState.retryCount).toBe(0);
|
||||
expect(postUpdateState.skipRetryReset).toBeFalsy();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -16,7 +16,7 @@ import type { ModelStage } from '../types';
|
|||
|
||||
export const updateIndexMappingsWaitForTask: ModelStage<
|
||||
'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK',
|
||||
'UPDATE_MAPPING_MODEL_VERSIONS' | 'FATAL'
|
||||
'UPDATE_MAPPING_MODEL_VERSIONS' | 'UPDATE_INDEX_MAPPINGS' | 'FATAL'
|
||||
> = (state, res, context) => {
|
||||
if (Either.isLeft(res)) {
|
||||
const left = res.left;
|
||||
|
@ -26,6 +26,16 @@ export const updateIndexMappingsWaitForTask: ModelStage<
|
|||
// exponential delay. We will basically keep polling forever until the
|
||||
// Elasticsearch task succeeds or fails.
|
||||
return delayRetryState(state, left.message, Number.MAX_SAFE_INTEGER);
|
||||
} else if (isTypeof(left, 'task_completed_with_retriable_error')) {
|
||||
return delayRetryState(
|
||||
{
|
||||
...state,
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS',
|
||||
skipRetryReset: true,
|
||||
},
|
||||
left.message,
|
||||
context.maxRetryAttempts
|
||||
);
|
||||
} else {
|
||||
throwBadResponse(state, left);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ export const createInitialState = (context: MigratorContext): State => {
|
|||
controlState: 'INIT',
|
||||
logs: [],
|
||||
retryCount: 0,
|
||||
skipRetryReset: false,
|
||||
retryDelay: 0,
|
||||
skipDocumentMigration: !runDocumentMigration,
|
||||
};
|
||||
|
|
|
@ -22,6 +22,7 @@ import type { TransformErrorObjects } from '../../core';
|
|||
|
||||
export interface BaseState extends ControlState {
|
||||
readonly retryCount: number;
|
||||
readonly skipRetryReset: boolean;
|
||||
readonly retryDelay: number;
|
||||
readonly logs: MigrationLog[];
|
||||
/**
|
||||
|
@ -110,6 +111,7 @@ export interface UpdateIndexMappingsState extends PostInitState {
|
|||
export interface UpdateIndexMappingsWaitForTaskState extends PostInitState {
|
||||
readonly controlState: 'UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK';
|
||||
readonly updateTargetMappingsTaskId: string;
|
||||
readonly additiveMappingChanges: SavedObjectsMappingProperties;
|
||||
}
|
||||
|
||||
export interface UpdateMappingModelVersionState extends PostInitState {
|
||||
|
|
|
@ -12,6 +12,7 @@ import { PostInitState, PostDocInitState, OutdatedDocumentsSearchState } from '.
|
|||
export const createPostInitState = (): PostInitState => ({
|
||||
controlState: 'INIT',
|
||||
retryDelay: 0,
|
||||
skipRetryReset: false,
|
||||
retryCount: 0,
|
||||
logs: [],
|
||||
currentIndex: '.kibana_1',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue