[Migrations] Update all aliases with a single updateAliases() when relocating SO documents (#158940)

Fixes https://github.com/elastic/kibana/issues/158733

The goal of this change is to ensure that the migrators of all indices
involved in a relocation (e.g. as part of the [dot kibana
split](https://github.com/elastic/kibana/issues/104081)) create their
index aliases in the same `updateAliases()` call.

This way, either:
* all the indices involved in the [dot kibana
split](https://github.com/elastic/kibana/issues/104081) relocation are
completely upgraded (with the appropriate aliases),
* or none of them are.
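
Conceptually, the change relies on a "wait group" per relocation step: each migrator checks in with its own alias actions (in the new `MARK_VERSION_INDEX_READY_SYNC` step), and only when the last participant has checked in is a single combined alias update executed. The following is a minimal, self-contained sketch of that idea, not the actual Kibana implementation; the simplified `AliasAction` shape, the index names and the `console.log` stand-in for `updateAliases()` are illustrative:

```ts
// Simplified sketch of the wait-group idea used by this PR.
type AliasAction =
  | { add: { index: string; alias: string } }
  | { remove: { index: string; alias: string } };

interface WaitGroup<T> {
  resolve: (value: T) => void;
  promise: Promise<void>;
}

function createWaitGroupMap<T>(
  keys: string[],
  onAllResolved: (results: T[]) => void | Promise<void>
): Record<string, WaitGroup<T>> {
  const resolvers: Array<(value: T) => void> = [];
  const memberPromises = keys.map(
    () => new Promise<T>((resolve) => resolvers.push(resolve))
  );
  // every member shares the same "everyone has checked in" promise
  const all: Promise<void> = Promise.all(memberPromises)
    .then(onAllResolved)
    .then(() => undefined);
  return keys.reduce<Record<string, WaitGroup<T>>>((acc, key, i) => {
    acc[key] = { resolve: resolvers[i], promise: all };
    return acc;
  }, {});
}

// Usage: three migrators relocating documents; aliases are only switched
// once all of them have reached the synchronization point.
const waitGroups = createWaitGroupMap<AliasAction[]>(
  ['.kibana', '.kibana_task_manager', '.kibana_cases'],
  (allAliasActions) => {
    // In the real flow this would be a single client.indices.updateAliases()
    // call containing the combined actions of all migrators.
    console.log(`updating aliases with ${allAliasActions.flat().length} actions`);
  }
);

waitGroups['.kibana'].resolve([{ add: { index: '.kibana_8.9.0_001', alias: '.kibana' } }]);
waitGroups['.kibana_task_manager'].resolve([]);
waitGroups['.kibana_cases'].resolve([]); // last check-in triggers the combined update
```

Because every member shares the same aggregated promise, a failure (rejection) in any migrator propagates to all of them, which is what allows the "all or none" guarantee described above.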
Gerard Soldevila 2023-06-05 10:19:24 +02:00 committed by GitHub
parent 9bf4ad7d9f
commit 94fb44ae0c
19 changed files with 333 additions and 193 deletions

View file

@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AcknowledgeResponse } from '.';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@ -46,6 +45,9 @@ export interface CreateIndexParams {
aliases?: string[];
timeout?: string;
}
export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';
/**
* Creates an index with the given mappings
*
@ -64,11 +66,11 @@ export const createIndex = ({
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
CreateIndexSuccessResponse
> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError | ClusterShardLimitExceeded,
AcknowledgeResponse
CreateIndexSuccessResponse
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);
@ -103,31 +105,12 @@ export const createIndex = ({
},
},
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated on all nodes with the newly created index, but it
* probably will be created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, index creation complete
*/
return Either.right({
acknowledged: Boolean(res.acknowledged),
shardsAcknowledged: res.shards_acknowledged,
});
.then(() => {
return Either.right('create_index_succeeded' as const);
})
.catch((error) => {
if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous create
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
return Either.right('index_already_exists' as const);
} else if (isClusterShardLimitExceeded(error?.body?.error)) {
return Either.left({
type: 'cluster_shard_limit_exceeded' as const,
@ -143,11 +126,12 @@ export const createIndex = ({
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
CreateIndexSuccessResponse,
CreateIndexSuccessResponse
>((res) => {
// Systematicaly wait until the target index has a 'green' status meaning
// the primary (and on multi node clusters) the replica has been started
// When the index status is 'green' we know that all shards were started
// see https://github.com/elastic/kibana/issues/157968
return pipe(
waitForIndexStatus({
@ -156,10 +140,7 @@ export const createIndex = ({
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
TaskEither.map(() => res)
);
})
);

View file

@ -106,7 +106,8 @@ export {
import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { IncompatibleClusterRoutingAllocation } from './initialize_action';
import { ClusterShardLimitExceeded } from './create_index';
import type { ClusterShardLimitExceeded } from './create_index';
import type { SynchronizationFailed } from './synchronize_migrators';
export type {
CheckForUnknownDocsParams,
@ -174,6 +175,7 @@ export interface ActionErrorTypeMap {
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
synchronization_failed: SynchronizationFailed;
}
/**

View file

@ -6,38 +6,36 @@
* Side Public License, v 1.
*/
import { synchronizeMigrators } from './synchronize_migrators';
import { type Defer, defer } from '../kibana_migrator_utils';
import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils';
describe('synchronizeMigrators', () => {
let defers: Array<Defer<void>>;
let allDefersPromise: Promise<any>;
let migratorsDefers: Array<Defer<void>>;
let waitGroups: Array<WaitGroup<void>>;
let allWaitGroupsPromise: Promise<any>;
let migratorsWaitGroups: Array<WaitGroup<void>>;
beforeEach(() => {
jest.clearAllMocks();
defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer);
allDefersPromise = Promise.all(defers.map(({ promise }) => promise));
waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup);
allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise));
migratorsDefers = defers.map(({ resolve, reject }) => ({
migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({
resolve: jest.fn(resolve),
reject: jest.fn(reject),
promise: allDefersPromise,
promise: allWaitGroupsPromise,
}));
});
describe('when all migrators reach the synchronization point with a correct state', () => {
it('unblocks all migrators and resolves Right', async () => {
const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer));
const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup }));
const res = await Promise.all(tasks.map((task) => task()));
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.resolve).toHaveBeenCalledTimes(1)
);
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.reject).not.toHaveBeenCalled()
migratorsWaitGroups.forEach((waitGroup) =>
expect(waitGroup.resolve).toHaveBeenCalledTimes(1)
);
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());
expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => {
it('migrators are not unblocked until the last one reaches the synchronization point', async () => {
let resolved: number = 0;
migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;
// we simulate that only kibana_task_manager and kibana migrators get to the sync point
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we don't await for them, or we would be locked forever
Promise.all(tasks.map((task) => task()));
@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => {
expect(resolved).toEqual(0);
// finally, the last migrator gets to the synchronization point
await synchronizeMigrators(casesDefer)();
await synchronizeMigrators({ waitGroup: casesDefer })();
expect(resolved).toEqual(3);
});
});
@ -75,31 +71,29 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;
// we first make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
// the other migrators then try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;
// some migrators try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we then make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
@ -133,14 +125,14 @@ describe('synchronizeMigrators', () => {
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},

View file

@ -8,20 +8,33 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Defer } from '../kibana_migrator_utils';
import type { WaitGroup } from '../kibana_migrator_utils';
export interface SyncFailed {
type: 'sync_failed';
/** @internal */
export interface SynchronizationFailed {
type: 'synchronization_failed';
error: Error;
}
export function synchronizeMigrators(
defer: Defer<void>
): TaskEither.TaskEither<SyncFailed, 'synchronized_successfully'> {
/** @internal */
export interface SynchronizeMigratorsParams<T, U> {
waitGroup: WaitGroup<T>;
thenHook?: (res: any) => Either.Right<U>;
payload?: T;
}
export function synchronizeMigrators<T, U>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SynchronizationFailed, U> {
return () => {
defer.resolve();
return defer.promise
.then(() => Either.right('synchronized_successfully' as const))
.catch((error) => Either.left({ type: 'sync_failed' as const, error }));
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.catch((error) => Either.left({ type: 'synchronization_failed' as const, error }));
};
}

View file

@ -17,16 +17,16 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import {
calculateTypeStatuses,
createMultiPromiseDefer,
createWaitGroupMap,
getCurrentIndexTypesMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures';
describe('createMultiPromiseDefer', () => {
describe('createWaitGroupMap', () => {
it('creates defer objects with the same Promise', () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
expect(Object.keys(defers)).toHaveLength(2);
expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise);
expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve);
@ -34,7 +34,7 @@ describe('createMultiPromiseDefer', () => {
});
it('the common Promise resolves when all defers resolve', async () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
let resolved = 0;
Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved));
defers['.kibana'].resolve();

View file

@ -15,8 +15,8 @@ import { TypeStatus, type TypeStatusDetails } from './kibana_migrator_constants'
// even though this utility class is present in @kbn/kibana-utils-plugin, we can't easily import it from Core
// aka. one does not simply reuse code
export class Defer<T> {
public resolve!: (data: T) => void;
class Defer<T> {
public resolve!: (data?: T) => void;
public reject!: (error: any) => void;
public promise: Promise<any> = new Promise<any>((resolve, reject) => {
(this as any).resolve = resolve;
@ -24,12 +24,26 @@ export class Defer<T> {
});
}
export const defer = () => new Defer<void>();
export type WaitGroup<T> = Defer<T>;
export function createMultiPromiseDefer(indices: string[]): Record<string, Defer<void>> {
const defers: Array<Defer<void>> = indices.map(defer);
const all = Promise.all(defers.map(({ promise }) => promise));
return indices.reduce<Record<string, Defer<any>>>((acc, indexName, i) => {
export function waitGroup<T>(): WaitGroup<T> {
return new Defer<T>();
}
export function createWaitGroupMap<T, U>(
keys: string[],
thenHook: (res: T[]) => U = (res) => res as unknown as U
): Record<string, WaitGroup<T>> {
if (!keys?.length) {
return {};
}
const defers: Array<WaitGroup<T>> = keys.map(() => waitGroup<T>());
// every member of the WaitGroup will wait for all members to resolve
const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook);
return keys.reduce<Record<string, WaitGroup<T>>>((acc, indexName, i) => {
const { resolve, reject } = defers[i];
acc[indexName] = { resolve, reject, promise: all };
return acc;
@ -87,7 +101,7 @@ export async function getIndicesInvolvedInRelocation({
defaultIndexTypesMap: IndexTypesMap;
logger: Logger;
}): Promise<string[]> {
const indicesWithMovingTypesSet = new Set<string>();
const indicesWithRelocatingTypesSet = new Set<string>();
const currentIndexTypesMap = await getCurrentIndexTypesMap({
client,
@ -106,11 +120,11 @@ export async function getIndicesInvolvedInRelocation({
Object.values(typeIndexDistribution)
.filter(({ status }) => status === TypeStatus.Moved)
.forEach(({ currentIndex, targetIndex }) => {
indicesWithMovingTypesSet.add(currentIndex!);
indicesWithMovingTypesSet.add(targetIndex!);
indicesWithRelocatingTypesSet.add(currentIndex!);
indicesWithRelocatingTypesSet.add(targetIndex!);
});
return Array.from(indicesWithMovingTypesSet);
return Array.from(indicesWithRelocatingTypesSet);
}
export function indexMapToIndexTypesMap(indexMap: IndexMap): IndexTypesMap {

View file

@ -19,6 +19,9 @@ import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';
/** @internal */
export const REINDEX_TEMP_SUFFIX = '_reindex_temp';
/** @internal */
export type Aliases = Partial<Record<string, string>>;
@ -311,7 +314,7 @@ export function getMigrationType({
* @returns A temporary index name to reindex documents
*/
export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string =>
`${indexPrefix}_${kibanaVersion}_reindex_temp`;
`${indexPrefix}_${kibanaVersion}${REINDEX_TEMP_SUFFIX}`;
/** Increase batchSize by 20% until a maximum of maxBatchSize */
export const increaseBatchSize = (

View file

@ -1791,7 +1791,7 @@ describe('migrations v2 model', () => {
test('READY_TO_REINDEX_SYNC -> FATAL if the synchronization between migrators fails', () => {
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.left({
type: 'sync_failed',
type: 'synchronization_failed',
error: new Error('Other migrators failed to reach the synchronization point'),
});
const newState = model(state, res);
@ -2052,7 +2052,7 @@ describe('migrations v2 model', () => {
});
test('DONE_REINDEXING_SYNC -> FATAL if the synchronization between migrators fails', () => {
const res: ResponseType<'DONE_REINDEXING_SYNC'> = Either.left({
type: 'sync_failed',
type: 'synchronization_failed',
error: new Error('Other migrators failed to reach the synchronization point'),
});
const newState = model(state, res);
@ -2971,10 +2971,10 @@ describe('migrations v2 model', () => {
sourceIndex: Option.none as Option.None,
targetIndex: '.kibana_7.11.0_001',
};
test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY', () => {
test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS', () => {
const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded');
const newState = model(createNewTargetState, res);
expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY');
expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
@ -2988,7 +2988,7 @@ describe('migrations v2 model', () => {
expect(newState.retryCount).toEqual(1);
expect(newState.retryDelay).toEqual(2000);
});
test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY resets the retry count and delay', () => {
test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS resets the retry count and delay', () => {
const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded');
const testState = {
...createNewTargetState,
@ -2997,7 +2997,7 @@ describe('migrations v2 model', () => {
};
const newState = model(testState, res);
expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY');
expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});

View file

@ -46,6 +46,7 @@ import {
increaseBatchSize,
hasLaterVersionAlias,
aliasVersion,
REINDEX_TEMP_SUFFIX,
} from './helpers';
import { buildTempIndexMap, createBatches } from './create_batches';
import type { MigrationLog } from '../types';
@ -86,7 +87,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
const retryErrorMessage = `[${left.type}] Incompatible Elasticsearch cluster settings detected. Remove the persistent and transient Elasticsearch cluster setting 'cluster.routing.allocation.enable' or set it to a value of 'all' to allow migrations to proceed. Refer to ${stateP.migrationDocLinks.routingAllocationDisabled} for more information on how to resolve the issue.`;
return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts);
} else {
return throwBadResponse(stateP, left);
throwBadResponse(stateP, left);
}
} else if (Either.isRight(res)) {
// cluster routing allocation is enabled and we can continue with the migration as normal
@ -266,7 +267,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
};
}
} else {
return throwBadResponse(stateP, res);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'WAIT_FOR_MIGRATION_COMPLETION') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -314,14 +315,14 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// If the write block failed because the index doesn't exist, it means
// another instance already completed the legacy pre-migration. Proceed
// to the next step.
if (isTypeof(res.left, 'index_not_found_exception')) {
const left = res.left;
if (isTypeof(left, 'index_not_found_exception')) {
return { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' };
} else {
// @ts-expect-error TS doesn't correctly narrow this type to never
return throwBadResponse(stateP, res);
throwBadResponse(stateP, left);
}
} else {
return throwBadResponse(stateP, res);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'LEGACY_CREATE_REINDEX_TARGET') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -343,7 +344,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`,
};
} else {
return throwBadResponse(stateP, left);
throwBadResponse(stateP, left);
}
} else if (Either.isRight(res)) {
return {
@ -476,10 +477,10 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
const retryErrorMessage = `${left.message} Refer to ${stateP.migrationDocLinks.repeatedTimeoutRequests} for information on how to resolve the issue.`;
return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts);
} else {
return throwBadResponse(stateP, left);
throwBadResponse(stateP, left);
}
} else {
return throwBadResponse(stateP, res);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'UPDATE_SOURCE_MAPPINGS_PROPERTIES') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -723,13 +724,11 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
...stateP,
controlState: 'CALCULATE_EXCLUDE_FILTERS',
};
} else if (isTypeof(res.left, 'index_not_found_exception')) {
} else {
// We don't handle the following errors as the migration algorithm
// will never cause them to occur:
// - index_not_found_exception
return throwBadResponse(stateP, res.left as never);
} else {
return throwBadResponse(stateP, res.left);
throwBadResponse(stateP, res.left as never);
}
} else if (stateP.controlState === 'CALCULATE_EXCLUDE_FILTERS') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -753,7 +752,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
],
};
} else {
return throwBadResponse(stateP, res);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'CREATE_REINDEX_TEMP') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -788,7 +787,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`,
};
} else {
return throwBadResponse(stateP, left);
throwBadResponse(stateP, left);
}
} else {
// If the createIndex action receives an 'resource_already_exists_exception'
@ -813,14 +812,20 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
return { ...stateP, controlState: 'DONE_REINDEXING_SYNC' };
}
} else if (Either.isLeft(res)) {
return {
...stateP,
controlState: 'FATAL',
reason: 'An error occurred whilst waiting for other migrators to get to this step.',
throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem
};
const left = res.left;
if (isTypeof(left, 'synchronization_failed')) {
return {
...stateP,
controlState: 'FATAL',
reason: 'An error occurred whilst waiting for other migrators to get to this step.',
throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem
};
} else {
throwBadResponse(stateP, left);
}
} else {
return throwBadResponse(stateP, res as never);
throwBadResponse(stateP, res as never);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -954,14 +959,20 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
sourceIndexMappings: Option.none,
};
} else if (Either.isLeft(res)) {
return {
...stateP,
controlState: 'FATAL',
reason: 'An error occurred whilst waiting for other migrators to get to this step.',
throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem
};
const left = res.left;
if (isTypeof(left, 'synchronization_failed')) {
return {
...stateP,
controlState: 'FATAL',
reason: 'An error occurred whilst waiting for other migrators to get to this step.',
throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem
};
} else {
throwBadResponse(stateP, left);
}
} else {
return throwBadResponse(stateP, res as never);
throwBadResponse(stateP, res as never);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_TRANSFORM') {
// We follow a similar control flow as for
@ -1458,7 +1469,9 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// index.
return {
...stateP,
controlState: 'MARK_VERSION_INDEX_READY',
controlState: stateP.mustRelocateDocuments
? 'MARK_VERSION_INDEX_READY_SYNC'
: 'MARK_VERSION_INDEX_READY',
versionIndexReadyActions: stateP.versionIndexReadyActions,
};
} else {
@ -1474,9 +1487,19 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'CREATE_NEW_TARGET') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (res.right === 'index_already_exists') {
// We were supposed to be on a "fresh deployment" state (we did not find any aliases)
// but the target index already exists. Assume it can be from a previous upgrade attempt that:
// - managed to clone ..._reindex_temp into target
// - but did NOT finish the process (aka did not get to update the index aliases)
return {
...stateP,
controlState: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
}
return {
...stateP,
controlState: 'MARK_VERSION_INDEX_READY',
controlState: 'CHECK_VERSION_INDEX_READY_ACTIONS',
};
} else if (Either.isLeft(res)) {
const left = res.left;
@ -1495,7 +1518,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`,
};
} else {
return throwBadResponse(stateP, left);
throwBadResponse(stateP, left);
}
} else {
// If the createIndex action receives an 'resource_already_exists_exception'
@ -1503,7 +1526,10 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// left responses to handle here.
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'MARK_VERSION_INDEX_READY') {
} else if (
stateP.controlState === 'MARK_VERSION_INDEX_READY' ||
stateP.controlState === 'MARK_VERSION_INDEX_READY_SYNC'
) {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return { ...stateP, controlState: 'DONE' };
@ -1516,7 +1542,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// migration from the same source.
return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' };
} else if (isTypeof(left, 'index_not_found_exception')) {
if (left.index === stateP.tempIndex) {
if (left.index.endsWith(REINDEX_TEMP_SUFFIX)) {
// another instance has already completed the migration and deleted
// the temporary index
return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' };
@ -1531,6 +1557,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// cause it to occur (this error is only relevant to the LEGACY_DELETE
// step).
throwBadResponse(stateP, left as never);
} else if (isTypeof(left, 'synchronization_failed')) {
return {
...stateP,
controlState: 'FATAL',
reason: 'An error occurred whilst waiting for other migrators to get to this step.',
throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem
};
} else {
throwBadResponse(stateP, left);
}
@ -1584,6 +1617,6 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// The state-action machine will never call the model in the terminating states
throwBadControlState(stateP as never);
} else {
return throwBadControlState(stateP);
throwBadControlState(stateP);
}
};

View file

@ -7,7 +7,7 @@
*/
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { defer } from './kibana_migrator_utils';
import { waitGroup } from './kibana_migrator_utils';
import { next } from './next';
import type { State } from './state';
@ -15,12 +15,24 @@ describe('migrations v2 next', () => {
it.todo('when state.retryDelay > 0 delays execution of the next action');
it('DONE returns null', () => {
const state = { controlState: 'DONE' } as State;
const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state);
const action = next(
{} as ElasticsearchClient,
(() => {}) as any,
waitGroup(),
waitGroup(),
waitGroup()
)(state);
expect(action).toEqual(null);
});
it('FATAL returns null', () => {
const state = { controlState: 'FATAL', reason: '' } as State;
const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state);
const action = next(
{} as ElasticsearchClient,
(() => {}) as any,
waitGroup(),
waitGroup(),
waitGroup()
)(state);
expect(action).toEqual(null);
});
});

View file

@ -9,7 +9,7 @@
import * as Option from 'fp-ts/lib/Option';
import { omit } from 'lodash';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { Defer } from './kibana_migrator_utils';
import type { WaitGroup } from './kibana_migrator_utils';
import type {
AllActionStates,
CalculateExcludeFiltersState,
@ -72,8 +72,9 @@ export type ResponseType<ControlState extends AllActionStates> = Awaited<
export const nextActionMap = (
client: ElasticsearchClient,
transformRawDocs: TransformRawDocs,
readyToReindex: Defer<void>,
doneReindexing: Defer<void>
readyToReindex: WaitGroup<void>,
doneReindexing: WaitGroup<void>,
updateRelocationAliases: WaitGroup<Actions.AliasAction[]>
) => {
return {
INIT: (state: InitState) =>
@ -142,7 +143,10 @@ export const nextActionMap = (
aliases: [state.tempIndexAlias],
mappings: state.tempIndexMappings,
}),
READY_TO_REINDEX_SYNC: () => Actions.synchronizeMigrators(readyToReindex),
READY_TO_REINDEX_SYNC: () =>
Actions.synchronizeMigrators({
waitGroup: readyToReindex,
}),
REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) =>
Actions.openPit({ client, index: state.sourceIndex.value }),
REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) =>
@ -181,7 +185,10 @@ export const nextActionMap = (
*/
refresh: false,
}),
DONE_REINDEXING_SYNC: () => Actions.synchronizeMigrators(doneReindexing),
DONE_REINDEXING_SYNC: () =>
Actions.synchronizeMigrators({
waitGroup: doneReindexing,
}),
SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) =>
Actions.setWriteBlock({ client, index: state.tempIndex }),
CLONE_TEMP_TO_TARGET: (state: CloneTempToTarget) =>
@ -249,6 +256,12 @@ export const nextActionMap = (
}),
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }),
MARK_VERSION_INDEX_READY_SYNC: (state: MarkVersionIndexReady) =>
Actions.synchronizeMigrators({
waitGroup: updateRelocationAliases,
payload: state.versionIndexReadyActions.value,
thenHook: (res) => res,
}),
MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) =>
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) =>
@ -279,10 +292,17 @@ export const nextActionMap = (
export const next = (
client: ElasticsearchClient,
transformRawDocs: TransformRawDocs,
readyToReindex: Defer<void>,
doneReindexing: Defer<void>
readyToReindex: WaitGroup<void>,
doneReindexing: WaitGroup<void>,
updateRelocationAliases: WaitGroup<Actions.AliasAction[]>
) => {
const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing);
const map = nextActionMap(
client,
transformRawDocs,
readyToReindex,
doneReindexing,
updateRelocationAliases
);
return (state: State) => {
const delay = createDelayFn(state);

View file

@ -13,7 +13,7 @@ import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-m
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import type { MigrationResult } from '@kbn/core-saved-objects-base-server-internal';
import { createInitialState } from './initial_state';
import { Defer } from './kibana_migrator_utils';
import { waitGroup } from './kibana_migrator_utils';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { next } from './next';
import { runResilientMigrator, type RunResilientMigratorParams } from './run_resilient_migrator';
@ -128,8 +128,9 @@ const mockOptions = (): RunResilientMigratorParams => {
},
},
},
readyToReindex: new Defer(),
doneReindexing: new Defer(),
readyToReindex: waitGroup(),
doneReindexing: waitGroup(),
updateRelocationAliases: waitGroup(),
logger,
transformRawDocs: jest.fn(),
preMigrationScript: "ctx._id = ctx._source.type + ':' + ctx._id",

View file

@ -17,7 +17,7 @@ import type {
MigrationResult,
IndexTypesMap,
} from '@kbn/core-saved-objects-base-server-internal';
import type { Defer } from './kibana_migrator_utils';
import type { WaitGroup } from './kibana_migrator_utils';
import type { TransformRawDocs } from './types';
import { next } from './next';
import { model } from './model';
@ -25,6 +25,7 @@ import { createInitialState } from './initial_state';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { State } from './state';
import type { AliasAction } from './actions';
/**
* To avoid the Elasticsearch-js client aborting our requests before we
@ -48,8 +49,9 @@ export interface RunResilientMigratorParams {
indexTypesMap: IndexTypesMap;
targetMappings: IndexMapping;
preMigrationScript?: string;
readyToReindex: Defer<any>;
doneReindexing: Defer<any>;
readyToReindex: WaitGroup<void>;
doneReindexing: WaitGroup<void>;
updateRelocationAliases: WaitGroup<AliasAction[]>;
logger: Logger;
transformRawDocs: TransformRawDocs;
coreMigrationVersionPerType: SavedObjectsMigrationVersion;
@ -76,6 +78,7 @@ export async function runResilientMigrator({
preMigrationScript,
readyToReindex,
doneReindexing,
updateRelocationAliases,
transformRawDocs,
coreMigrationVersionPerType,
migrationVersionPerType,
@ -103,7 +106,13 @@ export async function runResilientMigrator({
return migrationStateActionMachine({
initialState,
logger,
next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing),
next: next(
migrationClient,
transformRawDocs,
readyToReindex,
doneReindexing,
updateRelocationAliases
),
model,
abort: async (state?: State) => {
// At this point, we could reject this migrator's defers and unblock other migrators

View file

@ -22,8 +22,8 @@ import { buildTypesMappings, createIndexMap } from './core';
import {
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
createMultiPromiseDefer,
Defer,
createWaitGroupMap,
waitGroup,
} from './kibana_migrator_utils';
import { runResilientMigrator } from './run_resilient_migrator';
import { indexTypesMapMock, savedObjectTypeRegistryMock } from './run_resilient_migrator.fixtures';
@ -41,7 +41,7 @@ jest.mock('./kibana_migrator_utils', () => {
return {
...actual,
indexMapToIndexTypesMap: jest.fn(actual.indexMapToIndexTypesMap),
createMultiPromiseDefer: jest.fn(actual.createMultiPromiseDefer),
createWaitGroupMap: jest.fn(actual.createWaitGroupMap),
getIndicesInvolvedInRelocation: jest.fn(() => Promise.resolve(['.my_index', '.other_index'])),
};
});
@ -79,9 +79,7 @@ const mockCreateIndexMap = createIndexMap as jest.MockedFunction<typeof createIn
const mockIndexMapToIndexTypesMap = indexMapToIndexTypesMap as jest.MockedFunction<
typeof indexMapToIndexTypesMap
>;
const mockCreateMultiPromiseDefer = createMultiPromiseDefer as jest.MockedFunction<
typeof createMultiPromiseDefer
>;
const mockCreateWaitGroupMap = createWaitGroupMap as jest.MockedFunction<typeof createWaitGroupMap>;
const mockGetIndicesInvolvedInRelocation = getIndicesInvolvedInRelocation as jest.MockedFunction<
typeof getIndicesInvolvedInRelocation
>;
@ -93,7 +91,7 @@ describe('runV2Migration', () => {
beforeEach(() => {
mockCreateIndexMap.mockClear();
mockIndexMapToIndexTypesMap.mockClear();
mockCreateMultiPromiseDefer.mockClear();
mockCreateWaitGroupMap.mockClear();
mockGetIndicesInvolvedInRelocation.mockClear();
mockRunResilientMigrator.mockClear();
});
@ -143,9 +141,14 @@ describe('runV2Migration', () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(createMultiPromiseDefer).toBeCalledTimes(2);
expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']);
expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']);
expect(mockCreateWaitGroupMap).toBeCalledTimes(3);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(
3,
['.my_index', '.other_index'],
expect.any(Function) // we expect to receive a method to update all aliases in this hook
);
});
it('calls runResilientMigrator for each migrator it must spawn', async () => {
@ -168,6 +171,7 @@ describe('runV2Migration', () => {
mustRelocateDocuments: true,
readyToReindex: expect.any(Object),
doneReindexing: expect.any(Object),
updateRelocationAliases: expect.any(Object),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
@ -178,6 +182,7 @@ describe('runV2Migration', () => {
mustRelocateDocuments: true,
readyToReindex: expect.any(Object),
doneReindexing: expect.any(Object),
updateRelocationAliases: expect.any(Object),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
@ -188,14 +193,15 @@ describe('runV2Migration', () => {
mustRelocateDocuments: false,
readyToReindex: undefined,
doneReindexing: undefined,
updateRelocationAliases: undefined,
})
);
});
it('awaits on all runResilientMigrator promises, and resolves with the results of each of them', async () => {
const myIndexMigratorDefer = new Defer<MigrationResult>();
const otherIndexMigratorDefer = new Defer();
const taskIndexMigratorDefer = new Defer();
const myIndexMigratorDefer = waitGroup<MigrationResult>();
const otherIndexMigratorDefer = waitGroup();
const taskIndexMigratorDefer = waitGroup();
let migrationResults: MigrationResult[] | undefined;
mockRunResilientMigrator.mockReturnValueOnce(myIndexMigratorDefer.promise);

View file

@ -24,12 +24,14 @@ import Semver from 'semver';
import type { DocumentMigrator } from './document_migrator';
import { buildActiveMappings, createIndexMap } from './core';
import {
createMultiPromiseDefer,
createWaitGroupMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import type { AliasAction } from './actions/update_aliases';
import { updateAliases } from './actions';
export interface RunV2MigrationOpts {
/** The current Kibana version */
@ -74,7 +76,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
options.logger.debug(`migrationVersion: ${migrationVersion} saved object type: ${type}`);
});
// build a indexTypesMap from the info present in tye typeRegistry, e.g.:
// build a indexTypesMap from the info present in the typeRegistry, e.g.:
// {
// '.kibana': ['typeA', 'typeB', ...]
// '.kibana_task_manager': ['task', ...]
@ -85,7 +87,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
// compare indexTypesMap with the one present (or not) in the .kibana index meta
// and check if some SO types have been moved to different indices
const indicesWithMovingTypes = await getIndicesInvolvedInRelocation({
const indicesWithRelocatingTypes = await getIndicesInvolvedInRelocation({
mainIndex: options.kibanaIndexPrefix,
client: options.elasticsearchClient,
indexTypesMap,
@ -93,27 +95,36 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
defaultIndexTypesMap: options.defaultIndexTypesMap,
});
// we create 2 synchronization objects (2 synchronization points) for each of the
// we create synchronization objects (synchronization points) for each of the
// migrators involved in relocations, aka each of the migrators that will:
// A) reindex some documents TO other indices
// B) receive some documents FROM other indices
// C) both
const readyToReindexDefers = createMultiPromiseDefer(indicesWithMovingTypes);
const doneReindexingDefers = createMultiPromiseDefer(indicesWithMovingTypes);
const readyToReindexWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes);
const doneReindexingWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes);
const updateAliasesWaitGroupMap = createWaitGroupMap<AliasAction[], Promise<any>>(
indicesWithRelocatingTypes,
(allAliasActions) =>
updateAliases({
client: options.elasticsearchClient,
aliasActions: allAliasActions.flat(),
})()
);
// build a list of all migrators that must be started
const migratorIndices = new Set(Object.keys(indexMap));
// indices involved in a relocation might no longer be present in current mappings
// the types in indices involved in relocation might not have mappings in the current mappings anymore
// but if their SOs must be relocated to another index, we still need a migrator to do the job
indicesWithMovingTypes.forEach((index) => migratorIndices.add(index));
indicesWithRelocatingTypes.forEach((index) => migratorIndices.add(index));
const migrators = Array.from(migratorIndices).map((indexName, i) => {
return {
migrate: (): Promise<MigrationResult> => {
const readyToReindex = readyToReindexDefers[indexName];
const doneReindexing = doneReindexingDefers[indexName];
const readyToReindex = readyToReindexWaitGroupMap[indexName];
const doneReindexing = doneReindexingWaitGroupMap[indexName];
const updateRelocationAliases = updateAliasesWaitGroupMap[indexName];
// check if this migrator's index is involved in some document redistribution
const mustRelocateDocuments = !!readyToReindex;
const mustRelocateDocuments = indicesWithRelocatingTypes.includes(indexName);
return runResilientMigrator({
client: options.elasticsearchClient,
@ -127,6 +138,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
preMigrationScript: indexMap[indexName]?.script,
readyToReindex,
doneReindexing,
updateRelocationAliases,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocsSafely({
serializer: options.serializer,

View file

@ -459,6 +459,14 @@ export interface MarkVersionIndexReady extends PostInitState {
readonly versionIndexReadyActions: Option.Some<AliasAction[]>;
}
export interface MarkVersionIndexReadySync extends PostInitState {
/** Single "client.indices.updateAliases" operation
* to update multiple indices' aliases simultaneously
* */
readonly controlState: 'MARK_VERSION_INDEX_READY_SYNC';
readonly versionIndexReadyActions: Option.Some<AliasAction[]>;
}
export interface MarkVersionIndexReadyConflict extends PostInitState {
/**
* If the MARK_VERSION_INDEX_READY step fails another instance was
@ -541,6 +549,7 @@ export type State = Readonly<
| LegacyReindexWaitForTaskState
| LegacySetWriteBlockState
| MarkVersionIndexReady
| MarkVersionIndexReadySync
| MarkVersionIndexReadyConflict
| OutdatedDocumentsRefresh
| OutdatedDocumentsSearchClosePit

View file

@ -1876,7 +1876,7 @@ describe('migration actions', () => {
expect(res).toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "create_index_succeeded",
"right": "index_already_exists",
}
`);
});

View file

@ -12,6 +12,7 @@ import {
type ISavedObjectTypeRegistry,
type SavedObjectsType,
MAIN_SAVED_OBJECT_INDEX,
ALL_SAVED_OBJECT_INDICES,
} from '@kbn/core-saved-objects-server';
import { DEFAULT_INDEX_TYPES_MAP } from '@kbn/core-saved-objects-base-server-internal';
import {
@ -376,8 +377,8 @@ describe('split .kibana index into multiple system indices', () => {
`[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK.`,
`[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META.`,
`[${index}] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.`,
`[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY.`,
`[${index}] MARK_VERSION_INDEX_READY -> DONE.`,
`[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC.`,
`[${index}] MARK_VERSION_INDEX_READY_SYNC -> DONE.`,
`[${index}] Migration completed after`,
],
{ ordered: true }
@ -395,7 +396,6 @@ describe('split .kibana index into multiple system indices', () => {
const { runMigrations } = await migratorTestKitFactory();
await clearLog(logFilePath);
await runMigrations();
const logs = await parseLogFile(logFilePath);
expect(logs).not.toContainLogEntries(['REINDEX', 'CREATE', 'UPDATE_TARGET_MAPPINGS']);
});
@ -407,6 +407,7 @@ describe('split .kibana index into multiple system indices', () => {
});
// FLAKY: https://github.com/elastic/kibana/issues/157510
// This test takes too long. Can be manually executed to verify the correct behavior.
describe.skip('when multiple Kibana migrators run in parallel', () => {
it('correctly migrates 7.7.2_xpack_100k_obj.zip archive', async () => {
esServer = await startElasticsearch({
@ -415,15 +416,29 @@ describe('split .kibana index into multiple system indices', () => {
const esClient = await getEsClient();
const breakdownBefore = await getAggregatedTypesCountAllIndices(esClient);
expect(breakdownBefore).toMatchSnapshot('before migration');
expect(breakdownBefore).toEqual({
'.kibana': {
'apm-telemetry': 1,
config: 1,
dashboard: 52994,
'index-pattern': 1,
search: 1,
space: 1,
'ui-metric': 5,
visualization: 53004,
},
'.kibana_task_manager': {
task: 5,
},
});
for (let i = 0; i < PARALLEL_MIGRATORS; ++i) {
await clearLog(Path.join(__dirname, `dot_kibana_split_instance_${i}.log`));
}
const testKits = await Promise.all(
new Array(PARALLEL_MIGRATORS)
.fill({
new Array(PARALLEL_MIGRATORS).fill(true).map((_, index) =>
getKibanaMigratorTestKit({
settings: {
migrations: {
discardUnknownObjects: currentVersion,
@ -432,13 +447,10 @@ describe('split .kibana index into multiple system indices', () => {
},
kibanaIndex: MAIN_SAVED_OBJECT_INDEX,
types: typeRegistry.getAllTypes(),
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`),
})
.map((config, index) =>
getKibanaMigratorTestKit({
...config,
logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`),
})
)
)
);
const results = await Promise.all(testKits.map((testKit) => testKit.runMigrations()));
@ -448,9 +460,30 @@ describe('split .kibana index into multiple system indices', () => {
.every((result) => result.status === 'migrated' || result.status === 'patched')
).toEqual(true);
await esClient.indices.refresh({ index: ALL_SAVED_OBJECT_INDICES });
const breakdownAfter = await getAggregatedTypesCountAllIndices(esClient);
expect(breakdownAfter).toMatchSnapshot('after migration');
});
expect(breakdownAfter).toEqual({
'.kibana': {
'apm-telemetry': 1,
config: 1,
space: 1,
'ui-metric': 5,
},
'.kibana_alerting_cases': {},
'.kibana_analytics': {
dashboard: 52994,
'index-pattern': 1,
search: 1,
visualization: 53004,
},
'.kibana_ingest': {},
'.kibana_security_solution': {},
'.kibana_task_manager': {
task: 5,
},
});
}, 1200000);
afterEach(async () => {
await esServer?.stop();

View file

@ -161,7 +161,7 @@ describe('incompatible_cluster_routing_allocation', () => {
.map((str) => JSON5.parse(str)) as LogRecord[];
expect(
records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY -> DONE'))
records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY_SYNC -> DONE'))
).toBeDefined();
},
{ retryAttempts: 100, retryDelayMs: 500 }