mirror of https://github.com/elastic/kibana.git, synced 2025-04-24 01:38:56 -04:00
[Migrations] Only pickup updated SO types when performing a compatible migration (#159962)
## Summary

Tackles the first improvement described in https://github.com/elastic/kibana/issues/160038.

When "picking up" the updated mappings, we now add a query in order to select and update only the SO types that have been updated with respect to the previous version. We achieve this by comparing `migrationMappingPropertyHashes`: the hashes stored in `<soIndex>.mappings._meta.migrationMappingPropertyHashes` are compared against the ones calculated from the definitions in the `typeRegistry`.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
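For context, the comparison described above boils down to a key-by-key diff of two hash maps. A minimal, standalone sketch of the idea (the real implementation is `getUpdatedHashes` in the diff below; the names and shapes here are simplified for illustration):

```ts
type PropertyHashes = Record<string, string>;

// Returns the keys (SO types or root fields) whose md5 hash is new or different
// in the expected mappings, compared to what is stored in the index's _meta.
const getUpdatedHashes = (
  actual: PropertyHashes | undefined,
  expected: PropertyHashes
): string[] => {
  if (!actual) {
    // nothing stored on the index => treat every field as updated
    return Object.keys(expected);
  }
  return Object.keys(expected).filter((key) => actual[key] !== expected[key]);
};

// getUpdatedHashes({ dashboard: 'aaa', lens: 'bbb' }, { dashboard: 'aaa', lens: 'ccc', map: 'ddd' })
// => ['lens', 'map']
```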
parent cca2be4e1c, commit 6f87e1d696
18 changed files with 608 additions and 130 deletions
@@ -11,7 +11,7 @@ const ROOT_FIELDS = [
'namespaces',
'type',
'references',
'migrationVersion',
'migrationVersion', // deprecated, see https://github.com/elastic/kibana/pull/150075
'coreMigrationVersion',
'typeMigrationVersion',
'managed',
@@ -8,22 +8,35 @@

import * as Either from 'fp-ts/lib/Either';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import { checkTargetMappings } from './check_target_mappings';
import { diffMappings } from '../core/build_active_mappings';
import type { SavedObjectsMappingProperties } from '@kbn/core-saved-objects-server';
import {
checkTargetMappings,
type ComparedMappingsChanged,
type ComparedMappingsMatch,
} from './check_target_mappings';
import { getUpdatedHashes } from '../core/build_active_mappings';

jest.mock('../core/build_active_mappings');

const diffMappingsMock = diffMappings as jest.MockedFn<typeof diffMappings>;
const getUpdatedHashesMock = getUpdatedHashes as jest.MockedFn<typeof getUpdatedHashes>;

const actualMappings: IndexMapping = {
properties: {
field: { type: 'integer' },
},
const indexTypes = ['type1', 'type2'];

const properties: SavedObjectsMappingProperties = {
type1: { type: 'long' },
type2: { type: 'long' },
};

const migrationMappingPropertyHashes = {
type1: 'type1Hash',
type2: 'type2Hash',
};

const expectedMappings: IndexMapping = {
properties: {
field: { type: 'long' },
properties,
dynamic: 'strict',
_meta: {
migrationMappingPropertyHashes,
},
};

@@ -32,48 +45,99 @@ describe('checkTargetMappings', () => {
jest.clearAllMocks();
});

it('returns match=false if source mappings are not defined', async () => {
const task = checkTargetMappings({
expectedMappings,
describe('when actual mappings are incomplete', () => {
it("returns 'actual_mappings_incomplete' if actual mappings are not defined", async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
});

const result = await task();
expect(result).toEqual(Either.left({ type: 'actual_mappings_incomplete' as const }));
});

const result = await task();
expect(diffMappings).not.toHaveBeenCalled();
expect(result).toEqual(Either.right({ match: false }));
it("returns 'actual_mappings_incomplete' if actual mappings do not define _meta", async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
actualMappings: {
properties,
dynamic: 'strict',
},
});

const result = await task();
expect(result).toEqual(Either.left({ type: 'actual_mappings_incomplete' as const }));
});

it("returns 'actual_mappings_incomplete' if actual mappings do not define migrationMappingPropertyHashes", async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
actualMappings: {
properties,
dynamic: 'strict',
_meta: {},
},
});

const result = await task();
expect(result).toEqual(Either.left({ type: 'actual_mappings_incomplete' as const }));
});

it("returns 'actual_mappings_incomplete' if actual mappings define a different value for 'dynamic' property", async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
actualMappings: {
properties,
dynamic: false,
_meta: { migrationMappingPropertyHashes },
},
});

const result = await task();
expect(result).toEqual(Either.left({ type: 'actual_mappings_incomplete' as const }));
});
});

it('calls diffMappings() with the source and target mappings', async () => {
const task = checkTargetMappings({
actualMappings,
expectedMappings,
describe('when actual mappings are complete', () => {
describe('and mappings do not match', () => {
it('returns the lists of changed root fields and types', async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
actualMappings: expectedMappings,
});

getUpdatedHashesMock.mockReturnValueOnce(['type1', 'type2', 'someRootField']);

const result = await task();
const expected: ComparedMappingsChanged = {
type: 'compared_mappings_changed' as const,
updatedRootFields: ['someRootField'],
updatedTypes: ['type1', 'type2'],
};
expect(result).toEqual(Either.left(expected));
});
});

await task();
expect(diffMappings).toHaveBeenCalledTimes(1);
expect(diffMappings).toHaveBeenCalledWith(actualMappings, expectedMappings);
});
describe('and mappings match', () => {
it('returns a compared_mappings_match response', async () => {
const task = checkTargetMappings({
indexTypes,
expectedMappings,
actualMappings: expectedMappings,
});

it('returns match=true if diffMappings() match', async () => {
diffMappingsMock.mockReturnValueOnce(undefined);
getUpdatedHashesMock.mockReturnValueOnce([]);

const task = checkTargetMappings({
actualMappings,
expectedMappings,
const result = await task();
const expected: ComparedMappingsMatch = {
type: 'compared_mappings_match' as const,
};
expect(result).toEqual(Either.right(expected));
});
});

const result = await task();
expect(result).toEqual(Either.right({ match: true }));
});

it('returns match=false if diffMappings() finds differences', async () => {
diffMappingsMock.mockReturnValueOnce({ changedProp: 'field' });

const task = checkTargetMappings({
actualMappings,
expectedMappings,
});

const result = await task();
expect(result).toEqual(Either.right({ match: false }));
});
});
@@ -8,29 +8,62 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';

import { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import { diffMappings } from '../core/build_active_mappings';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import { getUpdatedHashes } from '../core/build_active_mappings';

/** @internal */
export interface CheckTargetMappingsParams {
indexTypes: string[];
actualMappings?: IndexMapping;
expectedMappings: IndexMapping;
}

/** @internal */
export interface TargetMappingsCompareResult {
match: boolean;
export interface ComparedMappingsMatch {
type: 'compared_mappings_match';
}

export interface ActualMappingsIncomplete {
type: 'actual_mappings_incomplete';
}

export interface ComparedMappingsChanged {
type: 'compared_mappings_changed';
updatedRootFields: string[];
updatedTypes: string[];
}

export const checkTargetMappings =
({
indexTypes,
actualMappings,
expectedMappings,
}: CheckTargetMappingsParams): TaskEither.TaskEither<never, TargetMappingsCompareResult> =>
}: CheckTargetMappingsParams): TaskEither.TaskEither<
ActualMappingsIncomplete | ComparedMappingsChanged,
ComparedMappingsMatch
> =>
async () => {
if (!actualMappings) {
return Either.right({ match: false });
if (
!actualMappings?._meta?.migrationMappingPropertyHashes ||
actualMappings.dynamic !== expectedMappings.dynamic
) {
return Either.left({ type: 'actual_mappings_incomplete' as const });
}

const updatedHashes = getUpdatedHashes({
actual: actualMappings,
expected: expectedMappings,
});

if (updatedHashes.length) {
const updatedTypes = updatedHashes.filter((field) => indexTypes.includes(field));
const updatedRootFields = updatedHashes.filter((field) => !indexTypes.includes(field));
return Either.left({
type: 'compared_mappings_changed' as const,
updatedRootFields,
updatedTypes,
});
} else {
return Either.right({ type: 'compared_mappings_match' as const });
}
const diff = diffMappings(actualMappings, expectedMappings);
return Either.right({ match: !diff });
};
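The new action reports its outcome through the `Either` channel instead of a boolean `match` flag. A hedged sketch of how a consumer can branch on the three possible outcomes (the union type mirrors the interfaces above; the `handleResult` helper itself is illustrative, the real branching lives in the model, further down in this diff):

```ts
import * as Either from 'fp-ts/lib/Either';

type CheckResult = Either.Either<
  | { type: 'actual_mappings_incomplete' }
  | { type: 'compared_mappings_changed'; updatedRootFields: string[]; updatedTypes: string[] },
  { type: 'compared_mappings_match' }
>;

// Illustrative: decide what the subsequent "pickup" step should do for each outcome.
const handleResult = (res: CheckResult): string => {
  if (Either.isRight(res)) return 'mappings match, skip the pickup step';
  if (res.left.type === 'actual_mappings_incomplete') return 'pick up ALL documents';
  return res.left.updatedRootFields.length > 0
    ? 'a root field changed: pick up ALL documents'
    : `pick up only these types: ${res.left.updatedTypes.join(', ')}`;
};
```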
@@ -108,6 +108,7 @@ import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { IncompatibleClusterRoutingAllocation } from './initialize_action';
import type { ClusterShardLimitExceeded } from './create_index';
import type { SynchronizationFailed } from './synchronize_migrators';
import type { ActualMappingsIncomplete, ComparedMappingsChanged } from './check_target_mappings';

export type {
CheckForUnknownDocsParams,
@@ -176,6 +177,8 @@ export interface ActionErrorTypeMap {
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
synchronization_failed: SynchronizationFailed;
actual_mappings_incomplete: ActualMappingsIncomplete;
compared_mappings_changed: ComparedMappingsChanged;
}

/**
@@ -9,6 +9,7 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@@ -20,7 +21,7 @@ export interface UpdateByQueryResponse {

/**
* Pickup updated mappings by performing an update by query operation on all
* documents in the index. Returns a task ID which can be
* documents matching the passed in query. Returns a task ID which can be
* tracked for progress.
*
* @remarks When mappings are updated to add a field which previously wasn't
@@ -35,7 +36,8 @@ export const pickupUpdatedMappings =
(
client: ElasticsearchClient,
index: string,
batchSize: number
batchSize: number,
query?: QueryDslQueryContainer
): TaskEither.TaskEither<RetryableEsClientError, UpdateByQueryResponse> =>
() => {
return client
@@ -52,6 +54,8 @@ export const pickupUpdatedMappings =
refresh: true,
// Create a task and return task id instead of blocking until complete
wait_for_completion: false,
// Only update the documents that match the provided query
query,
})
.then(({ task: taskId }) => {
return Either.right({ taskId: String(taskId!) });
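To make the new `query` parameter concrete, here is a rough, standalone sketch of the update-by-query call the pickup task issues when a query is supplied (using the Elasticsearch JS client directly; apart from the request options shown in the hunk above, the helper and its names are illustrative):

```ts
import { Client } from '@elastic/elasticsearch';

// Illustrative: start an update-by-query "pickup" task restricted to the updated SO types.
const startPickupTask = async (client: Client, index: string, updatedTypes: string[]) => {
  const { task } = await client.updateByQuery({
    index,
    refresh: true,
    // Create a task and return its id instead of blocking until completion.
    wait_for_completion: false,
    // Only touch documents whose `type` matches one of the updated SO types.
    query: { bool: { should: updatedTypes.map((type) => ({ term: { type } })) } },
  });
  return String(task);
};
```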
@@ -11,47 +11,76 @@ import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { updateAndPickupMappings } from './update_and_pickup_mappings';
import { DEFAULT_TIMEOUT } from './constants';
import { pickupUpdatedMappings } from './pickup_updated_mappings';

jest.mock('./catch_retryable_es_client_errors');
jest.mock('./pickup_updated_mappings');

describe('updateAndPickupMappings', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
describe('putMappingTask', () => {
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = updateAndPickupMappings({
client,
index: 'new_index',
mappings: { properties: {} },
batchSize: 1000,
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = updateAndPickupMappings({
client,
index: 'new_index',
mappings: { properties: {} },
batchSize: 1000,
});
try {
await task();
} catch (e) {
/** ignore */
}

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
try {
await task();
} catch (e) {
/** ignore */
}

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('calls the indices.putMapping with the mapping properties as well as the _meta information', async () => {
const task = updateAndPickupMappings({
client,
index: 'new_index',
mappings: {
properties: {
'apm-indices': {
type: 'object',
dynamic: false,
},
},
_meta: {
migrationMappingPropertyHashes: {
references: '7997cf5a56cc02bdc9c93361bde732b0',
'epm-packages': '860e23f4404fa1c33f430e6dad5d8fa2',
'cases-connector-mappings': '17d2e9e0e170a21a471285a5d845353c',
},
},
},
batchSize: 1000,
});
try {
await task();
} catch (e) {
/** ignore */
}

it('calls the indices.putMapping with the mapping properties as well as the _meta information', async () => {
const task = updateAndPickupMappings({
client,
index: 'new_index',
mappings: {
expect(client.indices.putMapping).toHaveBeenCalledTimes(1);
expect(client.indices.putMapping).toHaveBeenCalledWith({
index: 'new_index',
timeout: DEFAULT_TIMEOUT,
properties: {
'apm-indices': {
type: 'object',
@@ -65,32 +94,47 @@ describe('updateAndPickupMappings', () => {
'cases-connector-mappings': '17d2e9e0e170a21a471285a5d845353c',
},
},
},
batchSize: 1000,
});
});
try {
await task();
} catch (e) {
/** ignore */
}
});

expect(client.indices.putMapping).toHaveBeenCalledTimes(1);
expect(client.indices.putMapping).toHaveBeenCalledWith({
index: 'new_index',
timeout: DEFAULT_TIMEOUT,
properties: {
'apm-indices': {
type: 'object',
dynamic: false,
describe('pickupUpdatedMappings', () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({})
);

it('calls pickupUpdatedMappings with the right parameters', async () => {
const query = {
bool: {
should: [
{
term: {
type: 'type1',
},
},
{
term: {
type: 'type2',
},
},
],
},
},
_meta: {
migrationMappingPropertyHashes: {
references: '7997cf5a56cc02bdc9c93361bde732b0',
'epm-packages': '860e23f4404fa1c33f430e6dad5d8fa2',
'cases-connector-mappings': '17d2e9e0e170a21a471285a5d845353c',
},
},
};
const task = updateAndPickupMappings({
client,
index: 'new_index',
mappings: { properties: {} },
batchSize: 1000,
query,
});
try {
await task();
} catch (e) {
/** ignore */
}

expect(pickupUpdatedMappings).toHaveBeenCalledTimes(1);
expect(pickupUpdatedMappings).toHaveBeenCalledWith(client, 'new_index', 1000, query);
});
});
});
@@ -11,6 +11,7 @@ import * as TaskEither from 'fp-ts/lib/TaskEither';
import { pipe } from 'fp-ts/lib/pipeable';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@@ -29,6 +30,7 @@ export interface UpdateAndPickupMappingsParams {
index: string;
mappings: IndexMapping;
batchSize: number;
query?: QueryDslQueryContainer;
}
/**
* Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping
@@ -39,6 +41,7 @@ export const updateAndPickupMappings = ({
index,
mappings,
batchSize,
query,
}: UpdateAndPickupMappingsParams): TaskEither.TaskEither<
RetryableEsClientError,
UpdateAndPickupMappingsResponse
@@ -76,7 +79,7 @@ export const updateAndPickupMappings = ({
return pipe(
putMappingTask,
TaskEither.chain((res) => {
return pickupUpdatedMappings(client, index, batchSize);
return pickupUpdatedMappings(client, index, batchSize, query);
})
);
};
@@ -10,7 +10,7 @@ import type {
IndexMapping,
SavedObjectsTypeMappingDefinitions,
} from '@kbn/core-saved-objects-base-server-internal';
import { buildActiveMappings, diffMappings } from './build_active_mappings';
import { buildActiveMappings, diffMappings, getUpdatedHashes } from './build_active_mappings';

describe('buildActiveMappings', () => {
test('creates a strict mapping', () => {
@@ -208,3 +208,65 @@ describe('diffMappings', () => {
expect(diffMappings(actual, expected)!.changedProp).toEqual('_meta');
});
});

describe('getUpdatedHashes', () => {
test('gives all hashes if _meta is missing from actual', () => {
const actual: IndexMapping = {
dynamic: 'strict',
properties: {},
};
const expected: IndexMapping = {
_meta: {
migrationMappingPropertyHashes: { foo: 'bar', bar: 'baz' },
},
dynamic: 'strict',
properties: {},
};

expect(getUpdatedHashes({ actual, expected })).toEqual(['foo', 'bar']);
});

test('gives all hashes if migrationMappingPropertyHashes is missing from actual', () => {
const actual: IndexMapping = {
dynamic: 'strict',
properties: {},
_meta: {},
};
const expected: IndexMapping = {
_meta: {
migrationMappingPropertyHashes: { foo: 'bar', bar: 'baz' },
},
dynamic: 'strict',
properties: {},
};

expect(getUpdatedHashes({ actual, expected })).toEqual(['foo', 'bar']);
});

test('gives a list of the types with updated hashes', () => {
const actual: IndexMapping = {
dynamic: 'strict',
properties: {},
_meta: {
migrationMappingPropertyHashes: {
type1: 'type1hash1',
type2: 'type2hash1',
type3: 'type3hash1', // will be removed
},
},
};
const expected: IndexMapping = {
dynamic: 'strict',
properties: {},
_meta: {
migrationMappingPropertyHashes: {
type1: 'type1hash1', // remains the same
type2: 'type2hash2', // updated
type4: 'type4hash1', // new type
},
},
};

expect(getUpdatedHashes({ actual, expected })).toEqual(['type2', 'type4']);
});
});
@@ -63,6 +63,30 @@ export function diffMappings(actual: IndexMapping, expected: IndexMapping) {
return changedProp ? { changedProp: `properties.${changedProp}` } : undefined;
}

/**
* Compares the actual vs expected mappings' hashes.
* Returns a list with all the hashes that have been updated.
*/
export const getUpdatedHashes = ({
actual,
expected,
}: {
actual: IndexMapping;
expected: IndexMapping;
}): string[] => {
if (!actual._meta?.migrationMappingPropertyHashes) {
return Object.keys(expected._meta!.migrationMappingPropertyHashes!);
}

const updatedHashes = Object.keys(expected._meta!.migrationMappingPropertyHashes!).filter(
(key) =>
actual._meta!.migrationMappingPropertyHashes![key] !==
expected._meta!.migrationMappingPropertyHashes![key]
);

return updatedHashes;
};

// Convert an object to an md5 hash string, using a stable serialization (canonicalStringify)
function md5Object(obj: any) {
return crypto.createHash('md5').update(canonicalStringify(obj)).digest('hex');
@@ -7,6 +7,7 @@
*/

import { FetchIndexResponse } from '../actions/fetch_indices';
import { BaseState } from '../state';
import {
addExcludedTypesToBoolQuery,
addMustClausesToBoolQuery,
@@ -20,6 +21,7 @@ import {
createBulkIndexOperationTuple,
hasLaterVersionAlias,
aliasVersion,
getIndexTypes,
} from './helpers';

describe('addExcludedTypesToBoolQuery', () => {
@@ -444,3 +446,17 @@ describe('getTempIndexName', () => {
expect(getTempIndexName('.kibana_cases', '8.8.0')).toEqual('.kibana_cases_8.8.0_reindex_temp');
});
});

describe('getIndexTypes', () => {
it("returns the list of types that belong to a migrator's index, based on its state", () => {
const baseState = {
indexPrefix: '.kibana_task_manager',
indexTypesMap: {
'.kibana': ['foo', 'bar'],
'.kibana_task_manager': ['task'],
},
};

expect(getIndexTypes(baseState as unknown as BaseState)).toEqual(['task']);
});
});
@@ -17,7 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';
import type { BaseState, OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';

/** @internal */
export const REINDEX_TEMP_SUFFIX = '_reindex_temp';
@@ -323,3 +323,7 @@ export const increaseBatchSize = (
const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
};

export const getIndexTypes = (state: BaseState): string[] => {
return state.indexTypesMap[state.indexPrefix];
};
@@ -2597,19 +2597,75 @@ describe('migrations v2 model', () => {
targetIndex: '.kibana_7.11.0_001',
};

it('CHECK_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS_PROPERTIES if mappings do not match', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.right({ match: false });
const newState = model(
checkTargetMappingsState,
res
) as UpdateTargetMappingsPropertiesState;
expect(newState.controlState).toBe('UPDATE_TARGET_MAPPINGS_PROPERTIES');
describe('reindex migration', () => {
it('CHECK_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS_PROPERTIES if origin mappings did not exist', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.left({
type: 'actual_mappings_incomplete' as const,
});
const newState = model(
checkTargetMappingsState,
res
) as UpdateTargetMappingsPropertiesState;
expect(newState.controlState).toBe('UPDATE_TARGET_MAPPINGS_PROPERTIES');
expect(Option.isNone(newState.updatedTypesQuery)).toEqual(true);
});
});

it('CHECK_TARGET_MAPPINGS -> CHECK_VERSION_INDEX_READY_ACTIONS if mappings match', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.right({ match: true });
const newState = model(checkTargetMappingsState, res) as CheckVersionIndexReadyActions;
expect(newState.controlState).toBe('CHECK_VERSION_INDEX_READY_ACTIONS');
describe('compatible migration', () => {
it('CHECK_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS_PROPERTIES if core fields have been updated', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.left({
type: 'compared_mappings_changed' as const,
updatedRootFields: ['namespaces'],
updatedTypes: ['dashboard', 'lens'],
});
const newState = model(
checkTargetMappingsState,
res
) as UpdateTargetMappingsPropertiesState;
expect(newState.controlState).toBe('UPDATE_TARGET_MAPPINGS_PROPERTIES');
// since a core field has been updated, we must pickup ALL SOs.
// Thus, we must NOT define a filter query.
expect(Option.isNone(newState.updatedTypesQuery)).toEqual(true);
});

it('CHECK_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS_PROPERTIES if only SO types have changed', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.left({
type: 'compared_mappings_changed' as const,
updatedRootFields: [],
updatedTypes: ['dashboard', 'lens'],
});
const newState = model(
checkTargetMappingsState,
res
) as UpdateTargetMappingsPropertiesState;
expect(newState.controlState).toBe('UPDATE_TARGET_MAPPINGS_PROPERTIES');
expect(
Option.isSome(newState.updatedTypesQuery) && newState.updatedTypesQuery.value
).toEqual({
bool: {
should: [
{
term: {
type: 'dashboard',
},
},
{
term: {
type: 'lens',
},
},
],
},
});
});

it('CHECK_TARGET_MAPPINGS -> CHECK_VERSION_INDEX_READY_ACTIONS if mappings match', () => {
const res: ResponseType<'CHECK_TARGET_MAPPINGS'> = Either.right({
type: 'compared_mappings_match' as const,
});
const newState = model(checkTargetMappingsState, res) as CheckVersionIndexReadyActions;
expect(newState.controlState).toBe('CHECK_VERSION_INDEX_READY_ACTIONS');
});
});
});

@@ -2842,6 +2898,17 @@ describe('migrations v2 model', () => {
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
targetIndex: '.kibana_7.11.0_001',
updatedTypesQuery: Option.fromNullable({
bool: {
should: [
{
term: {
type: 'type1',
},
},
],
},
}),
};
test('UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK', () => {
const res: ResponseType<'UPDATE_TARGET_MAPPINGS_PROPERTIES'> = Either.right({
@@ -1422,20 +1422,61 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'CHECK_TARGET_MAPPINGS') {
const res = resW as ResponseType<typeof stateP.controlState>;
if (Either.isRight(res)) {
if (!res.right.match) {
return {
...stateP,
controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES',
};
}

// The md5 of the mappings match, so there's no need to update target mappings
// The md5 of ALL mappings match, so there's no need to update target mappings
return {
...stateP,
controlState: 'CHECK_VERSION_INDEX_READY_ACTIONS',
};
} else {
throwBadResponse(stateP, res as never);
const left = res.left;
if (isTypeof(left, 'actual_mappings_incomplete')) {
// reindex migration
// some top-level properties have changed, e.g. 'dynamic' or '_meta' (see checkTargetMappings())
// we must "pick-up" all documents on the index (by not providing a query)
return {
...stateP,
controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES',
updatedTypesQuery: Option.none,
};
} else if (isTypeof(left, 'compared_mappings_changed')) {
if (left.updatedRootFields.length) {
// compatible migration: some core fields have been updated
return {
...stateP,
controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES',
// we must "pick-up" all documents on the index (by not providing a query)
updatedTypesQuery: Option.none,
logs: [
...stateP.logs,
{
level: 'info',
message: `Kibana is performing a compatible upgrade and the mappings of some root fields have been changed. For Elasticsearch to pickup these mappings, all saved objects need to be updated. Updated root fields: ${left.updatedRootFields}.`,
},
],
};
} else {
// compatible migration: some fields have been updated, and they all correspond to SO types
return {
...stateP,
controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES',
// we can "pick-up" only the SO types that have changed
updatedTypesQuery: Option.some({
bool: {
should: left.updatedTypes.map((type) => ({ term: { type } })),
},
}),
logs: [
...stateP.logs,
{
level: 'info',
message: `Kibana is performing a compatible upgrade and NO root fields have been udpated. Kibana will update the following SO types so that ES can pickup the updated mappings: ${left.updatedTypes}.`,
},
],
};
}
} else {
throwBadResponse(stateP, res as never);
}
}
} else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS_PROPERTIES') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@@ -58,6 +58,7 @@ import { createDelayFn } from './common/utils';
import type { TransformRawDocs } from './types';
import * as Actions from './actions';
import { REMOVED_TYPES } from './core';
import { getIndexTypes } from './model/helpers';

type ActionMap = ReturnType<typeof nextActionMap>;

@@ -201,6 +202,7 @@ export const nextActionMap = (
Actions.checkTargetMappings({
actualMappings: Option.toUndefined(state.sourceIndexMappings),
expectedMappings: state.targetIndexMappings,
indexTypes: getIndexTypes(state),
}),
UPDATE_TARGET_MAPPINGS_PROPERTIES: (state: UpdateTargetMappingsPropertiesState) =>
Actions.updateAndPickupMappings({
@@ -208,6 +210,7 @@ export const nextActionMap = (
index: state.targetIndex,
mappings: omit(state.targetIndexMappings, ['_meta']), // ._meta property will be updated on a later step
batchSize: state.batchSize,
query: Option.toUndefined(state.updatedTypesQuery),
}),
UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK: (
state: UpdateTargetMappingsPropertiesWaitForTaskState
@@ -374,6 +374,7 @@ export interface CheckTargetMappingsState extends PostInitState {
export interface UpdateTargetMappingsPropertiesState extends PostInitState {
/** Update the mappings of the target index */
readonly controlState: 'UPDATE_TARGET_MAPPINGS_PROPERTIES';
readonly updatedTypesQuery: Option.Option<QueryDslQueryContainer>;
}

export interface UpdateTargetMappingsPropertiesWaitForTaskState extends PostInitState {
@@ -326,6 +326,8 @@ describe('split .kibana index into multiple system indices', () => {
// .kibana_task_manager migrator is NOT involved in relocation, must not sync with other migrators
'[.kibana_task_manager] READY_TO_REINDEX_SYNC',
'[.kibana_task_manager] DONE_REINDEXING_SYNC',
// .kibana_task_manager migrator performed a REINDEX migration, it must update ALL types
'[.kibana_task_manager] Kibana is performing a compatible update and it will update the following SO types so that ES can pickup the updated mappings',
]);

// new indices migrators did not exist, so they all have to reindex (create temp index + sync)
@@ -390,6 +392,9 @@ describe('split .kibana index into multiple system indices', () => {
// should NOT retransform anything (we reindexed, thus we transformed already)
['.kibana', '.kibana_task_manager', '.kibana_so_ui', '.kibana_so_search'].forEach((index) => {
expect(logs).not.toContainLogEntry(`[${index}] OUTDATED_DOCUMENTS_TRANSFORM`);
expect(logs).not.toContainLogEntry(
`[${index}] Kibana is performing a compatible update and it will update the following SO types so that ES can pickup the updated mappings`
);
});
});
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import Path from 'path';
import type { TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import {
clearLog,
createBaseline,
currentVersion,
defaultKibanaIndex,
defaultLogFilePath,
getCompatibleMappingsMigrator,
getIncompatibleMappingsMigrator,
startElasticsearch,
} from '../kibana_migrator_test_kit';
import '../jest_matchers';
import { delay, parseLogFile } from '../test_utils';
import { IndexMappingMeta } from '@kbn/core-saved-objects-base-server-internal';

export const logFilePath = Path.join(__dirname, 'pickup_updated_types_only.test.log');

describe('pickupUpdatedMappings', () => {
let esServer: TestElasticsearchUtils['es'];

beforeAll(async () => {
esServer = await startElasticsearch();
});

beforeEach(async () => {
await createBaseline();
await clearLog();
});

describe('when performing a reindexing migration', () => {
it('should pickup all documents from the index', async () => {
const { runMigrations } = await getIncompatibleMappingsMigrator();

await runMigrations();

const logs = await parseLogFile(defaultLogFilePath);

expect(logs).not.toContainLogEntry(
'Kibana is performing a compatible upgrade and NO root fields have been udpated. Kibana will update the following SO types so that ES can pickup the updated mappings'
);
});
});

describe('when performing a compatible migration', () => {
it('should pickup only the types that have been updated', async () => {
const { runMigrations } = await getCompatibleMappingsMigrator();

await runMigrations();

const logs = await parseLogFile(defaultLogFilePath);

expect(logs).toContainLogEntry(
'Kibana is performing a compatible upgrade and NO root fields have been udpated. Kibana will update the following SO types so that ES can pickup the updated mappings: complex.'
);
});

it('should pickup ALL documents if any root fields have been updated', async () => {
const { runMigrations, client } = await getCompatibleMappingsMigrator();

// we tamper the baseline mappings to simulate some root fields changes
const baselineMappings = await client.indices.getMapping({ index: defaultKibanaIndex });
const _meta = baselineMappings[`${defaultKibanaIndex}_${currentVersion}_001`].mappings
._meta as IndexMappingMeta;
_meta.migrationMappingPropertyHashes!.namespace =
_meta.migrationMappingPropertyHashes!.namespace + '_tampered';
await client.indices.putMapping({ index: defaultKibanaIndex, _meta });

await runMigrations();

const logs = await parseLogFile(defaultLogFilePath);

expect(logs).toContainLogEntry(
'Kibana is performing a compatible upgrade and the mappings of some root fields have been changed. For Elasticsearch to pickup these mappings, all saved objects need to be updated. Updated root fields: namespace.'
);
expect(logs).not.toContainLogEntry(
'Kibana is performing a compatible upgrade and NO root fields have been udpated. Kibana will update the following SO types so that ES can pickup the updated mappings'
);
});
});

afterAll(async () => {
await esServer?.stop();
await delay(2);
});
});
@@ -387,6 +387,16 @@ export const createBaseline = async () => {
types: baselineTypes,
});

// remove the testing index (current and next minor)
await client.indices.delete({
index: [
defaultKibanaIndex,
`${defaultKibanaIndex}_${currentVersion}_001`,
`${defaultKibanaIndex}_${nextMinor}_001`,
],
ignore_unavailable: true,
});

await runMigrations();

await savedObjectsRepository.bulkCreate(baselineDocuments, {