Address PR#158940 remarks (#159132)

This PR addresses remarks and feedback from
https://github.com/elastic/kibana/pull/158940, which was part of an
emergency release.
This commit is contained in:
Gerard Soldevila 2023-06-07 11:31:22 +02:00 committed by GitHub
parent c994f40d86
commit f017e69660
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 127 additions and 71 deletions

View file

@ -38,9 +38,27 @@ describe('synchronizeMigrators', () => {
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());
expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
{ _tag: 'Right', right: 'synchronized_successfully' },
{ _tag: 'Right', right: 'synchronized_successfully' },
{
_tag: 'Right',
right: {
data: [undefined, undefined, undefined],
type: 'synchronization_successful',
},
},
{
_tag: 'Right',
right: {
data: [undefined, undefined, undefined],
type: 'synchronization_successful',
},
},
{
_tag: 'Right',
right: {
data: [undefined, undefined, undefined],
type: 'synchronization_successful',
},
},
]);
});

View file

@ -17,24 +17,28 @@ export interface SynchronizationFailed {
}
/** @internal */
export interface SynchronizeMigratorsParams<T, U> {
export interface SynchronizationSuccessful<T> {
type: 'synchronization_successful';
data: T[];
}
/** @internal */
export interface SynchronizeMigratorsParams<T> {
waitGroup: WaitGroup<T>;
thenHook?: (res: any) => Either.Right<U>;
payload?: T;
}
export function synchronizeMigrators<T, U>({
export function synchronizeMigrators<T>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SynchronizationFailed, U> {
}: SynchronizeMigratorsParams<T>): TaskEither.TaskEither<
SynchronizationFailed,
SynchronizationSuccessful<T>
> {
return () => {
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.then((data: T[]) => Either.right({ type: 'synchronization_successful' as const, data }))
.catch((error) => Either.left({ type: 'synchronization_failed' as const, error }));
};
}

View file

@ -30,10 +30,7 @@ export function waitGroup<T>(): WaitGroup<T> {
return new Defer<T>();
}
export function createWaitGroupMap<T, U>(
keys: string[],
thenHook: (res: T[]) => U = (res) => res as unknown as U
): Record<string, WaitGroup<T>> {
export function createWaitGroupMap<T>(keys: string[]): Record<string, WaitGroup<T>> {
if (!keys?.length) {
return {};
}
@ -41,7 +38,7 @@ export function createWaitGroupMap<T, U>(
const defers: Array<WaitGroup<T>> = keys.map(() => waitGroup<T>());
// every member of the WaitGroup will wait for all members to resolve
const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook);
const all = Promise.all(defers.map(({ promise }) => promise));
return keys.reduce<Record<string, WaitGroup<T>>>((acc, indexName, i) => {
const { resolve, reject } = defers[i];

View file

@ -1764,9 +1764,10 @@ describe('migrations v2 model', () => {
describe('if the migrator source index did NOT exist', () => {
test('READY_TO_REINDEX_SYNC -> DONE_REINDEXING_SYNC', () => {
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right(
'synchronized_successfully' as const
);
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({
type: 'synchronization_successful' as const,
data: [],
});
const newState = model(state, res);
expect(newState.controlState).toEqual('DONE_REINDEXING_SYNC');
});
@ -1774,9 +1775,10 @@ describe('migrations v2 model', () => {
describe('if the migrator source index did exist', () => {
test('READY_TO_REINDEX_SYNC -> REINDEX_SOURCE_TO_TEMP_OPEN_PIT', () => {
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right(
'synchronized_successfully' as const
);
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({
type: 'synchronization_successful' as const,
data: [],
});
const newState = model(
{
...state,
@ -2044,9 +2046,10 @@ describe('migrations v2 model', () => {
};
test('DONE_REINDEXING_SYNC -> SET_TEMP_WRITE_BLOCK if synchronization succeeds', () => {
const res: ResponseType<'DONE_REINDEXING_SYNC'> = Either.right(
'synchronized_successfully' as const
);
const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({
type: 'synchronization_successful' as const,
data: [],
});
const newState = model(state, res);
expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK');
});
@ -2952,6 +2955,24 @@ describe('migrations v2 model', () => {
expect(newState.retryDelay).toEqual(0);
});
test('CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC if mustRelocateDocuments === true', () => {
const versionIndexReadyActions = Option.some([
{ add: { index: 'kibana-index', alias: 'my-alias' } },
]);
const newState = model(
{
...сheckVersionIndexReadyActionsState,
mustRelocateDocuments: true,
versionIndexReadyActions,
},
res
) as PostInitState;
expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY_SYNC');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
test('CHECK_VERSION_INDEX_READY_ACTIONS -> DONE if none versionIndexReadyActions', () => {
const newState = model(сheckVersionIndexReadyActionsState, res) as PostInitState;
expect(newState.controlState).toEqual('DONE');

View file

@ -276,7 +276,6 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
if (
// If this version's migration has already been completed we can proceed
Either.isRight(aliasesRes) &&
// TODO check that this behaves correctly when skipping reindexing
versionMigrationCompleted(stateP.currentAlias, stateP.versionAlias, aliasesRes.right)
) {
return {
@ -637,11 +636,12 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
controlState: stateP.mustRefresh ? 'REFRESH_SOURCE' : 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else if (Either.isLeft(res)) {
const left = res.left;
// Note: if multiple newer Kibana versions are competing with each other to perform a migration,
// it might happen that another Kibana instance has deleted this instance's version index.
// NIT: to handle this properly, we'd have to add a PREPARE_COMPATIBLE_MIGRATION_CONFLICT step,
// similar to MARK_VERSION_INDEX_READY_CONFLICT.
if (isTypeof(res.left, 'alias_not_found_exception')) {
if (isTypeof(left, 'alias_not_found_exception')) {
// We assume that the alias was already deleted by another Kibana instance
return {
...stateP,
@ -649,8 +649,19 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
? 'REFRESH_SOURCE'
: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT',
};
} else if (isTypeof(left, 'index_not_found_exception')) {
// We don't handle the following errors as the migration algorithm
// will never cause them to occur:
// - index_not_found_exception
throwBadResponse(stateP, left as never);
} else if (isTypeof(left, 'remove_index_not_a_concrete_index')) {
// We don't handle this error as the migration algorithm will never
// cause it to occur (this error is only relevant to the LEGACY_DELETE
// step).
throwBadResponse(stateP, left as never);
} else {
throwBadResponse(stateP, res.left as never);
// TODO update to handle 2 more cases
throwBadResponse(stateP, left);
}
} else {
throwBadResponse(stateP, res);
@ -724,11 +735,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
...stateP,
controlState: 'CALCULATE_EXCLUDE_FILTERS',
};
} else {
} else if (isTypeof(res.left, 'index_not_found_exception')) {
// We don't handle the following errors as the migration algorithm
// will never cause them to occur:
// - index_not_found_exception
throwBadResponse(stateP, res.left as never);
} else {
throwBadResponse(stateP, res.left);
}
} else if (stateP.controlState === 'CALCULATE_EXCLUDE_FILTERS') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -825,7 +838,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
throwBadResponse(stateP, left);
}
} else {
throwBadResponse(stateP, res as never);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -972,7 +985,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
throwBadResponse(stateP, left);
}
} else {
throwBadResponse(stateP, res as never);
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_TRANSFORM') {
// We follow a similar control flow as for
@ -1052,7 +1065,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
};
} else {
// should never happen
throwBadResponse(stateP, res as never);
throwBadResponse(stateP, left);
}
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') {
@ -1327,7 +1340,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
};
}
} else {
if (isTypeof(res.left, 'documents_transform_failed')) {
const left = res.left;
if (isTypeof(left, 'documents_transform_failed')) {
// continue to build up any more transformation errors before failing the migration.
return {
...stateP,
@ -1338,7 +1352,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
progress,
};
} else {
throwBadResponse(stateP, res as never);
throwBadResponse(stateP, left);
}
}
} else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') {
@ -1359,22 +1373,23 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
hasTransformedDocs: true,
};
} else {
if (isTypeof(res.left, 'request_entity_too_large_exception')) {
const left = res.left;
if (isTypeof(left, 'request_entity_too_large_exception')) {
return {
...stateP,
controlState: 'FATAL',
reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE,
};
} else if (
isTypeof(res.left, 'target_index_had_write_block') ||
isTypeof(res.left, 'index_not_found_exception')
isTypeof(left, 'target_index_had_write_block') ||
isTypeof(left, 'index_not_found_exception')
) {
// we fail on these errors since the target index will never get
// deleted and should only have a write block if a newer version of
// Kibana started an upgrade
throwBadResponse(stateP, res.left as never);
throwBadResponse(stateP, left as never);
} else {
throwBadResponse(stateP, res.left);
throwBadResponse(stateP, left);
}
}
} else if (stateP.controlState === 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT') {
@ -1546,11 +1561,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// another instance has already completed the migration and deleted
// the temporary index
return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' };
} else {
} else if (isTypeof(left, 'index_not_found_exception')) {
// The migration algorithm will never cause an
// index_not_found_exception for an index other than the temporary
// index handled above.
throwBadResponse(stateP, left as never);
} else {
throwBadResponse(stateP, left);
}
} else if (isTypeof(left, 'remove_index_not_a_concrete_index')) {
// We don't handle this error as the migration algorithm will never

View file

@ -6,7 +6,9 @@
* Side Public License, v 1.
*/
import { pipe } from 'fp-ts/lib/pipeable';
import * as Option from 'fp-ts/lib/Option';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { omit } from 'lodash';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { WaitGroup } from './kibana_migrator_utils';
@ -257,11 +259,18 @@ export const nextActionMap = (
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }),
MARK_VERSION_INDEX_READY_SYNC: (state: MarkVersionIndexReady) =>
Actions.synchronizeMigrators({
waitGroup: updateRelocationAliases,
payload: state.versionIndexReadyActions.value,
thenHook: (res) => res,
}),
pipe(
// First, we wait for all the migrators involved in a relocation to reach this point.
Actions.synchronizeMigrators<Actions.AliasAction[]>({
waitGroup: updateRelocationAliases,
payload: state.versionIndexReadyActions.value,
}),
// Then, all migrators will try to update all aliases (from all indices). Only the first one will succeed.
// The others will receive alias_not_found_exception and cause MARK_VERSION_INDEX_READY_CONFLICT (that's acceptable).
TaskEither.chainW(({ data }) =>
Actions.updateAliases({ client, aliasActions: data.flat() })
)
),
MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) =>
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) =>

View file

@ -144,11 +144,7 @@ describe('runV2Migration', () => {
expect(mockCreateWaitGroupMap).toBeCalledTimes(3);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(
3,
['.my_index', '.other_index'],
expect.any(Function) // we expect to receive a method to update all aliases in this hook
);
expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(3, ['.my_index', '.other_index']);
});
it('calls runResilientMigrator for each migrator it must spawn', async () => {

View file

@ -30,8 +30,6 @@ import {
} from './kibana_migrator_utils';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import type { AliasAction } from './actions/update_aliases';
import { updateAliases } from './actions';
export interface RunV2MigrationOpts {
/** The current Kibana version */
@ -102,14 +100,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
// C) both
const readyToReindexWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes);
const doneReindexingWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes);
const updateAliasesWaitGroupMap = createWaitGroupMap<AliasAction[], Promise<any>>(
indicesWithRelocatingTypes,
(allAliasActions) =>
updateAliases({
client: options.elasticsearchClient,
aliasActions: allAliasActions.flat(),
})()
);
const updateAliasesWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes);
// build a list of all migrators that must be started
const migratorIndices = new Set(Object.keys(indexMap));

View file

@ -42,7 +42,8 @@ const RELOCATE_TYPES: Record<string, string> = {
};
const PARALLEL_MIGRATORS = 6;
export const logFilePath = Path.join(__dirname, 'dot_kibana_split.test.log');
export const logFilePathFirstRun = Path.join(__dirname, 'dot_kibana_split_1st_run.test.log');
export const logFilePathSecondRun = Path.join(__dirname, 'dot_kibana_split_2nd_run.test.log');
describe('split .kibana index into multiple system indices', () => {
let esServer: TestElasticsearchUtils['es'];
@ -53,11 +54,12 @@ describe('split .kibana index into multiple system indices', () => {
});
beforeEach(async () => {
await clearLog(logFilePath);
await clearLog(logFilePathFirstRun);
await clearLog(logFilePathSecondRun);
});
describe('when migrating from a legacy version', () => {
let migratorTestKitFactory: () => Promise<KibanaMigratorTestKit>;
let migratorTestKitFactory: (logFilePath: string) => Promise<KibanaMigratorTestKit>;
beforeAll(async () => {
esServer = await startElasticsearch({
@ -77,7 +79,7 @@ describe('split .kibana index into multiple system indices', () => {
}
);
migratorTestKitFactory = () =>
migratorTestKitFactory = (logFilePath: string) =>
getKibanaMigratorTestKit({
types: updatedTypeRegistry.getAllTypes(),
kibanaIndex: '.kibana',
@ -85,7 +87,7 @@ describe('split .kibana index into multiple system indices', () => {
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
});
const { runMigrations, client } = await migratorTestKitFactory();
const { runMigrations, client } = await migratorTestKitFactory(logFilePathFirstRun);
// count of types in the legacy index
expect(await getAggregatedTypesCount(client, '.kibana_1')).toEqual({
@ -286,7 +288,7 @@ describe('split .kibana index into multiple system indices', () => {
}
`);
const logs = await parseLogFile(logFilePath);
const logs = await parseLogFile(logFilePathFirstRun);
expect(logs).toContainLogEntries(
[
@ -378,7 +380,7 @@ describe('split .kibana index into multiple system indices', () => {
`[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META.`,
`[${index}] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.`,
`[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC.`,
`[${index}] MARK_VERSION_INDEX_READY_SYNC -> DONE.`,
`[${index}] MARK_VERSION_INDEX_READY_SYNC`, // all migrators try to update all aliases, all but one will have conflicts
`[${index}] Migration completed after`,
],
{ ordered: true }
@ -393,10 +395,9 @@ describe('split .kibana index into multiple system indices', () => {
afterEach(async () => {
// we run the migrator again to ensure that the next time state is loaded everything still works as expected
const { runMigrations } = await migratorTestKitFactory();
await clearLog(logFilePath);
const { runMigrations } = await migratorTestKitFactory(logFilePathSecondRun);
await runMigrations();
const logs = await parseLogFile(logFilePath);
const logs = await parseLogFile(logFilePathSecondRun);
expect(logs).not.toContainLogEntries(['REINDEX', 'CREATE', 'UPDATE_TARGET_MAPPINGS']);
});
@ -419,9 +420,11 @@ describe('split .kibana index into multiple system indices', () => {
expect(breakdownBefore).toEqual({
'.kibana': {
'apm-telemetry': 1,
application_usage_transactional: 4,
config: 1,
dashboard: 52994,
'index-pattern': 1,
'maps-telemetry': 1,
search: 1,
space: 1,
'ui-metric': 5,