mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[ZDT] handle failures during switch from v2 algo (#161476)
## Summary Fix a bug that was causing the cluster to fall into an unrecoverable state if an error occurs during the document migration phase of a `v2` to `zdt` migration (the first zdt migration to take over the `v2` algorithm). When an error occurs during the document migration phase, the index gets into a state where `mappingVersions` is present but not `docVersions`, and the algorithm wasn't able to understand what this state was, and considered it to just be a plain `zdt` predecessor, which caused an error when entering the document migration phase, as `docVersions` couldn't be found. This PR addresses this by properly identifying this specific state, and acting accordingly (by initiating a document migration without checking for the versions in `docVersions`, as we do when coming from a plain v2 algo).
This commit is contained in:
parent
13da9495a9
commit
79b5754a12
5 changed files with 265 additions and 12 deletions
|
@ -191,7 +191,7 @@ describe('Stage: init', () => {
|
|||
buildIndexMappingsMock.mockReturnValue({});
|
||||
});
|
||||
|
||||
it('adds a log entry about the algo check', () => {
|
||||
it('calls buildIndexMappings with the correct parameters', () => {
|
||||
const state = createState();
|
||||
const res: StateActionResponse<'INIT'> = Either.right(createResponse());
|
||||
|
||||
|
@ -203,7 +203,7 @@ describe('Stage: init', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('calls buildIndexMappings with the correct parameters', () => {
|
||||
it('adds a log entry about the algo check', () => {
|
||||
const state = createState();
|
||||
const res: StateActionResponse<'INIT'> = Either.right(createResponse());
|
||||
|
||||
|
@ -230,12 +230,52 @@ describe('Stage: init', () => {
|
|||
currentIndex,
|
||||
previousMappings: fetchIndexResponse[currentIndex].mappings,
|
||||
additiveMappingChanges: mockMappings.properties,
|
||||
previousAlgorithm: 'v2',
|
||||
skipDocumentMigration: false,
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when checkIndexCurrentAlgorithm returns `v2-partially-migrated`', () => {
|
||||
beforeEach(() => {
|
||||
checkIndexCurrentAlgorithmMock.mockReset().mockReturnValue('v2-partially-migrated');
|
||||
buildIndexMappingsMock.mockReturnValue({});
|
||||
checkVersionCompatibilityMock.mockReturnValue({ status: 'greater' });
|
||||
});
|
||||
|
||||
it('adds a log entry about the algo check', () => {
|
||||
const state = createState();
|
||||
const res: StateActionResponse<'INIT'> = Either.right(createResponse());
|
||||
|
||||
const newState = init(state, res, context);
|
||||
|
||||
expect(newState.logs.map((entry) => entry.message)).toContain(
|
||||
`INIT: current algo check result: v2-partially-migrated`
|
||||
);
|
||||
});
|
||||
|
||||
it('INIT -> UPDATE_INDEX_MAPPINGS', () => {
|
||||
const state = createState();
|
||||
const fetchIndexResponse = createResponse();
|
||||
const res: StateActionResponse<'INIT'> = Either.right(fetchIndexResponse);
|
||||
|
||||
const mockMappings = { properties: { someMappings: 'string' } };
|
||||
buildIndexMappingsMock.mockReturnValue(mockMappings);
|
||||
|
||||
const newState = init(state, res, context);
|
||||
|
||||
expect(newState).toEqual(
|
||||
expect.objectContaining({
|
||||
controlState: 'UPDATE_INDEX_MAPPINGS',
|
||||
currentIndex,
|
||||
previousMappings: fetchIndexResponse[currentIndex].mappings,
|
||||
previousAlgorithm: 'v2',
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when checkIndexCurrentAlgorithm returns `zdt`', () => {
|
||||
beforeEach(() => {
|
||||
checkIndexCurrentAlgorithmMock.mockReset().mockReturnValue('zdt');
|
||||
|
@ -329,6 +369,7 @@ describe('Stage: init', () => {
|
|||
previousMappings: fetchIndexResponse[currentIndex].mappings,
|
||||
additiveMappingChanges: { someToken: {} },
|
||||
skipDocumentMigration: false,
|
||||
previousAlgorithm: 'zdt',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
@ -368,6 +409,7 @@ describe('Stage: init', () => {
|
|||
currentIndex,
|
||||
previousMappings: fetchIndexResponse[currentIndex].mappings,
|
||||
skipDocumentMigration: false,
|
||||
previousAlgorithm: 'zdt',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
@ -390,6 +432,7 @@ describe('Stage: init', () => {
|
|||
currentIndex,
|
||||
previousMappings: fetchIndexResponse[currentIndex].mappings,
|
||||
skipDocumentMigration: false,
|
||||
previousAlgorithm: 'zdt',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
|
|
@ -127,7 +127,10 @@ export const init: ModelStage<
|
|||
aliases: existingAliases,
|
||||
aliasActions,
|
||||
previousMappings: currentMappings,
|
||||
previousAlgorithm: currentAlgo === 'v2-compatible' ? ('v2' as const) : ('zdt' as const),
|
||||
previousAlgorithm:
|
||||
currentAlgo === 'v2-compatible' || currentAlgo === 'v2-partially-migrated'
|
||||
? ('v2' as const)
|
||||
: ('zdt' as const),
|
||||
};
|
||||
|
||||
// compatible (8.8+) v2 algo => we jump to update index mapping
|
||||
|
|
|
@ -18,6 +18,16 @@ describe('checkIndexCurrentAlgorithm', () => {
|
|||
expect(checkIndexCurrentAlgorithm(mapping)).toEqual('unknown');
|
||||
});
|
||||
|
||||
it('returns `unknown` if _meta is present but empty', () => {
|
||||
const mapping: IndexMapping = {
|
||||
properties: {
|
||||
_meta: {},
|
||||
},
|
||||
};
|
||||
|
||||
expect(checkIndexCurrentAlgorithm(mapping)).toEqual('unknown');
|
||||
});
|
||||
|
||||
it('returns `unknown` if both v2 and zdt metas are present', () => {
|
||||
const mapping: IndexMapping = {
|
||||
properties: {},
|
||||
|
@ -25,7 +35,7 @@ describe('checkIndexCurrentAlgorithm', () => {
|
|||
migrationMappingPropertyHashes: {
|
||||
foo: 'someHash',
|
||||
},
|
||||
docVersions: {
|
||||
mappingVersions: {
|
||||
foo: '8.8.0',
|
||||
},
|
||||
},
|
||||
|
@ -34,7 +44,7 @@ describe('checkIndexCurrentAlgorithm', () => {
|
|||
expect(checkIndexCurrentAlgorithm(mapping)).toEqual('unknown');
|
||||
});
|
||||
|
||||
it('returns `zdt` if only zdt metas are present', () => {
|
||||
it('returns `zdt` if all zdt metas are present', () => {
|
||||
const mapping: IndexMapping = {
|
||||
properties: {},
|
||||
_meta: {
|
||||
|
@ -50,6 +60,19 @@ describe('checkIndexCurrentAlgorithm', () => {
|
|||
expect(checkIndexCurrentAlgorithm(mapping)).toEqual('zdt');
|
||||
});
|
||||
|
||||
it('returns `v2-partially-migrated` if only mappingVersions is present', () => {
|
||||
const mapping: IndexMapping = {
|
||||
properties: {},
|
||||
_meta: {
|
||||
mappingVersions: {
|
||||
foo: '8.8.0',
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
expect(checkIndexCurrentAlgorithm(mapping)).toEqual('v2-partially-migrated');
|
||||
});
|
||||
|
||||
it('returns `v2-incompatible` if v2 hashes are present but not indexTypesMap', () => {
|
||||
const mapping: IndexMapping = {
|
||||
properties: {},
|
||||
|
|
|
@ -11,12 +11,22 @@ import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal'
|
|||
/**
|
||||
* The list of values returned by `checkIndexCurrentAlgorithm`.
|
||||
*
|
||||
* - `zdt`
|
||||
* - `v2-compatible`
|
||||
* - `v2-incompatible`
|
||||
* - `unknown`
|
||||
* - `zdt`: last algo that ran was zdt
|
||||
*
|
||||
* - `v2-compatible`: last running algo was a v2 version that zdt can take over
|
||||
*
|
||||
* - `v2-incompatible`: last running algo was a v2 version that zdt can not take over
|
||||
*
|
||||
* - `v2-partially-migrated`: last running algo was zdt taking over v2, but the migration failed at some point
|
||||
*
|
||||
* - `unknown`: last running algo cannot be determined
|
||||
*/
|
||||
export type CheckCurrentAlgorithmResult = 'zdt' | 'v2-compatible' | 'v2-incompatible' | 'unknown';
|
||||
export type CheckCurrentAlgorithmResult =
|
||||
| 'zdt'
|
||||
| 'v2-partially-migrated'
|
||||
| 'v2-compatible'
|
||||
| 'v2-incompatible'
|
||||
| 'unknown';
|
||||
|
||||
export const checkIndexCurrentAlgorithm = (
|
||||
indexMapping: IndexMapping
|
||||
|
@ -27,7 +37,7 @@ export const checkIndexCurrentAlgorithm = (
|
|||
}
|
||||
|
||||
const hasV2Meta = !!meta.migrationMappingPropertyHashes;
|
||||
const hasZDTMeta = !!meta.docVersions || !!meta.mappingVersions;
|
||||
const hasZDTMeta = !!meta.mappingVersions;
|
||||
|
||||
if (hasV2Meta && hasZDTMeta) {
|
||||
return 'unknown';
|
||||
|
@ -37,7 +47,8 @@ export const checkIndexCurrentAlgorithm = (
|
|||
return isCompatible ? 'v2-compatible' : 'v2-incompatible';
|
||||
}
|
||||
if (hasZDTMeta) {
|
||||
return 'zdt';
|
||||
const isFullZdt = !!meta.docVersions;
|
||||
return isFullZdt ? 'zdt' : 'v2-partially-migrated';
|
||||
}
|
||||
return 'unknown';
|
||||
};
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import Path from 'path';
|
||||
import fs from 'fs/promises';
|
||||
import { range } from 'lodash';
|
||||
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
|
||||
import { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
|
||||
import '../jest_matchers';
|
||||
import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit';
|
||||
import { delay, parseLogFile, createType } from '../test_utils';
|
||||
import { getBaseMigratorParams, noopMigration } from '../fixtures/zdt_base.fixtures';
|
||||
|
||||
const logFilePath = Path.join(__dirname, 'v2_to_zdt_partial_failure.test.log');
|
||||
|
||||
describe('ZDT with v2 compat - recovering from partially migrated state', () => {
|
||||
let esServer: TestElasticsearchUtils['es'];
|
||||
|
||||
beforeAll(async () => {
|
||||
await fs.unlink(logFilePath).catch(() => {});
|
||||
esServer = await startElasticsearch();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await esServer?.stop();
|
||||
await delay(10);
|
||||
});
|
||||
|
||||
const typeBefore = createType({
|
||||
name: 'switching_type',
|
||||
mappings: {
|
||||
properties: {
|
||||
text: { type: 'text' },
|
||||
keyword: { type: 'keyword' },
|
||||
},
|
||||
},
|
||||
migrations: {
|
||||
'7.0.0': noopMigration,
|
||||
'7.5.0': noopMigration,
|
||||
},
|
||||
});
|
||||
|
||||
const typeFailingBetween = createType({
|
||||
...typeBefore,
|
||||
switchToModelVersionAt: '8.0.0',
|
||||
modelVersions: {
|
||||
1: {
|
||||
changes: [
|
||||
{
|
||||
type: 'data_backfill',
|
||||
backfillFn: (doc) => {
|
||||
// this was the easiest way to simulate a migrate failure during doc mig.
|
||||
throw new Error('need something to interrupt the migration');
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
mappings: {
|
||||
properties: {
|
||||
text: { type: 'text' },
|
||||
keyword: { type: 'keyword' },
|
||||
newField: { type: 'text' },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const typeAfter = createType({
|
||||
...typeBefore,
|
||||
switchToModelVersionAt: '8.0.0',
|
||||
modelVersions: {
|
||||
1: {
|
||||
changes: [
|
||||
{
|
||||
type: 'data_backfill',
|
||||
backfillFn: (doc) => {
|
||||
return { attributes: { newField: 'some value' } };
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
mappings: {
|
||||
properties: {
|
||||
text: { type: 'text' },
|
||||
keyword: { type: 'keyword' },
|
||||
newField: { type: 'text' },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const createBaseline = async () => {
|
||||
const { runMigrations, savedObjectsRepository } = await getKibanaMigratorTestKit({
|
||||
...getBaseMigratorParams({ migrationAlgorithm: 'v2', kibanaVersion: '8.9.0' }),
|
||||
types: [typeBefore],
|
||||
});
|
||||
await runMigrations();
|
||||
|
||||
const sampleObjs = range(5).map<SavedObjectsBulkCreateObject>((number) => ({
|
||||
id: `doc-${number}`,
|
||||
type: 'switching_type',
|
||||
attributes: {
|
||||
text: `text ${number}`,
|
||||
keyword: `kw ${number}`,
|
||||
},
|
||||
}));
|
||||
|
||||
await savedObjectsRepository.bulkCreate(sampleObjs);
|
||||
};
|
||||
|
||||
const runFailingMigration = async () => {
|
||||
const { runMigrations } = await getKibanaMigratorTestKit({
|
||||
...getBaseMigratorParams(),
|
||||
types: [typeFailingBetween],
|
||||
});
|
||||
|
||||
await expect(runMigrations()).rejects.toBeDefined();
|
||||
};
|
||||
|
||||
it('migrates the documents', async () => {
|
||||
await createBaseline();
|
||||
await runFailingMigration();
|
||||
|
||||
const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({
|
||||
...getBaseMigratorParams(),
|
||||
logFilePath,
|
||||
types: [typeAfter],
|
||||
});
|
||||
|
||||
await runMigrations();
|
||||
|
||||
const indices = await client.indices.get({ index: '.kibana*' });
|
||||
expect(Object.keys(indices)).toEqual(['.kibana_8.9.0_001']);
|
||||
|
||||
const index = indices['.kibana_8.9.0_001'];
|
||||
const mappings = index.mappings ?? {};
|
||||
const mappingMeta = mappings._meta ?? {};
|
||||
|
||||
expect(mappings.properties).toEqual(
|
||||
expect.objectContaining({
|
||||
switching_type: typeAfter.mappings,
|
||||
})
|
||||
);
|
||||
|
||||
expect(mappingMeta.docVersions).toEqual({
|
||||
switching_type: '10.1.0',
|
||||
});
|
||||
|
||||
const { saved_objects: sampleDocs } = await savedObjectsRepository.find({
|
||||
type: 'switching_type',
|
||||
});
|
||||
|
||||
expect(sampleDocs).toHaveLength(5);
|
||||
|
||||
const records = await parseLogFile(logFilePath);
|
||||
expect(records).toContainLogEntries(
|
||||
[
|
||||
'current algo check result: v2-partially-migrated',
|
||||
'INIT -> INDEX_STATE_UPDATE_DONE',
|
||||
'INDEX_STATE_UPDATE_DONE -> DOCUMENTS_UPDATE_INIT',
|
||||
'Starting to process 5 documents.',
|
||||
'-> DONE',
|
||||
'Migration completed',
|
||||
],
|
||||
{ ordered: true }
|
||||
);
|
||||
});
|
||||
});
|
Loading…
Add table
Add a link
Reference in a new issue