mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
Prevent write blocking target index during reindex migration (#185939)
## Summary Addresses https://github.com/elastic/kibana/issues/185918 The idea is to simply check whether the index that a migrator is trying to `write_block` (aka the source of the reindex operation) matches the target index name. In this case: * We assume that other migrators are half way through, ahead of us. * We abort operation and trust other instances' migrators to finish the job. * Subsequent restart, when migration has finished, should basically be a no-op.
This commit is contained in:
parent
2a6172c090
commit
dbdc797781
11 changed files with 137 additions and 6 deletions
|
@ -8,7 +8,7 @@
|
|||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import { errors as EsErrors } from '@elastic/elasticsearch';
|
||||
import type {
|
||||
ElasticsearchClient,
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import type {
|
||||
ElasticsearchClient,
|
||||
|
|
|
@ -27,6 +27,9 @@ export { initAction } from './initialize_action';
|
|||
export type { FetchIndexResponse, FetchIndicesParams } from './fetch_indices';
|
||||
export { fetchIndices } from './fetch_indices';
|
||||
|
||||
export type { SafeWriteBlockParams } from './safe_write_block';
|
||||
export { safeWriteBlock } from './safe_write_block';
|
||||
|
||||
export type { SetWriteBlockParams } from './set_write_block';
|
||||
export { setWriteBlock } from './set_write_block';
|
||||
|
||||
|
@ -158,6 +161,11 @@ export interface EsResponseTooLargeError {
|
|||
contentLength: number;
|
||||
}
|
||||
|
||||
export interface SourceEqualsTarget {
|
||||
type: 'source_equals_target';
|
||||
index: string;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export interface AcknowledgeResponse {
|
||||
acknowledged: boolean;
|
||||
|
@ -185,6 +193,7 @@ export interface ActionErrorTypeMap {
|
|||
index_mappings_incomplete: IndexMappingsIncomplete;
|
||||
types_changed: TypesChanged;
|
||||
operation_not_supported: OperationNotSupported;
|
||||
source_equals_target: SourceEqualsTarget;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import {
|
||||
catchRetryableEsClientErrors,
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { safeWriteBlock } from './safe_write_block';
|
||||
|
||||
jest.mock('./set_write_block');
|
||||
import { setWriteBlock } from './set_write_block';
|
||||
|
||||
const setWriteBlockMock = setWriteBlock as jest.MockedFn<typeof setWriteBlock>;
|
||||
|
||||
describe('safeWriteBlock', () => {
|
||||
beforeEach(() => {
|
||||
setWriteBlockMock.mockReset();
|
||||
setWriteBlockMock.mockReturnValueOnce(
|
||||
TaskEither.fromEither(Either.right('set_write_block_succeeded' as const))
|
||||
);
|
||||
});
|
||||
|
||||
const client = elasticsearchClientMock.createInternalClient();
|
||||
it('returns a Left response if source and target indices match', async () => {
|
||||
const task = safeWriteBlock({
|
||||
client,
|
||||
sourceIndex: '.kibana_8.15.0_001',
|
||||
targetIndex: '.kibana_8.15.0_001',
|
||||
});
|
||||
const res = await task();
|
||||
expect(res).toEqual(Either.left({ type: 'source_equals_target', index: '.kibana_8.15.0_001' }));
|
||||
expect(setWriteBlockMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('calls setWriteBlock if indices are different', async () => {
|
||||
const task = safeWriteBlock({
|
||||
client,
|
||||
sourceIndex: '.kibana_7.13.0_001',
|
||||
targetIndex: '.kibana_8.15.0_001',
|
||||
timeout: '28s',
|
||||
});
|
||||
const res = await task();
|
||||
expect(res).toEqual(Either.right('set_write_block_succeeded' as const));
|
||||
expect(setWriteBlockMock).toHaveBeenCalledTimes(1);
|
||||
expect(setWriteBlockMock).toHaveBeenCalledWith({
|
||||
client,
|
||||
index: '.kibana_7.13.0_001',
|
||||
timeout: '28s',
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { RetryableEsClientError } from './catch_retryable_es_client_errors';
|
||||
import { DEFAULT_TIMEOUT, type SourceEqualsTarget, type IndexNotFound } from '.';
|
||||
import { setWriteBlock } from './set_write_block';
|
||||
|
||||
/** @internal */
|
||||
export interface SafeWriteBlockParams {
|
||||
client: ElasticsearchClient;
|
||||
sourceIndex: string;
|
||||
targetIndex: string;
|
||||
timeout?: string;
|
||||
}
|
||||
|
||||
export const safeWriteBlock = ({
|
||||
client,
|
||||
sourceIndex,
|
||||
targetIndex,
|
||||
timeout = DEFAULT_TIMEOUT,
|
||||
}: SafeWriteBlockParams): TaskEither.TaskEither<
|
||||
SourceEqualsTarget | IndexNotFound | RetryableEsClientError,
|
||||
'set_write_block_succeeded'
|
||||
> => {
|
||||
const assertSourceAndTargetDifferTask: TaskEither.TaskEither<
|
||||
SourceEqualsTarget,
|
||||
'source_and_target_differ'
|
||||
> = TaskEither.fromEither(
|
||||
sourceIndex === targetIndex
|
||||
? Either.left({ type: 'source_equals_target' as const, index: sourceIndex })
|
||||
: Either.right('source_and_target_differ' as const)
|
||||
);
|
||||
|
||||
return pipe(
|
||||
assertSourceAndTargetDifferTask,
|
||||
TaskEither.chainW(() => setWriteBlock({ client, index: sourceIndex, timeout }))
|
||||
);
|
||||
};
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import * as Either from 'fp-ts/lib/Either';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
|
||||
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
import { omit } from 'lodash';
|
||||
import * as TaskEither from 'fp-ts/lib/TaskEither';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { pipe } from 'fp-ts/lib/function';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { IndexMapping, VirtualVersionMap } from '@kbn/core-saved-objects-base-server-internal';
|
||||
import { diffMappings } from '../core/diff_mappings';
|
||||
|
|
|
@ -1645,6 +1645,15 @@ describe('migrations v2 model', () => {
|
|||
expect(newState.retryCount).toEqual(0);
|
||||
expect(newState.retryDelay).toEqual(0);
|
||||
});
|
||||
test('SET_SOURCE_WRITE_BLOCK -> REFRESH_TARGET if source index matches target index', () => {
|
||||
const index = `.kibana_${setWriteBlockState.kibanaVersion}_001`;
|
||||
const res: ResponseType<'SET_SOURCE_WRITE_BLOCK'> = Either.left({
|
||||
type: 'source_equals_target' as const,
|
||||
index,
|
||||
});
|
||||
const newState = model(setWriteBlockState, res);
|
||||
expect(newState.controlState).toEqual('REFRESH_TARGET');
|
||||
});
|
||||
});
|
||||
|
||||
describe('CALCULATE_EXCLUDE_FILTERS', () => {
|
||||
|
|
|
@ -736,6 +736,14 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
|
|||
...stateP,
|
||||
controlState: 'CALCULATE_EXCLUDE_FILTERS',
|
||||
};
|
||||
} else if (isTypeof(res.left, 'source_equals_target')) {
|
||||
// As part of a reindex-migration, we wanted to block the source index to prevent updates
|
||||
// However, this migrator's source index matches the target index.
|
||||
// Thus, another instance's migrator is ahead of us. We skip the clone steps and continue the flow
|
||||
return {
|
||||
...stateP,
|
||||
controlState: 'REFRESH_TARGET',
|
||||
};
|
||||
} else if (isTypeof(res.left, 'index_not_found_exception')) {
|
||||
// We don't handle the following errors as the migration algorithm
|
||||
// will never cause them to occur:
|
||||
|
|
|
@ -125,7 +125,11 @@ export const nextActionMap = (
|
|||
knownTypes: state.knownTypes,
|
||||
}),
|
||||
SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) =>
|
||||
Actions.setWriteBlock({ client, index: state.sourceIndex.value }),
|
||||
Actions.safeWriteBlock({
|
||||
client,
|
||||
sourceIndex: state.sourceIndex.value,
|
||||
targetIndex: state.targetIndex,
|
||||
}),
|
||||
CALCULATE_EXCLUDE_FILTERS: (state: CalculateExcludeFiltersState) =>
|
||||
Actions.calculateExcludeFilters({
|
||||
client,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue