Refactor KibanaMigrator, improve readability, maintainability and UT (#155693)

Addresses the following feedback:
https://github.com/elastic/kibana/pull/154151#discussion_r1158470566

Similar to what has been done for ZDT, the goal of this PR is to extract
the logic of the `runV2Migration()` from the `KibanaMigrator` into a
separate file.

The PR also fixes some incomplete / incorrect UTs and adds a few missing
ones.
This commit is contained in:
Gerard Soldevila 2023-06-01 14:47:40 +02:00 committed by GitHub
parent ab4181aaca
commit 06c337f903
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 988 additions and 661 deletions

View file

@ -48,7 +48,7 @@ export const retryCallCluster = <T extends Promise<unknown>>(apiCaller: () => T)
* Retries the provided Elasticsearch API call when an error such as
* `AuthenticationException` `NoConnections`, `ConnectionFault`,
* `ServiceUnavailable` or `RequestTimeout` are encountered. The API call will
* be retried once a second, indefinitely, until a successful response or a
* be retried once every `delay` millis, indefinitely, until a successful response or a
* different error is received.
*
* @example

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
export { DEFAULT_INDEX_TYPES_MAP } from './src/constants';
export { LEGACY_URL_ALIAS_TYPE, type LegacyUrlAlias } from './src/legacy_alias';
export {
getProperty,

View file

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { IndexTypesMap } from './mappings';
/**
* This map holds the default breakdown of SO types per index (pre 8.8.0)
*/
export const DEFAULT_INDEX_TYPES_MAP: IndexTypesMap = {
'.kibana_task_manager': ['task'],
'.kibana': [
'action',
'action_task_params',
'alert',
'api_key_pending_invalidation',
'apm-indices',
'apm-server-schema',
'apm-service-group',
'apm-telemetry',
'app_search_telemetry',
'application_usage_daily',
'application_usage_totals',
'book',
'canvas-element',
'canvas-workpad',
'canvas-workpad-template',
'cases',
'cases-comments',
'cases-configure',
'cases-connector-mappings',
'cases-telemetry',
'cases-user-actions',
'config',
'config-global',
'connector_token',
'core-usage-stats',
'csp-rule-template',
'dashboard',
'endpoint:user-artifact-manifest',
'enterprise_search_telemetry',
'epm-packages',
'epm-packages-assets',
'event_loop_delays_daily',
'exception-list',
'exception-list-agnostic',
'file',
'file-upload-usage-collection-telemetry',
'fileShare',
'fleet-fleet-server-host',
'fleet-message-signing-keys',
'fleet-preconfiguration-deletion-record',
'fleet-proxy',
'graph-workspace',
'guided-onboarding-guide-state',
'guided-onboarding-plugin-state',
'index-pattern',
'infrastructure-monitoring-log-view',
'infrastructure-ui-source',
'ingest-agent-policies',
'ingest-download-sources',
'ingest-outputs',
'ingest-package-policies',
'ingest_manager_settings',
'inventory-view',
'kql-telemetry',
'legacy-url-alias',
'lens',
'lens-ui-telemetry',
'map',
'metrics-explorer-view',
'ml-job',
'ml-module',
'ml-trained-model',
'monitoring-telemetry',
'osquery-manager-usage-metric',
'osquery-pack',
'osquery-pack-asset',
'osquery-saved-query',
'query',
'rules-settings',
'sample-data-telemetry',
'search',
'search-session',
'search-telemetry',
'searchableList',
'security-rule',
'security-solution-signals-migration',
'siem-detection-engine-rule-actions',
'siem-ui-timeline',
'siem-ui-timeline-note',
'siem-ui-timeline-pinned-event',
'slo',
'space',
'spaces-usage-stats',
'synthetics-monitor',
'synthetics-param',
'synthetics-privates-locations',
'tag',
'telemetry',
'todo',
'ui-metric',
'upgrade-assistant-ml-upgrade-operation',
'upgrade-assistant-reindex-operation',
'uptime-dynamic-settings',
'uptime-synthetics-api-key',
'url',
'usage-counters',
'visualization',
'workplace_search_telemetry',
],
};

View file

@ -7,34 +7,59 @@
*/
import { take } from 'rxjs/operators';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import { SavedObjectTypeRegistry } from '@kbn/core-saved-objects-base-server-internal';
import { type KibanaMigratorOptions, KibanaMigrator } from './kibana_migrator';
import {
type MigrationResult,
SavedObjectTypeRegistry,
} from '@kbn/core-saved-objects-base-server-internal';
import { KibanaMigrator } from './kibana_migrator';
import { DocumentMigrator } from './document_migrator';
import { ByteSizeValue } from '@kbn/config-schema';
import { docLinksServiceMock } from '@kbn/core-doc-links-server-mocks';
import { lastValueFrom } from 'rxjs';
import * as runResilientMigratorModule from './run_resilient_migrator';
import { runResilientMigrator } from './run_resilient_migrator';
import { runV2Migration } from './run_v2_migration';
import { runZeroDowntimeMigration } from './zdt';
const mappingsResponseWithoutIndexTypesMap: estypes.IndicesGetMappingResponse = {
'.kibana_8.7.0_001': {
mappings: {
_meta: {
migrationMappingPropertyHashes: {
references: '7997cf5a56cc02bdc9c93361bde732b0',
// ...
},
// we do not add a `indexTypesMap`
// simulating a Kibana < 8.8.0 that does not have one yet
},
},
const V2_SUCCESSFUL_MIGRATION_RESULT: MigrationResult[] = [
{
sourceIndex: '.my_index_pre8.2.3_001',
destIndex: '.my_index_8.2.3_001',
elapsedMs: 14,
status: 'migrated',
},
};
];
const ZDT_SUCCESSFUL_MIGRATION_RESULT: MigrationResult[] = [
{
sourceIndex: '.my_index_8.8.0_001',
destIndex: '.my_index_8.8.1_001',
elapsedMs: 14,
status: 'migrated',
},
{
destIndex: '.other_index_8.8.0_001',
elapsedMs: 128,
status: 'patched',
},
];
jest.mock('./run_v2_migration', () => {
return {
runV2Migration: jest.fn(
(): Promise<MigrationResult[]> => Promise.resolve(V2_SUCCESSFUL_MIGRATION_RESULT)
),
};
});
jest.mock('./zdt', () => {
return {
runZeroDowntimeMigration: jest.fn(
(): Promise<MigrationResult[]> => Promise.resolve(ZDT_SUCCESSFUL_MIGRATION_RESULT)
),
};
});
const createRegistry = (types: Array<Partial<SavedObjectsType>>) => {
const registry = new SavedObjectTypeRegistry();
@ -51,10 +76,15 @@ const createRegistry = (types: Array<Partial<SavedObjectsType>>) => {
return registry;
};
const mockRunV2Migration = runV2Migration as jest.MockedFunction<typeof runV2Migration>;
const mockRunZeroDowntimeMigration = runZeroDowntimeMigration as jest.MockedFunction<
typeof runZeroDowntimeMigration
>;
describe('KibanaMigrator', () => {
beforeEach(() => {
jest.restoreAllMocks();
jest.spyOn(runResilientMigratorModule, 'runResilientMigrator');
mockRunV2Migration.mockClear();
mockRunZeroDowntimeMigration.mockClear();
});
describe('getActiveMappings', () => {
it('returns full index mappings w/ core properties', () => {
@ -68,7 +98,7 @@ describe('KibanaMigrator', () => {
},
{
name: 'bmap',
indexPattern: '.other-index',
indexPattern: '.other_index',
mappings: {
properties: { field: { type: 'text' } },
},
@ -90,6 +120,7 @@ describe('KibanaMigrator', () => {
);
});
// TODO check if it applies
it('calls documentMigrator.migrate', () => {
const options = mockOptions();
const kibanaMigrator = new KibanaMigrator(options);
@ -102,88 +133,57 @@ describe('KibanaMigrator', () => {
});
describe('runMigrations', () => {
it('throws if prepareMigrations is not called first', async () => {
it("calls runV2Migration with the right params when the migration algorithm is 'v2'", async () => {
const options = mockOptions();
const migrator = new KibanaMigrator(options);
migrator.prepareMigrations();
const res = await migrator.runMigrations();
await expect(migrator.runMigrations()).rejects.toThrowError(
'Migrations are not ready. Make sure prepareMigrations is called first.'
expect(runV2Migration).toHaveBeenCalledTimes(1);
expect(runZeroDowntimeMigration).not.toHaveBeenCalled();
expect(runV2Migration).toHaveBeenCalledWith(
expect.objectContaining({
kibanaVersion: '8.2.3',
kibanaIndexPrefix: '.my_index',
migrationConfig: options.soMigrationsConfig,
waitForMigrationCompletion: false,
})
);
expect(res).toEqual(V2_SUCCESSFUL_MIGRATION_RESULT);
});
it("calls runZeroDowntimeMigration with the right params when the migration algorithm is 'zdt'", async () => {
const options = mockOptions('zdt');
const migrator = new KibanaMigrator(options);
migrator.prepareMigrations();
const res = await migrator.runMigrations();
expect(runZeroDowntimeMigration).toHaveBeenCalledTimes(1);
expect(runV2Migration).not.toHaveBeenCalled();
expect(runZeroDowntimeMigration).toHaveBeenCalledWith(
expect.objectContaining({
kibanaVersion: '8.2.3',
kibanaIndexPrefix: '.my_index',
migrationConfig: options.soMigrationsConfig,
})
);
expect(res).toEqual(ZDT_SUCCESSFUL_MIGRATION_RESULT);
});
it('only runs migrations once if called multiple times', async () => {
const successfulRun: typeof runResilientMigrator = ({ indexPrefix }) =>
Promise.resolve({
sourceIndex: indexPrefix,
destIndex: indexPrefix,
elapsedMs: 28,
status: 'migrated',
});
const mockRunResilientMigrator = runResilientMigrator as jest.MockedFunction<
typeof runResilientMigrator
>;
mockRunResilientMigrator.mockImplementationOnce(successfulRun);
mockRunResilientMigrator.mockImplementationOnce(successfulRun);
mockRunResilientMigrator.mockImplementationOnce(successfulRun);
mockRunResilientMigrator.mockImplementationOnce(successfulRun);
const options = mockOptions();
options.client.indices.get.mockResponse({}, { statusCode: 200 });
options.client.indices.getMapping.mockResponse(mappingsResponseWithoutIndexTypesMap, {
statusCode: 200,
});
options.client.cluster.getSettings.mockResponse(
{
transient: {},
persistent: {},
},
{ statusCode: 404 }
);
const migrator = new KibanaMigrator(options);
migrator.prepareMigrations();
await migrator.runMigrations();
await migrator.runMigrations();
await migrator.runMigrations();
// indices.get is called twice during a single migration
expect(runResilientMigrator).toHaveBeenCalledTimes(4);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
indexPrefix: '.my-index',
mustRelocateDocuments: true,
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
indexPrefix: '.other-index',
mustRelocateDocuments: true,
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
indexPrefix: '.my-task-index',
mustRelocateDocuments: false,
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
indexPrefix: '.my-complementary-index',
mustRelocateDocuments: true,
})
);
expect(runV2Migration).toHaveBeenCalledTimes(1);
});
it('emits results on getMigratorResult$()', async () => {
const options = mockV2MigrationOptions();
options.client.indices.getMapping.mockResponse(mappingsResponseWithoutIndexTypesMap, {
statusCode: 200,
});
it('emits v2 results on getMigratorResult$()', async () => {
const options = mockOptions();
const migrator = new KibanaMigrator(options);
const migratorStatus = lastValueFrom(migrator.getStatus$().pipe(take(3)));
migrator.prepareMigrations();
@ -191,294 +191,70 @@ describe('KibanaMigrator', () => {
const { status, result } = await migratorStatus;
expect(status).toEqual('completed');
expect(result![0]).toMatchObject({
destIndex: '.my-index_8.2.3_001',
sourceIndex: '.my-index_pre8.2.3_001',
elapsedMs: expect.any(Number),
status: 'migrated',
});
expect(result![1]).toMatchObject({
destIndex: '.other-index_8.2.3_001',
elapsedMs: expect.any(Number),
status: 'patched',
});
expect(result).toEqual(V2_SUCCESSFUL_MIGRATION_RESULT);
});
it('rejects when the migration state machine terminates in a FATAL state', async () => {
const options = mockV2MigrationOptions();
options.client.indices.get.mockResponse(
{
'.my-index_8.2.4_001': {
aliases: {
'.my-index': {},
'.my-index_8.2.4': {},
},
mappings: { properties: {}, _meta: { migrationMappingPropertyHashes: {} } },
settings: {},
},
},
{ statusCode: 200 }
);
options.client.indices.getMapping.mockResponse(mappingsResponseWithoutIndexTypesMap, {
statusCode: 200,
});
it('emits zdt results on getMigratorResult$()', async () => {
const options = mockOptions('zdt');
const migrator = new KibanaMigrator(options);
const migratorStatus = lastValueFrom(migrator.getStatus$().pipe(take(3)));
migrator.prepareMigrations();
return expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.my-index] index: The .my-index alias is pointing to a newer version of Kibana: v8.2.4]`
);
await migrator.runMigrations();
const { status, result } = await migratorStatus;
expect(status).toEqual('completed');
expect(result).toEqual(ZDT_SUCCESSFUL_MIGRATION_RESULT);
});
it('rejects when an unexpected exception occurs in an action', async () => {
const options = mockV2MigrationOptions();
options.client.tasks.get.mockResponse({
completed: true,
error: { type: 'elasticsearch_exception', reason: 'task failed with an error' },
task: { description: 'task description' } as any,
});
options.client.indices.getMapping.mockResponse(mappingsResponseWithoutIndexTypesMap, {
statusCode: 200,
});
it('rejects when the v2 migrator algorithm rejects', async () => {
const options = mockOptions();
const migrator = new KibanaMigrator(options);
const fatal = new Error(
`Unable to complete saved object migrations for the [${options.kibanaIndex}] index: Something went horribly wrong`
);
mockRunV2Migration.mockRejectedValueOnce(fatal);
migrator.prepareMigrations();
await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(`
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
`);
expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(`
[Error: Reindex failed with the following error:
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
`);
expect(migrator.runMigrations()).rejects.toEqual(fatal);
});
describe('for V2 migrations', () => {
describe('where some SO types must be relocated', () => {
it('runs successfully', async () => {
const options = mockV2MigrationOptions();
options.client.indices.getMapping.mockResponse(mappingsResponseWithoutIndexTypesMap, {
statusCode: 200,
});
it('rejects when the zdt migrator algorithm rejects', async () => {
const options = mockOptions('zdt');
const migrator = new KibanaMigrator(options);
const migrator = new KibanaMigrator(options);
migrator.prepareMigrations();
const results = await migrator.runMigrations();
const fatal = new Error(
`Unable to complete saved object migrations for the [${options.kibanaIndex}] index: Something went terribly wrong`
);
mockRunZeroDowntimeMigration.mockRejectedValueOnce(fatal);
expect(results.length).toEqual(4);
expect(results[0]).toEqual(
expect.objectContaining({
sourceIndex: '.my-index_pre8.2.3_001',
destIndex: '.my-index_8.2.3_001',
elapsedMs: expect.any(Number),
status: 'migrated',
})
);
expect(results[1]).toEqual(
expect.objectContaining({
destIndex: '.other-index_8.2.3_001',
elapsedMs: expect.any(Number),
status: 'patched',
})
);
expect(results[2]).toEqual(
expect.objectContaining({
destIndex: '.my-task-index_8.2.3_001',
elapsedMs: expect.any(Number),
status: 'patched',
})
);
expect(results[3]).toEqual(
expect.objectContaining({
destIndex: '.my-complementary-index_8.2.3_001',
elapsedMs: expect.any(Number),
status: 'patched',
})
);
expect(runResilientMigrator).toHaveBeenCalledTimes(4);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
kibanaVersion: '8.2.3',
indexPrefix: '.my-index',
indexTypesMap: {
'.my-index': ['testtype', 'testtype3'],
'.other-index': ['testtype2'],
'.my-task-index': ['testtasktype'],
},
targetMappings: expect.objectContaining({
properties: expect.objectContaining({
testtype: expect.anything(),
testtype3: expect.anything(),
}),
}),
readyToReindex: expect.objectContaining({
promise: expect.anything(),
resolve: expect.anything(),
reject: expect.anything(),
}),
mustRelocateDocuments: true,
doneReindexing: expect.objectContaining({
promise: expect.anything(),
resolve: expect.anything(),
reject: expect.anything(),
}),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
kibanaVersion: '8.2.3',
indexPrefix: '.other-index',
indexTypesMap: {
'.my-index': ['testtype', 'testtype3'],
'.other-index': ['testtype2'],
'.my-task-index': ['testtasktype'],
},
targetMappings: expect.objectContaining({
properties: expect.objectContaining({
testtype2: expect.anything(),
}),
}),
readyToReindex: expect.objectContaining({
promise: expect.anything(),
resolve: expect.anything(),
reject: expect.anything(),
}),
mustRelocateDocuments: true,
doneReindexing: expect.objectContaining({
promise: expect.anything(),
resolve: expect.anything(),
reject: expect.anything(),
}),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
kibanaVersion: '8.2.3',
indexPrefix: '.my-task-index',
indexTypesMap: {
'.my-index': ['testtype', 'testtype3'],
'.other-index': ['testtype2'],
'.my-task-index': ['testtasktype'],
},
targetMappings: expect.objectContaining({
properties: expect.objectContaining({
testtasktype: expect.anything(),
}),
}),
// this migrator is NOT involved in any relocation,
// thus, it must not synchronize with other migrators
mustRelocateDocuments: false,
readyToReindex: undefined,
doneReindexing: undefined,
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
kibanaVersion: '8.2.3',
indexPrefix: '.my-complementary-index',
indexTypesMap: {
'.my-index': ['testtype', 'testtype3'],
'.other-index': ['testtype2'],
'.my-task-index': ['testtasktype'],
},
targetMappings: expect.objectContaining({
properties: expect.not.objectContaining({
// this index does no longer have any types associated to it
testtype: expect.anything(),
testtype2: expect.anything(),
testtype3: expect.anything(),
testtasktype: expect.anything(),
}),
}),
mustRelocateDocuments: true,
doneReindexing: expect.objectContaining({
promise: expect.anything(),
resolve: expect.anything(),
reject: expect.anything(),
}),
})
);
});
});
migrator.prepareMigrations();
expect(migrator.runMigrations()).rejects.toEqual(fatal);
});
});
});
type MockedOptions = KibanaMigratorOptions & {
client: ReturnType<typeof elasticsearchClientMock.createElasticsearchClient>;
};
const mockV2MigrationOptions = () => {
const options = mockOptions();
options.client.cluster.getSettings.mockResponse(
{
transient: {},
persistent: {},
},
{ statusCode: 200 }
);
options.client.indices.get.mockResponse(
{
'.my-index': {
aliases: { '.kibana': {} },
mappings: { properties: {} },
settings: {},
},
},
{ statusCode: 200 }
);
options.client.indices.addBlock.mockResponse({
acknowledged: true,
shards_acknowledged: true,
indices: [],
});
options.client.reindex.mockResponse({
taskId: 'reindex_task_id',
} as estypes.ReindexResponse);
options.client.tasks.get.mockResponse({
completed: true,
error: undefined,
failures: [],
task: { description: 'task description' } as any,
} as estypes.TasksGetResponse);
options.client.search.mockResponse({ hits: { hits: [] } } as any);
options.client.openPointInTime.mockResponse({ id: 'pit_id' });
options.client.closePointInTime.mockResponse({
succeeded: true,
} as estypes.ClosePointInTimeResponse);
return options;
};
const mockOptions = () => {
const mockOptions = (algorithm: 'v2' | 'zdt' = 'v2') => {
const mockedClient = elasticsearchClientMock.createElasticsearchClient();
(mockedClient as any).child = jest.fn().mockImplementation(() => mockedClient);
const options: MockedOptions = {
return {
logger: loggingSystemMock.create().get(),
kibanaVersion: '8.2.3',
waitForMigrationCompletion: false,
defaultIndexTypesMap: {
'.my-index': ['testtype', 'testtype2'],
'.my-task-index': ['testtasktype'],
'.my_index': ['testtype', 'testtype2'],
'.task_index': ['testtasktype'],
// this index no longer has any types registered in typeRegistry
// but we still need a migrator for it, so that 'testtype3' documents
// are moved over to their new index (.my_index)
'.my-complementary-index': ['testtype3'],
'.my_complementary_index': ['testtype3'],
},
typeRegistry: createRegistry([
// typeRegistry depicts an updated index map:
// .my-index: ['testtype', 'testtype3'],
// .my-other-index: ['testtype2'],
// .my-task-index': ['testtasktype'],
// .my_index: ['testtype', 'testtype3'],
// .other_index: ['testtype2'],
// .task_index': ['testtasktype'],
{
name: 'testtype',
hidden: false,
@ -494,8 +270,8 @@ const mockOptions = () => {
name: 'testtype2',
hidden: false,
namespaceType: 'single',
// We are moving 'testtype2' from '.my-index' to '.other-index'
indexPattern: '.other-index',
// We are moving 'testtype2' from '.my_index' to '.other_index'
indexPattern: '.other_index',
mappings: {
properties: {
name: { type: 'keyword' },
@ -507,7 +283,7 @@ const mockOptions = () => {
name: 'testtasktype',
hidden: false,
namespaceType: 'single',
indexPattern: '.my-task-index',
indexPattern: '.task_index',
mappings: {
properties: {
name: { type: 'keyword' },
@ -516,7 +292,7 @@ const mockOptions = () => {
migrations: {},
},
{
// We are moving 'testtype3' from '.my-complementary-index' to '.my-index'
// We are moving 'testtype3' from '.my_complementary_index' to '.my_index'
name: 'testtype3',
hidden: false,
namespaceType: 'single',
@ -528,9 +304,9 @@ const mockOptions = () => {
migrations: {},
},
]),
kibanaIndex: '.my-index',
kibanaIndex: '.my_index',
soMigrationsConfig: {
algorithm: 'v2',
algorithm,
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
@ -547,5 +323,4 @@ const mockOptions = () => {
docLinks: docLinksServiceMock.createSetupContract(),
nodeRoles: { backgroundTasks: true, ui: true, migrator: true },
};
return options;
};

View file

@ -12,14 +12,12 @@
*/
import { BehaviorSubject } from 'rxjs';
import Semver from 'semver';
import type { NodeRoles } from '@kbn/core-node-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
type SavedObjectUnsanitizedDoc,
type SavedObjectsRawDoc,
type ISavedObjectTypeRegistry,
} from '@kbn/core-saved-objects-server';
import {
@ -33,26 +31,22 @@ import {
type MigrationResult,
type IndexTypesMap,
} from '@kbn/core-saved-objects-base-server-internal';
import { getIndicesInvolvedInRelocation } from './kibana_migrator_utils';
import { buildActiveMappings, buildTypesMappings } from './core';
import { DocumentMigrator } from './document_migrator';
import { createIndexMap } from './core/build_index_map';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import { runZeroDowntimeMigration } from './zdt';
import { createMultiPromiseDefer, indexMapToIndexTypesMap } from './kibana_migrator_utils';
import { ALLOWED_CONVERT_VERSION, DEFAULT_INDEX_TYPES_MAP } from './kibana_migrator_constants';
import { ALLOWED_CONVERT_VERSION } from './kibana_migrator_constants';
import { runV2Migration } from './run_v2_migration';
export interface KibanaMigratorOptions {
client: ElasticsearchClient;
typeRegistry: ISavedObjectTypeRegistry;
defaultIndexTypesMap: IndexTypesMap;
soMigrationsConfig: SavedObjectsMigrationConfigType;
kibanaIndex: string;
kibanaVersion: string;
logger: Logger;
docLinks: DocLinksServiceStart;
waitForMigrationCompletion: boolean;
defaultIndexTypesMap?: IndexTypesMap;
nodeRoles: NodeRoles;
}
@ -66,6 +60,7 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly log: Logger;
private readonly mappingProperties: SavedObjectsTypeMappingDefinitions;
private readonly typeRegistry: ISavedObjectTypeRegistry;
private readonly defaultIndexTypesMap: IndexTypesMap;
private readonly serializer: SavedObjectsSerializer;
private migrationResult?: Promise<MigrationResult[]>;
private readonly status$ = new BehaviorSubject<KibanaMigratorStatus>({
@ -75,7 +70,6 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly soMigrationsConfig: SavedObjectsMigrationConfigType;
private readonly docLinks: DocLinksServiceStart;
private readonly waitForMigrationCompletion: boolean;
private readonly defaultIndexTypesMap: IndexTypesMap;
private readonly nodeRoles: NodeRoles;
public readonly kibanaVersion: string;
@ -86,11 +80,11 @@ export class KibanaMigrator implements IKibanaMigrator {
client,
typeRegistry,
kibanaIndex,
defaultIndexTypesMap,
soMigrationsConfig,
kibanaVersion,
logger,
docLinks,
defaultIndexTypesMap = DEFAULT_INDEX_TYPES_MAP,
waitForMigrationCompletion,
nodeRoles,
}: KibanaMigratorOptions) {
@ -98,6 +92,7 @@ export class KibanaMigrator implements IKibanaMigrator {
this.kibanaIndex = kibanaIndex;
this.soMigrationsConfig = soMigrationsConfig;
this.typeRegistry = typeRegistry;
this.defaultIndexTypesMap = defaultIndexTypesMap;
this.serializer = new SavedObjectsSerializer(this.typeRegistry);
this.mappingProperties = buildTypesMappings(this.typeRegistry.getAllTypes());
this.log = logger;
@ -114,7 +109,6 @@ export class KibanaMigrator implements IKibanaMigrator {
// operation so we cache the result
this.activeMappings = buildActiveMappings(this.mappingProperties);
this.docLinks = docLinks;
this.defaultIndexTypesMap = defaultIndexTypesMap;
}
public runMigrations({ rerun = false }: { rerun?: boolean } = {}): Promise<MigrationResult[]> {
@ -144,122 +138,37 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.status$.asObservable();
}
private async runMigrationsInternal(): Promise<MigrationResult[]> {
private runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
return await this.runMigrationZdt();
} else {
return await this.runMigrationV2();
}
}
private runMigrationZdt(): Promise<MigrationResult[]> {
return runZeroDowntimeMigration({
kibanaVersion: this.kibanaVersion,
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
nodeRoles: this.nodeRoles,
});
}
private async runMigrationV2(): Promise<MigrationResult[]> {
const indexMap = createIndexMap({
kibanaIndexName: this.kibanaIndex,
indexMap: this.mappingProperties,
registry: this.typeRegistry,
});
this.log.debug('Applying registered migrations for the following saved object types:');
Object.entries(this.documentMigrator.getMigrationVersion())
.sort(([t1, v1], [t2, v2]) => {
return Semver.compare(v1, v2);
})
.forEach(([type, migrationVersion]) => {
this.log.debug(`migrationVersion: ${migrationVersion} saved object type: ${type}`);
return runZeroDowntimeMigration({
kibanaVersion: this.kibanaVersion,
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
nodeRoles: this.nodeRoles,
});
// build a indexTypesMap from the info present in tye typeRegistry, e.g.:
// {
// '.kibana': ['typeA', 'typeB', ...]
// '.kibana_task_manager': ['task', ...]
// '.kibana_cases': ['typeC', 'typeD', ...]
// ...
// }
const indexTypesMap = indexMapToIndexTypesMap(indexMap);
// compare indexTypesMap with the one present (or not) in the .kibana index meta
// and check if some SO types have been moved to different indices
const indicesWithMovingTypes = await getIndicesInvolvedInRelocation({
mainIndex: this.kibanaIndex,
client: this.client,
indexTypesMap,
logger: this.log,
defaultIndexTypesMap: this.defaultIndexTypesMap,
});
// we create 2 synchronization objects (2 synchronization points) for each of the
// migrators involved in relocations, aka each of the migrators that will:
// A) reindex some documents TO other indices
// B) receive some documents FROM other indices
// C) both
const readyToReindexDefers = createMultiPromiseDefer(indicesWithMovingTypes);
const doneReindexingDefers = createMultiPromiseDefer(indicesWithMovingTypes);
// build a list of all migrators that must be started
const migratorIndices = new Set(Object.keys(indexMap));
// indices involved in a relocation might no longer be present in current mappings
// but if their SOs must be relocated to another index, we still need a migrator to do the job
indicesWithMovingTypes.forEach((index) => migratorIndices.add(index));
const migrators = Array.from(migratorIndices).map((indexName, i) => {
return {
migrate: (): Promise<MigrationResult> => {
const readyToReindex = readyToReindexDefers[indexName];
const doneReindexing = doneReindexingDefers[indexName];
// check if this migrator's index is involved in some document redistribution
const mustRelocateDocuments = !!readyToReindex;
return runResilientMigrator({
client: this.client,
kibanaVersion: this.kibanaVersion,
mustRelocateDocuments,
indexTypesMap,
waitForMigrationCompletion: this.waitForMigrationCompletion,
// a migrator's index might no longer have any associated types to it
targetMappings: buildActiveMappings(indexMap[indexName]?.typeMappings ?? {}),
logger: this.log,
preMigrationScript: indexMap[indexName]?.script,
readyToReindex,
doneReindexing,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocsSafely({
serializer: this.serializer,
migrateDoc: this.documentMigrator.migrateAndConvert,
rawDocs,
}),
coreMigrationVersionPerType: this.documentMigrator.getMigrationVersion({
includeDeferred: false,
migrationType: 'core',
}),
migrationVersionPerType: this.documentMigrator.getMigrationVersion({
includeDeferred: false,
}),
indexPrefix: indexName,
migrationsConfig: this.soMigrationsConfig,
typeRegistry: this.typeRegistry,
docLinks: this.docLinks,
});
},
};
});
return Promise.all(migrators.map((migrator) => migrator.migrate()));
} else {
return runV2Migration({
kibanaVersion: this.kibanaVersion,
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
defaultIndexTypesMap: this.defaultIndexTypesMap,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
mappingProperties: this.mappingProperties,
waitForMigrationCompletion: this.waitForMigrationCompletion,
});
}
}
public getActiveMappings(): IndexMapping {

View file

@ -6,8 +6,6 @@
* Side Public License, v 1.
*/
import type { IndexTypesMap } from '@kbn/core-saved-objects-base-server-internal';
export enum TypeStatus {
Added = 'added',
Removed = 'removed',
@ -24,108 +22,3 @@ export interface TypeStatusDetails {
// ensure plugins don't try to convert SO namespaceTypes after 8.0.0
// see https://github.com/elastic/kibana/issues/147344
export const ALLOWED_CONVERT_VERSION = '8.0.0';
export const DEFAULT_INDEX_TYPES_MAP: IndexTypesMap = {
'.kibana_task_manager': ['task'],
'.kibana': [
'action',
'action_task_params',
'alert',
'api_key_pending_invalidation',
'apm-indices',
'apm-server-schema',
'apm-service-group',
'apm-telemetry',
'app_search_telemetry',
'application_usage_daily',
'application_usage_totals',
'book',
'canvas-element',
'canvas-workpad',
'canvas-workpad-template',
'cases',
'cases-comments',
'cases-configure',
'cases-connector-mappings',
'cases-telemetry',
'cases-user-actions',
'config',
'config-global',
'connector_token',
'core-usage-stats',
'csp-rule-template',
'dashboard',
'endpoint:user-artifact',
'endpoint:user-artifact-manifest',
'enterprise_search_telemetry',
'epm-packages',
'epm-packages-assets',
'event_loop_delays_daily',
'exception-list',
'exception-list-agnostic',
'file',
'file-upload-usage-collection-telemetry',
'fileShare',
'fleet-fleet-server-host',
'fleet-message-signing-keys',
'fleet-preconfiguration-deletion-record',
'fleet-proxy',
'graph-workspace',
'guided-onboarding-guide-state',
'guided-onboarding-plugin-state',
'index-pattern',
'infrastructure-monitoring-log-view',
'infrastructure-ui-source',
'ingest-agent-policies',
'ingest-download-sources',
'ingest-outputs',
'ingest-package-policies',
'ingest_manager_settings',
'inventory-view',
'kql-telemetry',
'legacy-url-alias',
'lens',
'lens-ui-telemetry',
'map',
'metrics-explorer-view',
'ml-job',
'ml-module',
'ml-trained-model',
'monitoring-telemetry',
'osquery-manager-usage-metric',
'osquery-pack',
'osquery-pack-asset',
'osquery-saved-query',
'query',
'rules-settings',
'sample-data-telemetry',
'search',
'search-session',
'search-telemetry',
'searchableList',
'security-rule',
'security-solution-signals-migration',
'siem-detection-engine-rule-actions',
'siem-ui-timeline',
'siem-ui-timeline-note',
'siem-ui-timeline-pinned-event',
'slo',
'space',
'spaces-usage-stats',
'synthetics-monitor',
'synthetics-param',
'synthetics-privates-locations',
'tag',
'telemetry',
'todo',
'ui-metric',
'upgrade-assistant-ml-upgrade-operation',
'upgrade-assistant-reindex-operation',
'uptime-dynamic-settings',
'uptime-synthetics-api-key',
'url',
'usage-counters',
'visualization',
'workplace_search_telemetry',
],
};

View file

@ -3147,43 +3147,6 @@ export const INDEX_MAP_BEFORE_SPLIT: IndexMap = {
},
},
},
'endpoint:user-artifact': {
properties: {
identifier: {
type: 'keyword',
},
compressionAlgorithm: {
type: 'keyword',
index: false,
},
encryptionAlgorithm: {
type: 'keyword',
index: false,
},
encodedSha256: {
type: 'keyword',
},
encodedSize: {
type: 'long',
index: false,
},
decodedSha256: {
type: 'keyword',
index: false,
},
decodedSize: {
type: 'long',
index: false,
},
created: {
type: 'date',
index: false,
},
body: {
type: 'binary',
},
},
},
'endpoint:user-artifact-manifest': {
properties: {
created: {

View file

@ -9,10 +9,12 @@
import { errors } from '@elastic/elasticsearch';
import type { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import type { IndexTypesMap } from '@kbn/core-saved-objects-base-server-internal';
import {
DEFAULT_INDEX_TYPES_MAP,
type IndexTypesMap,
} from '@kbn/core-saved-objects-base-server-internal';
import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import { DEFAULT_INDEX_TYPES_MAP } from './kibana_migrator_constants';
import {
calculateTypeStatuses,
createMultiPromiseDefer,

View file

@ -10,7 +10,10 @@ import { chain } from 'lodash';
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import {
DEFAULT_INDEX_TYPES_MAP,
type IndexMapping,
} from '@kbn/core-saved-objects-base-server-internal';
import type {
BaseState,
CalculateExcludeFiltersState,
@ -60,7 +63,6 @@ import type { ResponseType } from '../next';
import { createInitialProgress } from './progress';
import { model } from './model';
import type { BulkIndexOperationTuple, BulkOperation } from './create_batches';
import { DEFAULT_INDEX_TYPES_MAP } from '../kibana_migrator_constants';
describe('migrations v2 model', () => {
const indexMapping: IndexMapping = {

View file

@ -124,7 +124,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
const laterVersionAlias = hasLaterVersionAlias(stateP.kibanaVersion, aliases);
if (
// `.kibana_<version>` alias exists, and refers to a later version of Kibana
// a `.kibana_<version>` alias exist, which refers to a later version of Kibana
// e.g. `.kibana_8.7.0` exists, and current stack version is 8.6.1
// see https://github.com/elastic/kibana/issues/155136
laterVersionAlias

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { SavedObjectTypeRegistry } from '@kbn/core-saved-objects-base-server-internal';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
export const createRegistry = (types: Array<Partial<SavedObjectsType>>) => {
const registry = new SavedObjectTypeRegistry();
types.forEach((type) =>
registry.registerType({
name: 'unknown',
hidden: false,
namespaceType: 'single',
mappings: {
properties: {
name: { type: 'keyword' },
},
},
migrations: {},
...type,
})
);
return registry;
};
export const indexTypesMapMock = {
'.my_index': ['testtype', 'testtype2'],
'.task_index': ['testtasktype'],
'.complementary_index': ['testtype3'],
};
export const savedObjectTypeRegistryMock = createRegistry([
// typeRegistry depicts an updated index map:
// .my_index: ['testtype', 'testtype3'],
// .other_index: ['testtype2'],
// .task_index': ['testtasktype'],
{
name: 'testtype',
migrations: { '8.2.3': jest.fn().mockImplementation((doc) => doc) },
},
{
name: 'testtype2',
// We are moving 'testtype2' from '.my_index' to '.other_index'
indexPattern: '.other_index',
},
{
name: 'testtasktype',
indexPattern: '.task_index',
},
{
// We are moving 'testtype3' from '.complementary_index' to '.my_index'
name: 'testtype3',
},
]);

View file

@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import buffer from 'buffer';
import { ByteSizeValue } from '@kbn/config-schema';
import { docLinksServiceMock } from '@kbn/core-doc-links-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import type { MigrationResult } from '@kbn/core-saved-objects-base-server-internal';
import { createInitialState } from './initial_state';
import { Defer } from './kibana_migrator_utils';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { next } from './next';
import { runResilientMigrator, type RunResilientMigratorParams } from './run_resilient_migrator';
import { indexTypesMapMock, savedObjectTypeRegistryMock } from './run_resilient_migrator.fixtures';
import type { InitState, State } from './state';
import type { Next } from './state_action_machine';
const SOME_MIGRATION_RESULT: MigrationResult = {
sourceIndex: '.my_index_pre8.2.3_001',
destIndex: '.my_index_8.2.3_001',
elapsedMs: 16,
status: 'migrated',
};
jest.mock('./migrations_state_action_machine', () => {
const actual = jest.requireActual('./migrations_state_action_machine');
return {
...actual,
migrationStateActionMachine: jest.fn(() => Promise.resolve(SOME_MIGRATION_RESULT)),
};
});
jest.mock('./initial_state', () => {
const actual = jest.requireActual('./initial_state');
return {
...actual,
createInitialState: jest.fn(actual.createInitialState),
};
});
jest.mock('./next', () => {
const actual = jest.requireActual('./next');
return {
...actual,
next: jest.fn(actual.next),
};
});
describe('runResilientMigrator', () => {
let options: RunResilientMigratorParams;
let initialState: InitState;
let migrationResult: MigrationResult;
let nextFunc: Next<State>;
beforeAll(async () => {
options = mockOptions();
migrationResult = await runResilientMigrator(options);
});
it('calls createInitialState with the right params', () => {
expect(createInitialState).toHaveBeenCalledTimes(1);
expect(createInitialState).toHaveBeenCalledWith({
kibanaVersion: options.kibanaVersion,
waitForMigrationCompletion: options.waitForMigrationCompletion,
mustRelocateDocuments: options.mustRelocateDocuments,
indexTypesMap: options.indexTypesMap,
targetMappings: options.targetMappings,
preMigrationScript: options.preMigrationScript,
migrationVersionPerType: options.migrationVersionPerType,
coreMigrationVersionPerType: options.coreMigrationVersionPerType,
indexPrefix: options.indexPrefix,
migrationsConfig: options.migrationsConfig,
typeRegistry: options.typeRegistry,
docLinks: options.docLinks,
logger: options.logger,
});
// store the created initial state
initialState = (createInitialState as jest.MockedFunction<typeof createInitialState>).mock
.results[0].value;
// store the generated "next" function
nextFunc = (next as jest.MockedFunction<typeof next>).mock.results[0].value;
});
it('calls migrationStateMachine with the right params', () => {
expect(migrationStateActionMachine).toHaveBeenCalledTimes(1);
expect(migrationStateActionMachine).toHaveBeenCalledWith({
initialState,
logger: options.logger,
next: nextFunc,
model: expect.any(Function),
abort: expect.any(Function),
});
});
it('returns the result of migrationStateMachine', () => {
expect(migrationResult).toEqual(SOME_MIGRATION_RESULT);
});
});
const mockOptions = (): RunResilientMigratorParams => {
const logger = loggingSystemMock.create().get();
const mockedClient = elasticsearchClientMock.createElasticsearchClient();
(mockedClient as any).child = jest.fn().mockImplementation(() => mockedClient);
return {
client: mockedClient,
kibanaVersion: '8.8.0',
waitForMigrationCompletion: false,
mustRelocateDocuments: true,
indexTypesMap: indexTypesMapMock,
targetMappings: {
properties: {
a: { type: 'keyword' },
c: { type: 'long' },
},
_meta: {
migrationMappingPropertyHashes: {
a: '000',
c: '222',
},
},
},
readyToReindex: new Defer(),
doneReindexing: new Defer(),
logger,
transformRawDocs: jest.fn(),
preMigrationScript: "ctx._id = ctx._source.type + ':' + ctx._id",
migrationVersionPerType: { my_dashboard: '7.10.1', my_viz: '8.0.0' },
coreMigrationVersionPerType: {},
indexPrefix: '.my_index',
migrationsConfig: {
algorithm: 'v2' as const,
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
maxReadBatchSizeBytes: new ByteSizeValue(buffer.constants.MAX_STRING_LENGTH),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
retryAttempts: 20,
zdt: {
metaPickupSyncDelaySec: 120,
runOnNonMigratorNodes: true,
},
},
typeRegistry: savedObjectTypeRegistryMock,
docLinks: docLinksServiceMock.createSetupContract(),
};
};

View file

@ -40,6 +40,26 @@ import type { State } from './state';
*/
export const MIGRATION_CLIENT_OPTIONS = { maxRetries: 0, requestTimeout: 120_000 };
export interface RunResilientMigratorParams {
client: ElasticsearchClient;
kibanaVersion: string;
waitForMigrationCompletion: boolean;
mustRelocateDocuments: boolean;
indexTypesMap: IndexTypesMap;
targetMappings: IndexMapping;
preMigrationScript?: string;
readyToReindex: Defer<any>;
doneReindexing: Defer<any>;
logger: Logger;
transformRawDocs: TransformRawDocs;
coreMigrationVersionPerType: SavedObjectsMigrationVersion;
migrationVersionPerType: SavedObjectsMigrationVersion;
indexPrefix: string;
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
}
/**
* Migrates the provided indexPrefix index using a resilient algorithm that is
* completely lock-free so that any failure can always be retried by
@ -63,25 +83,7 @@ export async function runResilientMigrator({
migrationsConfig,
typeRegistry,
docLinks,
}: {
client: ElasticsearchClient;
kibanaVersion: string;
waitForMigrationCompletion: boolean;
mustRelocateDocuments: boolean;
indexTypesMap: IndexTypesMap;
targetMappings: IndexMapping;
preMigrationScript?: string;
readyToReindex: Defer<any>;
doneReindexing: Defer<any>;
logger: Logger;
transformRawDocs: TransformRawDocs;
coreMigrationVersionPerType: SavedObjectsMigrationVersion;
migrationVersionPerType: SavedObjectsMigrationVersion;
indexPrefix: string;
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
}): Promise<MigrationResult> {
}: RunResilientMigratorParams): Promise<MigrationResult> {
const initialState = createInitialState({
kibanaVersion,
waitForMigrationCompletion,

View file

@ -0,0 +1,273 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import buffer from 'buffer';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import {
type MigrationResult,
SavedObjectsSerializer,
} from '@kbn/core-saved-objects-base-server-internal';
import { ByteSizeValue } from '@kbn/config-schema';
import { docLinksServiceMock } from '@kbn/core-doc-links-server-mocks';
import { runV2Migration, RunV2MigrationOpts } from './run_v2_migration';
import { DocumentMigrator } from './document_migrator';
import { ALLOWED_CONVERT_VERSION } from './kibana_migrator_constants';
import { buildTypesMappings, createIndexMap } from './core';
import {
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
createMultiPromiseDefer,
Defer,
} from './kibana_migrator_utils';
import { runResilientMigrator } from './run_resilient_migrator';
import { indexTypesMapMock, savedObjectTypeRegistryMock } from './run_resilient_migrator.fixtures';
jest.mock('./core', () => {
const actual = jest.requireActual('./core');
return {
...actual,
createIndexMap: jest.fn(actual.createIndexMap),
};
});
jest.mock('./kibana_migrator_utils', () => {
const actual = jest.requireActual('./kibana_migrator_utils');
return {
...actual,
indexMapToIndexTypesMap: jest.fn(actual.indexMapToIndexTypesMap),
createMultiPromiseDefer: jest.fn(actual.createMultiPromiseDefer),
getIndicesInvolvedInRelocation: jest.fn(() => Promise.resolve(['.my_index', '.other_index'])),
};
});
const V2_SUCCESSFUL_MIGRATION_RESULT: MigrationResult[] = [
{
sourceIndex: '.my_index_pre8.2.3_001',
destIndex: '.my_index_8.2.3_001',
elapsedMs: 16,
status: 'migrated',
},
{
sourceIndex: '.other_index_pre8.2.3_001',
destIndex: '.other_index_8.2.3_001',
elapsedMs: 8,
status: 'migrated',
},
{
destIndex: '.task_index_8.2.3_001',
elapsedMs: 4,
status: 'patched',
},
];
jest.mock('./run_resilient_migrator', () => {
const actual = jest.requireActual('./run_resilient_migrator');
return {
...actual,
runResilientMigrator: jest.fn(() => Promise.resolve(V2_SUCCESSFUL_MIGRATION_RESULT)),
};
});
const nextTick = () => new Promise((resolve) => setImmediate(resolve));
const mockCreateIndexMap = createIndexMap as jest.MockedFunction<typeof createIndexMap>;
const mockIndexMapToIndexTypesMap = indexMapToIndexTypesMap as jest.MockedFunction<
typeof indexMapToIndexTypesMap
>;
const mockCreateMultiPromiseDefer = createMultiPromiseDefer as jest.MockedFunction<
typeof createMultiPromiseDefer
>;
const mockGetIndicesInvolvedInRelocation = getIndicesInvolvedInRelocation as jest.MockedFunction<
typeof getIndicesInvolvedInRelocation
>;
const mockRunResilientMigrator = runResilientMigrator as jest.MockedFunction<
typeof runResilientMigrator
>;
describe('runV2Migration', () => {
beforeEach(() => {
mockCreateIndexMap.mockClear();
mockIndexMapToIndexTypesMap.mockClear();
mockCreateMultiPromiseDefer.mockClear();
mockGetIndicesInvolvedInRelocation.mockClear();
mockRunResilientMigrator.mockClear();
});
it('rejects if prepare migrations has not been called on the documentMigrator', async () => {
const options = mockOptions();
await expect(runV2Migration(options)).rejects.toEqual(
new Error('Migrations are not ready. Make sure prepareMigrations is called first.')
);
});
it('calls createIndexMap with the right params', async () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(createIndexMap).toBeCalledTimes(1);
expect(createIndexMap).toBeCalledWith({
kibanaIndexName: options.kibanaIndexPrefix,
indexMap: options.mappingProperties,
registry: options.typeRegistry,
});
});
it('calls indexMapToIndexTypesMap with the result from createIndexMap', async () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(indexMapToIndexTypesMap).toBeCalledTimes(1);
expect(indexMapToIndexTypesMap).toBeCalledWith(mockCreateIndexMap.mock.results[0].value);
});
it('calls getIndicesInvolvedInRelocation with the right params', async () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(getIndicesInvolvedInRelocation).toBeCalledTimes(1);
expect(getIndicesInvolvedInRelocation).toBeCalledWith(
expect.objectContaining({
client: options.elasticsearchClient,
indexTypesMap: mockIndexMapToIndexTypesMap.mock.results[0].value,
logger: options.logger,
})
);
});
it('calls createMultiPromiseDefer, with the list of moving indices', async () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(createMultiPromiseDefer).toBeCalledTimes(2);
expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']);
expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']);
});
it('calls runResilientMigrator for each migrator it must spawn', async () => {
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await runV2Migration(options);
expect(runResilientMigrator).toHaveBeenCalledTimes(3);
const runResilientMigratorCommonParams = {
client: options.elasticsearchClient,
kibanaVersion: options.kibanaVersion,
logger: options.logger,
migrationsConfig: options.migrationConfig,
typeRegistry: options.typeRegistry,
};
expect(runResilientMigrator).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
...runResilientMigratorCommonParams,
indexPrefix: '.my_index',
mustRelocateDocuments: true,
readyToReindex: expect.any(Object),
doneReindexing: expect.any(Object),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
...runResilientMigratorCommonParams,
indexPrefix: '.other_index',
mustRelocateDocuments: true,
readyToReindex: expect.any(Object),
doneReindexing: expect.any(Object),
})
);
expect(runResilientMigrator).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
...runResilientMigratorCommonParams,
indexPrefix: '.task_index',
mustRelocateDocuments: false,
readyToReindex: undefined,
doneReindexing: undefined,
})
);
});
it('awaits on all runResilientMigrator promises, and resolves with the results of each of them', async () => {
const myIndexMigratorDefer = new Defer<MigrationResult>();
const otherIndexMigratorDefer = new Defer();
const taskIndexMigratorDefer = new Defer();
let migrationResults: MigrationResult[] | undefined;
mockRunResilientMigrator.mockReturnValueOnce(myIndexMigratorDefer.promise);
mockRunResilientMigrator.mockReturnValueOnce(otherIndexMigratorDefer.promise);
mockRunResilientMigrator.mockReturnValueOnce(taskIndexMigratorDefer.promise);
const options = mockOptions();
options.documentMigrator.prepareMigrations();
runV2Migration(options).then((results) => (migrationResults = results));
await nextTick();
expect(migrationResults).toBeUndefined();
myIndexMigratorDefer.resolve(V2_SUCCESSFUL_MIGRATION_RESULT[0]);
otherIndexMigratorDefer.resolve(V2_SUCCESSFUL_MIGRATION_RESULT[1]);
await nextTick();
expect(migrationResults).toBeUndefined();
taskIndexMigratorDefer.resolve(V2_SUCCESSFUL_MIGRATION_RESULT[2]);
await nextTick();
expect(migrationResults).toEqual(V2_SUCCESSFUL_MIGRATION_RESULT);
});
it('rejects if one of the runResilientMigrator promises rejects', async () => {
mockRunResilientMigrator.mockResolvedValueOnce(V2_SUCCESSFUL_MIGRATION_RESULT[0]);
mockRunResilientMigrator.mockResolvedValueOnce(V2_SUCCESSFUL_MIGRATION_RESULT[1]);
const myTaskIndexMigratorError = new Error(
'Something terrible and unexpected happened whilst tyring to migrate .task_index'
);
mockRunResilientMigrator.mockRejectedValueOnce(myTaskIndexMigratorError);
const options = mockOptions();
options.documentMigrator.prepareMigrations();
await expect(runV2Migration(options)).rejects.toThrowError(myTaskIndexMigratorError);
});
});
const mockOptions = (kibanaVersion = '8.2.3'): RunV2MigrationOpts => {
const mockedClient = elasticsearchClientMock.createElasticsearchClient();
(mockedClient as any).child = jest.fn().mockImplementation(() => mockedClient);
const typeRegistry = savedObjectTypeRegistryMock;
const logger = loggingSystemMock.create().get();
return {
logger,
kibanaVersion,
waitForMigrationCompletion: false,
typeRegistry,
kibanaIndexPrefix: '.my_index',
defaultIndexTypesMap: indexTypesMapMock,
migrationConfig: {
algorithm: 'v2' as const,
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
maxReadBatchSizeBytes: new ByteSizeValue(buffer.constants.MAX_STRING_LENGTH),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
retryAttempts: 20,
zdt: {
metaPickupSyncDelaySec: 120,
runOnNonMigratorNodes: true,
},
},
elasticsearchClient: mockedClient,
docLinks: docLinksServiceMock.createSetupContract(),
documentMigrator: new DocumentMigrator({
kibanaVersion,
convertVersion: ALLOWED_CONVERT_VERSION,
typeRegistry,
log: logger,
}),
serializer: new SavedObjectsSerializer(typeRegistry),
mappingProperties: buildTypesMappings(typeRegistry.getAllTypes()),
};
};

View file

@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ISavedObjectTypeRegistry,
ISavedObjectsSerializer,
SavedObjectsRawDoc,
} from '@kbn/core-saved-objects-server';
import type {
IndexTypesMap,
MigrationResult,
SavedObjectsMigrationConfigType,
SavedObjectsTypeMappingDefinitions,
} from '@kbn/core-saved-objects-base-server-internal';
import Semver from 'semver';
import type { DocumentMigrator } from './document_migrator';
import { buildActiveMappings, createIndexMap } from './core';
import {
createMultiPromiseDefer,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
export interface RunV2MigrationOpts {
/** The current Kibana version */
kibanaVersion: string;
/** The default Kibana SavedObjects index prefix. e.g `.kibana` */
kibanaIndexPrefix: string;
/** The SO type registry to use for the migration */
typeRegistry: ISavedObjectTypeRegistry;
/** The map of indices => types to use as a default / baseline state */
defaultIndexTypesMap: IndexTypesMap;
/** Logger to use for migration output */
logger: Logger;
/** The document migrator to use to convert the document */
documentMigrator: DocumentMigrator;
/** docLinks contract to use to link to documentation */
docLinks: DocLinksServiceStart;
/** SO serializer to use for migration */
serializer: ISavedObjectsSerializer;
/** The client to use for communications with ES */
elasticsearchClient: ElasticsearchClient;
/** The configuration that drives the behavior of each migrator */
migrationConfig: SavedObjectsMigrationConfigType;
/** The definitions of the different saved object types */
mappingProperties: SavedObjectsTypeMappingDefinitions;
/** Tells whether this instance should actively participate in the migration or not */
waitForMigrationCompletion: boolean;
}
export const runV2Migration = async (options: RunV2MigrationOpts): Promise<MigrationResult[]> => {
const indexMap = createIndexMap({
kibanaIndexName: options.kibanaIndexPrefix,
indexMap: options.mappingProperties,
registry: options.typeRegistry,
});
options.logger.debug('Applying registered migrations for the following saved object types:');
Object.entries(options.documentMigrator.getMigrationVersion())
.sort(([t1, v1], [t2, v2]) => {
return Semver.compare(v1, v2);
})
.forEach(([type, migrationVersion]) => {
options.logger.debug(`migrationVersion: ${migrationVersion} saved object type: ${type}`);
});
// build a indexTypesMap from the info present in tye typeRegistry, e.g.:
// {
// '.kibana': ['typeA', 'typeB', ...]
// '.kibana_task_manager': ['task', ...]
// '.kibana_cases': ['typeC', 'typeD', ...]
// ...
// }
const indexTypesMap = indexMapToIndexTypesMap(indexMap);
// compare indexTypesMap with the one present (or not) in the .kibana index meta
// and check if some SO types have been moved to different indices
const indicesWithMovingTypes = await getIndicesInvolvedInRelocation({
mainIndex: options.kibanaIndexPrefix,
client: options.elasticsearchClient,
indexTypesMap,
logger: options.logger,
defaultIndexTypesMap: options.defaultIndexTypesMap,
});
// we create 2 synchronization objects (2 synchronization points) for each of the
// migrators involved in relocations, aka each of the migrators that will:
// A) reindex some documents TO other indices
// B) receive some documents FROM other indices
// C) both
const readyToReindexDefers = createMultiPromiseDefer(indicesWithMovingTypes);
const doneReindexingDefers = createMultiPromiseDefer(indicesWithMovingTypes);
// build a list of all migrators that must be started
const migratorIndices = new Set(Object.keys(indexMap));
// indices involved in a relocation might no longer be present in current mappings
// but if their SOs must be relocated to another index, we still need a migrator to do the job
indicesWithMovingTypes.forEach((index) => migratorIndices.add(index));
const migrators = Array.from(migratorIndices).map((indexName, i) => {
return {
migrate: (): Promise<MigrationResult> => {
const readyToReindex = readyToReindexDefers[indexName];
const doneReindexing = doneReindexingDefers[indexName];
// check if this migrator's index is involved in some document redistribution
const mustRelocateDocuments = !!readyToReindex;
return runResilientMigrator({
client: options.elasticsearchClient,
kibanaVersion: options.kibanaVersion,
mustRelocateDocuments,
indexTypesMap,
waitForMigrationCompletion: options.waitForMigrationCompletion,
// a migrator's index might no longer have any associated types to it
targetMappings: buildActiveMappings(indexMap[indexName]?.typeMappings ?? {}),
logger: options.logger,
preMigrationScript: indexMap[indexName]?.script,
readyToReindex,
doneReindexing,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocsSafely({
serializer: options.serializer,
migrateDoc: options.documentMigrator.migrateAndConvert,
rawDocs,
}),
coreMigrationVersionPerType: options.documentMigrator.getMigrationVersion({
includeDeferred: false,
migrationType: 'core',
}),
migrationVersionPerType: options.documentMigrator.getMigrationVersion({
includeDeferred: false,
}),
indexPrefix: indexName,
migrationsConfig: options.migrationConfig,
typeRegistry: options.typeRegistry,
docLinks: options.docLinks,
});
},
};
});
return Promise.all(migrators.map((migrator) => migrator.migrate()));
};

View file

@ -38,6 +38,7 @@ import {
type SavedObjectsConfigType,
type SavedObjectsMigrationConfigType,
type IKibanaMigrator,
DEFAULT_INDEX_TYPES_MAP,
} from '@kbn/core-saved-objects-base-server-internal';
import {
SavedObjectsClient,
@ -376,6 +377,7 @@ export class SavedObjectsService
kibanaVersion: this.kibanaVersion,
soMigrationsConfig,
kibanaIndex: MAIN_SAVED_OBJECT_INDEX,
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
client,
docLinks,
waitForMigrationCompletion,

View file

@ -15,6 +15,7 @@ import { REPO_ROOT } from '@kbn/repo-info';
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
import { DEFAULT_INDEX_TYPES_MAP } from '@kbn/core-saved-objects-base-server-internal';
import {
defaultLogFilePath,
getEsClient,
@ -113,6 +114,7 @@ describe('multiple migrator instances running in parallel', () => {
getKibanaMigratorTestKit({
...config,
logFilePath: Path.join(__dirname, `active_delete_instance_${index}.log`),
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
})
)
);

View file

@ -13,6 +13,7 @@ import {
type SavedObjectsType,
MAIN_SAVED_OBJECT_INDEX,
} from '@kbn/core-saved-objects-server';
import { DEFAULT_INDEX_TYPES_MAP } from '@kbn/core-saved-objects-base-server-internal';
import {
clearLog,
startElasticsearch,
@ -80,6 +81,7 @@ describe('split .kibana index into multiple system indices', () => {
types: updatedTypeRegistry.getAllTypes(),
kibanaIndex: '.kibana',
logFilePath,
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
});
const { runMigrations, client } = await migratorTestKitFactory();

View file

@ -24,6 +24,7 @@ import {
SavedObjectTypeRegistry,
type IKibanaMigrator,
type MigrationResult,
type IndexTypesMap,
} from '@kbn/core-saved-objects-base-server-internal';
import { SavedObjectsRepository } from '@kbn/core-saved-objects-api-server-internal';
import {
@ -73,6 +74,7 @@ export interface KibanaMigratorTestKitParams {
nodeRoles?: NodeRoles;
settings?: Record<string, any>;
types?: Array<SavedObjectsType<any>>;
defaultIndexTypesMap?: IndexTypesMap;
logFilePath?: string;
}
@ -126,6 +128,7 @@ export const getEsClient = async ({
export const getKibanaMigratorTestKit = async ({
settings = {},
kibanaIndex = defaultKibanaIndex,
defaultIndexTypesMap = {}, // do NOT assume any types are stored in any index by default
kibanaVersion = currentVersion,
kibanaBranch = currentBranch,
types = [],
@ -149,16 +152,17 @@ export const getKibanaMigratorTestKit = async ({
// types must be registered before instantiating the migrator
registerTypes(typeRegistry, types);
const migrator = await getMigrator(
const migrator = await getMigrator({
configService,
client,
typeRegistry,
loggerFactory,
kibanaIndex,
defaultIndexTypesMap,
kibanaVersion,
kibanaBranch,
nodeRoles
);
nodeRoles,
});
const runMigrations = async () => {
if (hasRun) {
@ -261,16 +265,28 @@ const getElasticsearchClient = async (
});
};
const getMigrator = async (
configService: ConfigService,
client: ElasticsearchClient,
typeRegistry: ISavedObjectTypeRegistry,
loggerFactory: LoggerFactory,
kibanaIndex: string,
kibanaVersion: string,
kibanaBranch: string,
nodeRoles: NodeRoles
) => {
interface GetMigratorParams {
configService: ConfigService;
client: ElasticsearchClient;
kibanaIndex: string;
typeRegistry: ISavedObjectTypeRegistry;
defaultIndexTypesMap: IndexTypesMap;
loggerFactory: LoggerFactory;
kibanaVersion: string;
kibanaBranch: string;
nodeRoles: NodeRoles;
}
const getMigrator = async ({
configService,
client,
kibanaIndex,
typeRegistry,
defaultIndexTypesMap,
loggerFactory,
kibanaVersion,
kibanaBranch,
nodeRoles,
}: GetMigratorParams) => {
const savedObjectsConf = await firstValueFrom(
configService.atPath<SavedObjectsConfigType>('savedObjects')
);
@ -286,8 +302,9 @@ const getMigrator = async (
return new KibanaMigrator({
client,
typeRegistry,
kibanaIndex,
typeRegistry,
defaultIndexTypesMap,
soMigrationsConfig: soConfig.migration,
kibanaVersion,
logger: loggerFactory.get('savedobjects-service'),