Rewriting SO id during migration (#97222)

* some typos

* implement an alternative client-side migration algorithm

required to enforce idempotent id generation for SO

* update tests

* lol

* remove unnecessary param from request generic

* remove unused parameter

* optimize search when quierying SO for migration

* fix wrong type in fixtures

* try shard_doc asc

* add an integration test

* cleanup

* track_total_hits: false to improve perf

* add happy path test for transformDocs action

* remove unused types

* fix wrong typing

* add cleanup phase

* add an integration test for cleanup phase

* add unit-tests for cleanup function

* address comments

* Fix functional test

* set defaultIndex before each test. otherwise it is deleted in the first test file during cleanup phase

* sourceIndex: Option.some<> for consistency

* Revert "set defaultIndex before each test. otherwise it is deleted in the first test file during cleanup phase"

This reverts commit a128d7b7c0.

* address comments from Pierre

* fix test

* Revert "fix test"

This reverts commit 97315b6dc2.

* revert min convert version back to 8.0

Co-authored-by: Matthias Wilhelm <matthias.wilhelm@elastic.co>
This commit is contained in:
Mikhail Shustov 2021-04-26 15:57:29 +02:00 committed by GitHub
parent 52a650d947
commit e6ba8ccdc2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1210 additions and 312 deletions

View file

@ -850,7 +850,8 @@ function assertNoDowngrades(
* that we can later regenerate any inbound object references to match.
*
* @note This is only intended to be used when single-namespace object types are converted into multi-namespace object types.
* @internal
*/
function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
export function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
return uuidv5(`${namespace}:${type}:${id}`, uuidv5.DNS); // the uuidv5 namespace constant (uuidv5.DNS) is arbitrary
}

View file

@ -14,7 +14,6 @@
import _ from 'lodash';
import { estypes } from '@elastic/elasticsearch';
import { MigrationEsClient } from './migration_es_client';
import { CountResponse, SearchResponse } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsMigrationVersion } from '../../types';
import { AliasAction, RawDoc } from './call_cluster';
@ -95,11 +94,11 @@ export async function fetchInfo(client: MigrationEsClient, index: string): Promi
* Creates a reader function that serves up batches of documents from the index. We aren't using
* an async generator, as that feature currently breaks Kibana's tooling.
*
* @param {CallCluster} callCluster - The elastic search connection
* @param {string} - The index to be read from
* @param client - The elastic search connection
* @param index - The index to be read from
* @param {opts}
* @prop {number} batchSize - The number of documents to read at a time
* @prop {string} scrollDuration - The scroll duration used for scrolling through the index
* @prop batchSize - The number of documents to read at a time
* @prop scrollDuration - The scroll duration used for scrolling through the index
*/
export function reader(
client: MigrationEsClient,
@ -111,11 +110,11 @@ export function reader(
const nextBatch = () =>
scrollId !== undefined
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
? client.scroll<SavedObjectsRawDocSource>({
scroll,
scroll_id: scrollId,
})
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
: client.search<SavedObjectsRawDocSource>({
body: {
size: batchSize,
query: excludeUnusedTypesQuery,
@ -143,10 +142,6 @@ export function reader(
/**
* Writes the specified documents to the index, throws an exception
* if any of the documents fail to save.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {RawDoc[]} docs
*/
export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) {
const { body } = await client.bulk({
@ -184,9 +179,9 @@ export async function write(client: MigrationEsClient, index: string, docs: RawD
* it performs the check *each* time it is called, rather than memoizing itself,
* as this is used to determine if migrations are complete.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations
* @param client - The connection to ElasticSearch
* @param index
* @param migrationVersion - The latest versions of the migrations
*/
export async function migrationsUpToDate(
client: MigrationEsClient,
@ -207,7 +202,7 @@ export async function migrationsUpToDate(
return true;
}
const { body } = await client.count<CountResponse>({
const { body } = await client.count({
body: {
query: {
bool: {
@ -271,9 +266,9 @@ export async function createIndex(
* is a concrete index. This function will reindex `alias` into a new index, delete the `alias`
* index, and then create an alias `alias` that points to the new index.
*
* @param {CallCluster} callCluster - The connection to ElasticSearch
* @param {FullIndexInfo} info - Information about the mappings and name of the new index
* @param {string} alias - The name of the index being converted to an alias
* @param client - The ElasticSearch connection
* @param info - Information about the mappings and name of the new index
* @param alias - The name of the index being converted to an alias
*/
export async function convertToAlias(
client: MigrationEsClient,
@ -297,7 +292,7 @@ export async function convertToAlias(
* alias, meaning that it will only point to one index at a time, so we
* remove any other indices from the alias.
*
* @param {CallCluster} callCluster
* @param {CallCluster} client
* @param {string} index
* @param {string} alias
* @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call
@ -377,7 +372,7 @@ async function reindex(
) {
// We poll instead of having the request wait for completion, as for large indices,
// the request times out on the Elasticsearch side of things. We have a relatively tight
// polling interval, as the request is fairly efficent, and we don't
// polling interval, as the request is fairly efficient, and we don't
// want to block index migrations for too long on this.
const pollInterval = 250;
const { body: reindexBody } = await client.reindex({

View file

@ -189,8 +189,7 @@ async function migrateSourceToDest(context: Context) {
serializer,
documentMigrator.migrateAndConvert,
// @ts-expect-error @elastic/elasticsearch `Hit._id` may be a string | number in ES, but we always expect strings in the SO index.
docs,
log
docs
)
);
}

View file

@ -11,7 +11,6 @@ import _ from 'lodash';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsSerializer } from '../../serialization';
import { migrateRawDocs } from './migrate_raw_docs';
import { createSavedObjectsMigrationLoggerMock } from '../../migrations/mocks';
describe('migrateRawDocs', () => {
test('converts raw docs to saved objects', async () => {
@ -24,8 +23,7 @@ describe('migrateRawDocs', () => {
[
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
createSavedObjectsMigrationLoggerMock()
]
);
expect(result).toEqual([
@ -59,7 +57,6 @@ describe('migrateRawDocs', () => {
});
test('throws when encountering a corrupt saved object document', async () => {
const logger = createSavedObjectsMigrationLoggerMock();
const transform = jest.fn<any, any>((doc: any) => [
set(_.cloneDeep(doc), 'attributes.name', 'TADA'),
]);
@ -69,8 +66,7 @@ describe('migrateRawDocs', () => {
[
{ _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
logger
]
);
expect(result).rejects.toMatchInlineSnapshot(
@ -88,8 +84,7 @@ describe('migrateRawDocs', () => {
const result = await migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }]
);
expect(result).toEqual([
@ -119,12 +114,9 @@ describe('migrateRawDocs', () => {
throw new Error('error during transform');
});
await expect(
migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
)
migrateRawDocs(new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, [
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`);
});
});

View file

@ -16,7 +16,6 @@ import {
SavedObjectUnsanitizedDoc,
} from '../../serialization';
import { MigrateAndConvertFn } from './document_migrator';
import { SavedObjectsMigrationLogger } from '.';
/**
* Error thrown when saved object migrations encounter a corrupt saved object.
@ -46,8 +45,7 @@ export class CorruptSavedObjectError extends Error {
export async function migrateRawDocs(
serializer: SavedObjectsSerializer,
migrateDoc: MigrateAndConvertFn,
rawDocs: SavedObjectsRawDoc[],
log: SavedObjectsMigrationLogger
rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
const migrateDocWithoutBlocking = transformNonBlocking(migrateDoc);
const processedDocs = [];

View file

@ -229,48 +229,6 @@ describe('KibanaMigrator', () => {
jest.clearAllMocks();
});
it('creates a V2 migrator that initializes a new index and migrates an existing index', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise();
migrator.prepareMigrations();
await migrator.runMigrations();
// Basic assertions that we're creating and reindexing the expected indices
expect(options.client.indices.create).toHaveBeenCalledTimes(3);
expect(options.client.indices.create.mock.calls).toEqual(
expect.arrayContaining([
// LEGACY_CREATE_REINDEX_TARGET
expect.arrayContaining([expect.objectContaining({ index: '.my-index_pre8.2.3_001' })]),
// CREATE_REINDEX_TEMP
expect.arrayContaining([
expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
]),
// CREATE_NEW_TARGET
expect.arrayContaining([expect.objectContaining({ index: 'other-index_8.2.3_001' })]),
])
);
// LEGACY_REINDEX
expect(options.client.reindex.mock.calls[0][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index' }),
dest: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
}),
})
);
// REINDEX_SOURCE_TO_TEMP
expect(options.client.reindex.mock.calls[1][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
dest: expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
}),
})
);
const { status } = await migratorStatus;
return expect(status).toEqual('completed');
});
it('emits results on getMigratorResult$()', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
@ -378,6 +336,24 @@ const mockV2MigrationOptions = () => {
} as estypes.GetTaskResponse)
);
options.client.search = jest
.fn()
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);
options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);
options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);
return options;
};

View file

@ -36,7 +36,6 @@ import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { runResilientMigrator } from '../../migrationsv2';
import { migrateRawDocs } from '../core/migrate_raw_docs';
import { MigrationLogger } from '../core/migration_logger';
export interface KibanaMigratorOptions {
client: ElasticsearchClient;
@ -185,12 +184,7 @@ export class KibanaMigrator {
logger: this.log,
preMigrationScript: indexMap[index].script,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocs(
this.serializer,
this.documentMigrator.migrateAndConvert,
rawDocs,
new MigrationLogger(this.log)
),
migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs),
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,

View file

@ -78,6 +78,54 @@ describe('actions', () => {
});
});
describe('openPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.openPit(client, 'my_index');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
describe('closePit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.closePit(client, 'pitId');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
describe('transformDocs', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
describe('reindex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.reindex(
@ -205,7 +253,7 @@ describe('actions', () => {
describe('bulkOverwriteTransformedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', []);
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for');
try {
await task();
} catch (e) {

View file

@ -16,7 +16,8 @@ import { pipe } from 'fp-ts/lib/pipeable';
import { flow } from 'fp-ts/lib/function';
import { ElasticsearchClient } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
import type { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
import type { TransformRawDocs } from '../types';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
@ -419,6 +420,133 @@ export const pickupUpdatedMappings = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface OpenPitResponse {
pitId: string;
}
// how long ES should keep PIT alive
const pitKeepAlive = '10m';
/*
* Creates a lightweight view of data when the request has been initiated.
* See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* */
export const openPit = (
client: ElasticsearchClient,
index: string
): TaskEither.TaskEither<RetryableEsClientError, OpenPitResponse> => () => {
return client
.openPointInTime({
index,
keep_alive: pitKeepAlive,
})
.then((response) => Either.right({ pitId: response.body.id }))
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface ReadWithPit {
outdatedDocuments: SavedObjectsRawDoc[];
readonly lastHitSortValue: number[] | undefined;
}
/*
* Requests documents from the index using PIT mechanism.
* Filter unusedTypesToExclude documents out to exclude them from being migrated.
* */
export const readWithPit = (
client: ElasticsearchClient,
pitId: string,
/* When reading we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<estypes.QueryContainer>,
batchSize: number,
searchAfter?: number[]
): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => {
return client
.search<SavedObjectsRawDoc>({
body: {
// Sort fields are required to use searchAfter
sort: {
// the most efficient option as order is not important for the migration
_shard_doc: { order: 'asc' },
},
pit: { id: pitId, keep_alive: pitKeepAlive },
size: batchSize,
search_after: searchAfter,
// Improve performance by not calculating the total number of hits
// matching the query.
track_total_hits: false,
// Exclude saved object types
query: Option.isSome(unusedTypesQuery) ? unusedTypesQuery.value : undefined,
},
})
.then((response) => {
const hits = response.body.hits.hits;
if (hits.length > 0) {
return Either.right({
// @ts-expect-error @elastic/elasticsearch _source is optional
outdatedDocuments: hits as SavedObjectsRawDoc[],
lastHitSortValue: hits[hits.length - 1].sort as number[],
});
}
return Either.right({
outdatedDocuments: [],
lastHitSortValue: undefined,
});
})
.catch(catchRetryableEsClientErrors);
};
/*
* Closes PIT.
* See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* */
export const closePit = (
client: ElasticsearchClient,
pitId: string
): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
return client
.closePointInTime({
body: { id: pitId },
})
.then((response) => {
if (!response.body.succeeded) {
throw new Error(`Failed to close PointInTime with id: ${pitId}`);
}
return Either.right({});
})
.catch(catchRetryableEsClientErrors);
};
/*
* Transform outdated docs and write them to the index.
* */
export const transformDocs = (
client: ElasticsearchClient,
transformRawDocs: TransformRawDocs,
outdatedDocuments: SavedObjectsRawDoc[],
index: string,
refresh: estypes.Refresh
): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | TargetIndexHadWriteBlock,
'bulk_index_succeeded'
> =>
pipe(
TaskEither.tryCatch(
() => transformRawDocs(outdatedDocuments),
(e) => {
throw e;
}
),
TaskEither.chain((docs) => bulkOverwriteTransformedDocuments(client, index, docs, refresh))
);
/** @internal */
export interface ReindexResponse {
taskId: string;
}
@ -489,10 +617,12 @@ interface WaitForReindexTaskFailure {
readonly cause: { type: string; reason: string };
}
/** @internal */
export interface TargetIndexHadWriteBlock {
type: 'target_index_had_write_block';
}
/** @internal */
export interface IncompatibleMappingException {
type: 'incompatible_mapping_exception';
}
@ -605,14 +735,17 @@ export const waitForPickupUpdatedMappingsTask = flow(
)
);
/** @internal */
export interface AliasNotFound {
type: 'alias_not_found_exception';
}
/** @internal */
export interface RemoveIndexNotAConcreteIndex {
type: 'remove_index_not_a_concrete_index';
}
/** @internal */
export type AliasAction =
| { remove_index: { index: string } }
| { remove: { index: string; alias: string; must_exist: boolean } }
@ -679,11 +812,19 @@ export const updateAliases = (
.catch(catchRetryableEsClientErrors);
};
/** @internal */
export interface AcknowledgeResponse {
acknowledged: boolean;
shardsAcknowledged: boolean;
}
function aliasArrayToRecord(aliases: string[]): Record<string, estypes.Alias> {
const result: Record<string, estypes.Alias> = {};
for (const alias of aliases) {
result[alias] = {};
}
return result;
}
/**
* Creates an index with the given mappings
*
@ -698,16 +839,13 @@ export const createIndex = (
client: ElasticsearchClient,
indexName: string,
mappings: IndexMapping,
aliases?: string[]
aliases: string[] = []
): TaskEither.TaskEither<RetryableEsClientError, 'create_index_succeeded'> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError,
AcknowledgeResponse
> = () => {
const aliasesObject = (aliases ?? []).reduce((acc, alias) => {
acc[alias] = {};
return acc;
}, {} as Record<string, estypes.Alias>);
const aliasesObject = aliasArrayToRecord(aliases);
return client.indices
.create(
@ -792,6 +930,7 @@ export const createIndex = (
);
};
/** @internal */
export interface UpdateAndPickupMappingsResponse {
taskId: string;
}
@ -842,6 +981,8 @@ export const updateAndPickupMappings = (
})
);
};
/** @internal */
export interface SearchResponse {
outdatedDocuments: SavedObjectsRawDoc[];
}
@ -906,7 +1047,8 @@ export const searchForOutdatedDocuments = (
export const bulkOverwriteTransformedDocuments = (
client: ElasticsearchClient,
index: string,
transformedDocs: SavedObjectsRawDoc[]
transformedDocs: SavedObjectsRawDoc[],
refresh: estypes.Refresh
): TaskEither.TaskEither<RetryableEsClientError, 'bulk_index_succeeded'> => () => {
return client
.bulk({
@ -919,15 +1061,7 @@ export const bulkOverwriteTransformedDocuments = (
// system indices puts in place a hard control.
require_alias: false,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
// Wait for a refresh to happen before returning. This ensures that when
// this Kibana instance searches for outdated documents, it won't find
// documents that were already transformed by itself or another Kibna
// instance. However, this causes each OUTDATED_DOCUMENTS_SEARCH ->
// OUTDATED_DOCUMENTS_TRANSFORM cycle to take 1s so when batches are
// small performance will become a lot worse.
// The alternative is to use a search_after with either a tie_breaker
// field or using a Point In Time as a cursor to go through all documents.
refresh: 'wait_for',
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [

View file

@ -9,9 +9,10 @@
import { ElasticsearchClient } from '../../elasticsearch';
import { IndexMapping } from '../mappings';
import { Logger } from '../../logging';
import { SavedObjectsMigrationVersion } from '../types';
import type { SavedObjectsMigrationVersion } from '../types';
import type { TransformRawDocs } from './types';
import { MigrationResult } from '../migrations/core';
import { next, TransformRawDocs } from './next';
import { next } from './next';
import { createInitialState, model } from './model';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
@ -55,5 +56,6 @@ export async function runResilientMigrator({
logger,
next: next(client, transformRawDocs),
model,
client,
});
}

View file

@ -1 +1 @@
migration_test_kibana.log
*.log

View file

@ -14,9 +14,14 @@ import { SavedObjectsRawDoc } from '../../serialization';
import {
bulkOverwriteTransformedDocuments,
cloneIndex,
closePit,
createIndex,
fetchIndices,
openPit,
OpenPitResponse,
reindex,
readWithPit,
ReadWithPit,
searchForOutdatedDocuments,
SearchResponse,
setWriteBlock,
@ -30,6 +35,7 @@ import {
UpdateAndPickupMappingsResponse,
verifyReindex,
removeWriteBlock,
transformDocs,
waitForIndexStatusYellow,
} from '../actions';
import * as Either from 'fp-ts/lib/Either';
@ -70,14 +76,20 @@ describe('migration actions', () => {
{ _source: { title: 'saved object 4', type: 'another_unused_type' } },
{ _source: { title: 'f-agent-event 5', type: 'f_agent_event' } },
] as unknown) as SavedObjectsRawDoc[];
await bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', sourceDocs)();
await bulkOverwriteTransformedDocuments(
client,
'existing_index_with_docs',
sourceDocs,
'wait_for'
)();
await createIndex(client, 'existing_index_2', { properties: {} })();
await createIndex(client, 'existing_index_with_write_block', { properties: {} })();
await bulkOverwriteTransformedDocuments(
client,
'existing_index_with_write_block',
sourceDocs
sourceDocs,
'wait_for'
)();
await setWriteBlock(client, 'existing_index_with_write_block')();
await updateAliases(client, [
@ -155,7 +167,12 @@ describe('migration actions', () => {
{ _source: { title: 'doc 4' } },
] as unknown) as SavedObjectsRawDoc[];
await expect(
bulkOverwriteTransformedDocuments(client, 'new_index_without_write_block', sourceDocs)()
bulkOverwriteTransformedDocuments(
client,
'new_index_without_write_block',
sourceDocs,
'wait_for'
)()
).rejects.toMatchObject(expect.anything());
});
it('resolves left index_not_found_exception when the index does not exist', async () => {
@ -265,14 +282,14 @@ describe('migration actions', () => {
const task = cloneIndex(client, 'existing_index_with_write_block', 'clone_target_1');
expect.assertions(1);
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
});
it('resolves right after waiting for index status to be yellow if clone target already existed', async () => {
expect.assertions(2);
@ -331,14 +348,14 @@ describe('migration actions', () => {
expect.assertions(1);
const task = cloneIndex(client, 'no_such_index', 'clone_target_3');
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
});
it('resolves left with a retryable_es_client_error if clone target already exists but takes longer than the specified timeout before turning yellow', async () => {
// Create a red index
@ -406,13 +423,13 @@ describe('migration actions', () => {
targetIndex: 'reindex_target',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(`
Array [
"doc 1",
"doc 2",
"doc 3",
"saved object 4",
"f-agent-event 5",
"saved object 4",
]
`);
});
@ -433,18 +450,18 @@ describe('migration actions', () => {
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_excluded_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(`
Array [
"doc 1",
"doc 2",
@ -474,13 +491,13 @@ describe('migration actions', () => {
targetIndex: 'reindex_target_2',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
"doc 2_updated",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
"saved object 4_updated",
]
`);
});
@ -526,13 +543,13 @@ describe('migration actions', () => {
targetIndex: 'reindex_target_3',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
"doc 2_updated",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
"saved object 4_updated",
]
`);
});
@ -551,7 +568,7 @@ describe('migration actions', () => {
_id,
_source,
}));
await bulkOverwriteTransformedDocuments(client, 'reindex_target_4', sourceDocs)();
await bulkOverwriteTransformedDocuments(client, 'reindex_target_4', sourceDocs, 'wait_for')();
// Now do a real reindex
const res = (await reindex(
@ -576,13 +593,13 @@ describe('migration actions', () => {
targetIndex: 'reindex_target_4',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(`
Array [
"doc 1",
"doc 2",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
"saved object 4_updated",
]
`);
});
@ -790,9 +807,169 @@ describe('migration actions', () => {
);
task = verifyReindex(client, 'existing_index_2', 'no_such_index');
await expect(task()).rejects.toMatchInlineSnapshot(
`[ResponseError: index_not_found_exception]`
await expect(task()).rejects.toThrow('index_not_found_exception');
});
});
describe('openPit', () => {
it('opens PointInTime for an index', async () => {
const openPitTask = openPit(client, 'existing_index_with_docs');
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
expect(pitResponse.right.pitId).toEqual(expect.any(String));
const searchResponse = await client.search({
body: {
pit: { id: pitResponse.right.pitId },
},
});
await expect(searchResponse.body.hits.hits.length).toBeGreaterThan(0);
});
it('rejects if index does not exist', async () => {
const openPitTask = openPit(client, 'no_such_index');
await expect(openPitTask()).rejects.toThrow('index_not_found_exception');
});
});
describe('readWithPit', () => {
it('requests documents from an index using given PIT', async () => {
const openPitTask = openPit(client, 'existing_index_with_docs');
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
const readWithPitTask = readWithPit(
client,
pitResponse.right.pitId,
Option.none,
1000,
undefined
);
const docsResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
await expect(docsResponse.right.outdatedDocuments.length).toBe(5);
});
it('requests the batchSize of documents from an index', async () => {
const openPitTask = openPit(client, 'existing_index_with_docs');
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
const readWithPitTask = readWithPit(
client,
pitResponse.right.pitId,
Option.none,
3,
undefined
);
const docsResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
await expect(docsResponse.right.outdatedDocuments.length).toBe(3);
});
it('it excludes documents not matching the provided "unusedTypesQuery"', async () => {
const openPitTask = openPit(client, 'existing_index_with_docs');
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
const readWithPitTask = readWithPit(
client,
pitResponse.right.pitId,
Option.some({
bool: {
must_not: [
{
term: {
type: 'f_agent_event',
},
},
{
term: {
type: 'another_unused_type',
},
},
],
},
}),
1000,
undefined
);
const docsResponse = (await readWithPitTask()) as Either.Right<ReadWithPit>;
expect(docsResponse.right.outdatedDocuments.map((doc) => doc._source.title).sort())
.toMatchInlineSnapshot(`
Array [
"doc 1",
"doc 2",
"doc 3",
]
`);
});
it('rejects if PIT does not exist', async () => {
const readWithPitTask = readWithPit(client, 'no_such_pit', Option.none, 1000, undefined);
await expect(readWithPitTask()).rejects.toThrow('illegal_argument_exception');
});
});
describe('closePit', () => {
it('closes PointInTime', async () => {
const openPitTask = openPit(client, 'existing_index_with_docs');
const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>;
const pitId = pitResponse.right.pitId;
await closePit(client, pitId)();
const searchTask = client.search({
body: {
pit: { id: pitId },
},
});
await expect(searchTask).rejects.toThrow('search_phase_execution_exception');
});
it('rejects if PIT does not exist', async () => {
const closePitTask = closePit(client, 'no_such_pit');
await expect(closePitTask()).rejects.toThrow('illegal_argument_exception');
});
});
describe('transformDocs', () => {
it('applies "transformRawDocs" and writes result into an index', async () => {
const index = 'transform_docs_index';
const originalDocs = [
{ _id: 'foo:1', _source: { type: 'dashboard', value: 1 } },
{ _id: 'foo:2', _source: { type: 'dashboard', value: 2 } },
];
const createIndexTask = createIndex(client, index, {
dynamic: true,
properties: {},
});
await createIndexTask();
async function tranformRawDocs(docs: SavedObjectsRawDoc[]): Promise<SavedObjectsRawDoc[]> {
for (const doc of docs) {
doc._source.value += 1;
}
return docs;
}
const transformTask = transformDocs(client, tranformRawDocs, originalDocs, index, 'wait_for');
const result = (await transformTask()) as Either.Right<'bulk_index_succeeded'>;
expect(result.right).toBe('bulk_index_succeeded');
const { body } = await client.search<{ value: number }>({
index,
});
const hits = body.hits.hits;
const foo1 = hits.find((h) => h._id === 'foo:1');
expect(foo1?._source?.value).toBe(2);
const foo2 = hits.find((h) => h._id === 'foo:2');
expect(foo2?._source?.value).toBe(3);
});
});
@ -919,7 +1096,8 @@ describe('migration actions', () => {
await bulkOverwriteTransformedDocuments(
client,
'existing_index_without_mappings',
sourceDocs
sourceDocs,
'wait_for'
)();
// Assert that we can't search over the unmapped fields of the document
@ -1147,7 +1325,13 @@ describe('migration actions', () => {
{ _source: { title: 'doc 6' } },
{ _source: { title: 'doc 7' } },
] as unknown) as SavedObjectsRawDoc[];
const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', newDocs);
const task = bulkOverwriteTransformedDocuments(
client,
'existing_index_with_docs',
newDocs,
'wait_for'
);
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -1162,10 +1346,12 @@ describe('migration actions', () => {
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [
...existingDocs,
({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc,
]);
const task = bulkOverwriteTransformedDocuments(
client,
'existing_index_with_docs',
[...existingDocs, ({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc],
'wait_for'
);
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
@ -1180,7 +1366,12 @@ describe('migration actions', () => {
{ _source: { title: 'doc 7' } },
] as unknown) as SavedObjectsRawDoc[];
await expect(
bulkOverwriteTransformedDocuments(client, 'existing_index_with_write_block', newDocs)()
bulkOverwriteTransformedDocuments(
client,
'existing_index_with_write_block',
newDocs,
'wait_for'
)()
).rejects.toMatchObject(expect.anything());
});
});

View file

@ -0,0 +1,131 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import Fs from 'fs';
import Util from 'util';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import type { Root } from '../../../root';
const logFilePath = Path.join(__dirname, 'cleanup_test.log');
const asyncUnlink = Util.promisify(Fs.unlink);
const asyncReadFile = Util.promisify(Fs.readFile);
async function removeLogFile() {
// ignore errors if it doesn't exist
await asyncUnlink(logFilePath).catch(() => void 0);
}
function createRoot() {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
beforeAll(async () => {
await removeLogFile();
});
afterAll(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}
await new Promise((resolve) => setTimeout(resolve, 10000));
});
it('clean ups if migration fails', async () => {
const { startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'trial',
// original SO:
// {
// _index: '.kibana_7.13.0_001',
// _type: '_doc',
// _id: 'index-pattern:test_index*',
// _version: 1,
// result: 'created',
// _shards: { total: 2, successful: 1, failed: 0 },
// _seq_no: 0,
// _primary_term: 1
// }
dataArchive: Path.join(__dirname, 'archives', '7.13.0_with_corrupted_so.zip'),
},
},
});
root = createRoot();
esServer = await startES();
await root.setup();
await expect(root.start()).rejects.toThrow(
/Unable to migrate the corrupt saved object document with _id: 'index-pattern:test_index\*'/
);
const logFileContent = await asyncReadFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));
const logRecordWithPit = records.find(
(rec) => rec.message === '[.kibana] REINDEX_SOURCE_TO_TEMP_OPEN_PIT RESPONSE'
);
expect(logRecordWithPit).toBeTruthy();
const pitId = logRecordWithPit.right.pitId;
expect(pitId).toBeTruthy();
const client = esServer.es.getClient();
await expect(
client.search({
body: {
pit: { id: pitId },
},
})
// throws an exception that cannot search with closed PIT
).rejects.toThrow(/search_phase_execution_exception/);
});
});

View file

@ -51,6 +51,8 @@ describe('migration v2', () => {
migrations: {
skip: false,
enableV2: true,
// There are 53 docs in fixtures. Batch size configured to enforce 3 migration steps.
batchSize: 20,
},
logging: {
appenders: {

View file

@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Path from 'path';
import Fs from 'fs';
import Util from 'util';
import { kibanaPackageJson as pkg } from '@kbn/utils';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import type { ElasticsearchClient } from '../../../elasticsearch';
import { Root } from '../../../root';
import { deterministicallyRegenerateObjectId } from '../../migrations/core/document_migrator';
const logFilePath = Path.join(__dirname, 'migration_test_kibana.log');
const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
// ignore errors if it doesn't exist
await asyncUnlink(logFilePath).catch(() => void 0);
}
function sortByTypeAndId(a: { type: string; id: string }, b: { type: string; id: string }) {
return a.type.localeCompare(b.type) || a.id.localeCompare(b.id);
}
async function fetchDocs(esClient: ElasticsearchClient, index: string) {
const { body } = await esClient.search<any>({
index,
body: {
query: {
bool: {
should: [
{
term: { type: 'foo' },
},
{
term: { type: 'bar' },
},
{
term: { type: 'legacy-url-alias' },
},
],
},
},
},
});
return body.hits.hits
.map((h) => ({
...h._source,
id: h._id,
}))
.sort(sortByTypeAndId);
}
function createRoot() {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
beforeAll(async () => {
await removeLogFile();
});
afterAll(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}
await new Promise((resolve) => setTimeout(resolve, 10000));
});
it('rewrites id deterministically for SO with namespaceType: "multiple" and "multiple-isolated"', async () => {
const migratedIndex = `.kibana_${pkg.version}_001`;
const { startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'trial',
// original SO:
// [
// { id: 'foo:1', type: 'foo', foo: { name: 'Foo 1 default' } },
// { id: 'spacex:foo:1', type: 'foo', foo: { name: 'Foo 1 spacex' }, namespace: 'spacex' },
// {
// id: 'bar:1',
// type: 'bar',
// bar: { nomnom: 1 },
// references: [{ type: 'foo', id: '1', name: 'Foo 1 default' }],
// },
// {
// id: 'spacex:bar:1',
// type: 'bar',
// bar: { nomnom: 2 },
// references: [{ type: 'foo', id: '1', name: 'Foo 1 spacex' }],
// namespace: 'spacex',
// },
// ];
dataArchive: Path.join(__dirname, 'archives', '7.13.0_so_with_multiple_namespaces.zip'),
},
},
});
root = createRoot();
esServer = await startES();
const coreSetup = await root.setup();
coreSetup.savedObjects.registerType({
name: 'foo',
hidden: false,
mappings: { properties: { name: { type: 'text' } } },
namespaceType: 'multiple',
convertToMultiNamespaceTypeVersion: '8.0.0',
});
coreSetup.savedObjects.registerType({
name: 'bar',
hidden: false,
mappings: { properties: { nomnom: { type: 'integer' } } },
namespaceType: 'multiple-isolated',
convertToMultiNamespaceTypeVersion: '8.0.0',
});
const coreStart = await root.start();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const migratedDocs = await fetchDocs(esClient, migratedIndex);
// each newly converted multi-namespace object in a non-default space has its ID deterministically regenerated, and a legacy-url-alias
// object is created which links the old ID to the new ID
const newFooId = deterministicallyRegenerateObjectId('spacex', 'foo', '1');
const newBarId = deterministicallyRegenerateObjectId('spacex', 'bar', '1');
expect(migratedDocs).toEqual(
[
{
id: 'foo:1',
type: 'foo',
foo: { name: 'Foo 1 default' },
references: [],
namespaces: ['default'],
migrationVersion: { foo: '8.0.0' },
coreMigrationVersion: pkg.version,
},
{
id: `foo:${newFooId}`,
type: 'foo',
foo: { name: 'Foo 1 spacex' },
references: [],
namespaces: ['spacex'],
originId: '1',
migrationVersion: { foo: '8.0.0' },
coreMigrationVersion: pkg.version,
},
{
// new object for spacex:foo:1
id: 'legacy-url-alias:spacex:foo:1',
type: 'legacy-url-alias',
'legacy-url-alias': {
targetId: newFooId,
targetNamespace: 'spacex',
targetType: 'foo',
},
migrationVersion: {},
references: [],
coreMigrationVersion: pkg.version,
},
{
id: 'bar:1',
type: 'bar',
bar: { nomnom: 1 },
references: [{ type: 'foo', id: '1', name: 'Foo 1 default' }],
namespaces: ['default'],
migrationVersion: { bar: '8.0.0' },
coreMigrationVersion: pkg.version,
},
{
id: `bar:${newBarId}`,
type: 'bar',
bar: { nomnom: 2 },
references: [{ type: 'foo', id: newFooId, name: 'Foo 1 spacex' }],
namespaces: ['spacex'],
originId: '1',
migrationVersion: { bar: '8.0.0' },
coreMigrationVersion: pkg.version,
},
{
// new object for spacex:bar:1
id: 'legacy-url-alias:spacex:bar:1',
type: 'legacy-url-alias',
'legacy-url-alias': {
targetId: newBarId,
targetNamespace: 'spacex',
targetType: 'bar',
},
migrationVersion: {},
references: [],
coreMigrationVersion: pkg.version,
},
].sort(sortByTypeAndId)
);
});
});

View file

@ -5,9 +5,9 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { cleanupMock } from './migrations_state_machine_cleanup.mocks';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { loggingSystemMock } from '../../mocks';
import { loggingSystemMock, elasticsearchServiceMock } from '../../mocks';
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { AllControlStates, State } from './types';
@ -15,6 +15,7 @@ import { createInitialState } from './model';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { elasticsearchClientMock } from '../../elasticsearch/client/mocks';
const esClient = elasticsearchServiceMock.createElasticsearchClient();
describe('migrationsStateActionMachine', () => {
beforeAll(() => {
jest
@ -74,6 +75,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']),
next,
client: esClient,
});
const logs = loggingSystemMock.collect(mockLogger);
const doneLog = logs.info.splice(8, 1)[0][0];
@ -151,6 +153,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']),
next,
client: esClient,
})
).resolves.toEqual(expect.anything());
});
@ -161,6 +164,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']),
next,
client: esClient,
})
).resolves.toEqual(expect.objectContaining({ status: 'migrated' }));
});
@ -171,6 +175,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']),
next,
client: esClient,
})
).resolves.toEqual(expect.objectContaining({ status: 'patched' }));
});
@ -181,6 +186,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']),
next,
client: esClient,
})
).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.my-so-index] index: the fatal reason]`
@ -196,6 +202,7 @@ describe('migrationsStateActionMachine', () => {
logger: mockLogger.get(),
model: transitionModel(['LEGACY_DELETE', 'FATAL']),
next,
client: esClient,
}).catch((err) => err);
// Ignore the first 4 log entries that come from our model
const executionLogLogs = loggingSystemMock.collect(mockLogger).info.slice(4);
@ -418,6 +425,7 @@ describe('migrationsStateActionMachine', () => {
})
);
},
client: esClient,
})
).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.my-so-index] index. Please check the health of your Elasticsearch cluster and try again. Error: [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted]`
@ -450,6 +458,7 @@ describe('migrationsStateActionMachine', () => {
next: () => {
throw new Error('this action throws');
},
client: esClient,
})
).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.my-so-index] index. Error: this action throws]`
@ -483,6 +492,7 @@ describe('migrationsStateActionMachine', () => {
if (state.controlState === 'LEGACY_DELETE') throw new Error('this action throws');
return () => Promise.resolve('hello');
},
client: esClient,
});
} catch (e) {
/** ignore */
@ -680,4 +690,37 @@ describe('migrationsStateActionMachine', () => {
]
`);
});
describe('cleanup', () => {
beforeEach(() => {
cleanupMock.mockClear();
});
it('calls cleanup function when an action throws', async () => {
await expect(
migrationStateActionMachine({
initialState: { ...initialState, reason: 'the fatal reason' } as State,
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']),
next: () => {
throw new Error('this action throws');
},
client: esClient,
})
).rejects.toThrow();
expect(cleanupMock).toHaveBeenCalledTimes(1);
});
it('calls cleanup function when reaching the FATAL state', async () => {
await expect(
migrationStateActionMachine({
initialState: { ...initialState, reason: 'the fatal reason' } as State,
logger: mockLogger.get(),
model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']),
next,
client: esClient,
})
).rejects.toThrow();
expect(cleanupMock).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -9,8 +9,10 @@
import { errors as EsErrors } from '@elastic/elasticsearch';
import * as Option from 'fp-ts/lib/Option';
import { Logger, LogMeta } from '../../logging';
import type { ElasticsearchClient } from '../../elasticsearch';
import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs';
import { Model, Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import { State } from './types';
interface StateLogMeta extends LogMeta {
@ -19,7 +21,8 @@ interface StateLogMeta extends LogMeta {
};
}
type ExecutionLog = Array<
/** @internal */
export type ExecutionLog = Array<
| {
type: 'transition';
prevControlState: State['controlState'];
@ -31,6 +34,11 @@ type ExecutionLog = Array<
controlState: State['controlState'];
res: unknown;
}
| {
type: 'cleanup';
state: State;
message: string;
}
>;
const logStateTransition = (
@ -99,11 +107,13 @@ export async function migrationStateActionMachine({
logger,
next,
model,
client,
}: {
initialState: State;
logger: Logger;
next: Next<State>;
model: Model<State>;
client: ElasticsearchClient;
}) {
const executionLog: ExecutionLog = [];
const startTime = Date.now();
@ -112,11 +122,13 @@ export async function migrationStateActionMachine({
// indicate which messages come from which index upgrade.
const logMessagePrefix = `[${initialState.indexPrefix}] `;
let prevTimestamp = startTime;
let lastState: State | undefined;
try {
const finalState = await stateActionMachine<State>(
initialState,
(state) => next(state),
(state, res) => {
lastState = state;
executionLog.push({
type: 'response',
res,
@ -169,6 +181,7 @@ export async function migrationStateActionMachine({
};
}
} else if (finalState.controlState === 'FATAL') {
await cleanup(client, executionLog, finalState);
dumpExecutionLog(logger, logMessagePrefix, executionLog);
return Promise.reject(
new Error(
@ -180,6 +193,7 @@ export async function migrationStateActionMachine({
throw new Error('Invalid terminating control state');
}
} catch (e) {
await cleanup(client, executionLog, lastState);
if (e instanceof EsErrors.ResponseError) {
logger.error(
logMessagePrefix + `[${e.body?.error?.type}]: ${e.body?.error?.reason ?? e.message}`
@ -202,9 +216,13 @@ export async function migrationStateActionMachine({
);
}
throw new Error(
const newError = new Error(
`Unable to complete saved object migrations for the [${initialState.indexPrefix}] index. ${e}`
);
// restore error stack to point to a source of the problem.
newError.stack = `[${e.stack}]`;
throw newError;
}
}
}

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const cleanupMock = jest.fn();
jest.doMock('./migrations_state_machine_cleanup', () => ({
cleanup: cleanupMock,
}));

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { ElasticsearchClient } from '../../elasticsearch';
import * as Actions from './actions';
import type { State } from './types';
import type { ExecutionLog } from './migrations_state_action_machine';
export async function cleanup(
client: ElasticsearchClient,
executionLog: ExecutionLog,
state?: State
) {
if (!state) return;
if ('sourceIndexPitId' in state) {
try {
await Actions.closePit(client, state.sourceIndexPitId)();
} catch (e) {
executionLog.push({
type: 'cleanup',
state,
message: e.message,
});
}
}
}

View file

@ -17,7 +17,10 @@ import type {
LegacyReindexState,
LegacyReindexWaitForTaskState,
LegacyDeleteState,
ReindexSourceToTempState,
ReindexSourceToTempOpenPit,
ReindexSourceToTempRead,
ReindexSourceToTempClosePit,
ReindexSourceToTempIndex,
UpdateTargetMappingsState,
UpdateTargetMappingsWaitForTaskState,
OutdatedDocumentsSearch,
@ -25,7 +28,6 @@ import type {
MarkVersionIndexReady,
BaseState,
CreateReindexTempState,
ReindexSourceToTempWaitForTaskState,
MarkVersionIndexReadyConflict,
CreateNewTargetState,
CloneTempToSource,
@ -299,14 +301,12 @@ describe('migrations v2 model', () => {
settings: {},
},
});
const newState = model(initState, res) as FatalState;
const newState = model(initState, res) as WaitForYellowSourceState;
expect(newState.controlState).toEqual('WAIT_FOR_YELLOW_SOURCE');
expect(newState).toMatchObject({
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: '.kibana_7.invalid.0_001',
});
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE');
expect(newState.sourceIndex.value).toBe('.kibana_7.invalid.0_001');
});
test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v2 migrations index (>= 7.11.0)', () => {
const res: ResponseType<'INIT'> = Either.right({
'.kibana_7.11.0_001': {
@ -330,15 +330,14 @@ describe('migrations v2 model', () => {
},
},
res
);
) as WaitForYellowSourceState;
expect(newState).toMatchObject({
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: '.kibana_7.11.0_001',
});
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE');
expect(newState.sourceIndex.value).toBe('.kibana_7.11.0_001');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => {
const res: ResponseType<'INIT'> = Either.right({
'.kibana_3': {
@ -349,12 +348,10 @@ describe('migrations v2 model', () => {
settings: {},
},
});
const newState = model(initState, res);
const newState = model(initState, res) as WaitForYellowSourceState;
expect(newState).toMatchObject({
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: '.kibana_3',
});
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE');
expect(newState.sourceIndex.value).toBe('.kibana_3');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
@ -420,12 +417,10 @@ describe('migrations v2 model', () => {
versionIndex: 'my-saved-objects_7.11.0_001',
},
res
);
) as WaitForYellowSourceState;
expect(newState).toMatchObject({
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: 'my-saved-objects_3',
});
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE');
expect(newState.sourceIndex.value).toBe('my-saved-objects_3');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
@ -449,12 +444,11 @@ describe('migrations v2 model', () => {
versionIndex: 'my-saved-objects_7.12.0_001',
},
res
);
) as WaitForYellowSourceState;
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE');
expect(newState.sourceIndex.value).toBe('my-saved-objects_7.11.0');
expect(newState).toMatchObject({
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: 'my-saved-objects_7.11.0',
});
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
@ -662,7 +656,7 @@ describe('migrations v2 model', () => {
const waitForYellowSourceState: WaitForYellowSourceState = {
...baseState,
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: '.kibana_3',
sourceIndex: Option.some('.kibana_3') as Option.Some<string>,
sourceIndexMappings: mappingsWithUnknownType,
};
@ -734,7 +728,7 @@ describe('migrations v2 model', () => {
});
});
describe('CREATE_REINDEX_TEMP', () => {
const createReindexTargetState: CreateReindexTempState = {
const state: CreateReindexTempState = {
...baseState,
controlState: 'CREATE_REINDEX_TEMP',
versionIndexReadyActions: Option.none,
@ -742,80 +736,134 @@ describe('migrations v2 model', () => {
targetIndex: '.kibana_7.11.0_001',
tempIndexMappings: { properties: {} },
};
it('CREATE_REINDEX_TEMP -> REINDEX_SOURCE_TO_TEMP if action succeeds', () => {
it('CREATE_REINDEX_TEMP -> REINDEX_SOURCE_TO_TEMP_OPEN_PIT if action succeeds', () => {
const res: ResponseType<'CREATE_REINDEX_TEMP'> = Either.right('create_index_succeeded');
const newState = model(createReindexTargetState, res);
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP');
const newState = model(state, res);
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_OPEN_PIT');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
});
describe('REINDEX_SOURCE_TO_TEMP', () => {
const reindexSourceToTargetState: ReindexSourceToTempState = {
describe('REINDEX_SOURCE_TO_TEMP_OPEN_PIT', () => {
const state: ReindexSourceToTempOpenPit = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP',
controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT',
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
targetIndex: '.kibana_7.11.0_001',
tempIndexMappings: { properties: {} },
};
test('REINDEX_SOURCE_TO_TEMP -> REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP'> = Either.right({
taskId: 'reindex-task-id',
it('REINDEX_SOURCE_TO_TEMP_OPEN_PIT -> REINDEX_SOURCE_TO_TEMP_READ if action succeeds', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_OPEN_PIT'> = Either.right({
pitId: 'pit_id',
});
const newState = model(reindexSourceToTargetState, res);
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
const newState = model(state, res) as ReindexSourceToTempRead;
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ');
expect(newState.sourceIndexPitId).toBe('pit_id');
expect(newState.lastHitSortValue).toBe(undefined);
});
});
describe('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', () => {
const state: ReindexSourceToTempWaitForTaskState = {
describe('REINDEX_SOURCE_TO_TEMP_READ', () => {
const state: ReindexSourceToTempRead = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK',
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
sourceIndexPitId: 'pit_id',
targetIndex: '.kibana_7.11.0_001',
reindexSourceToTargetTaskId: 'reindex-task-id',
tempIndexMappings: { properties: {} },
lastHitSortValue: undefined,
};
test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is right', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.right(
'reindex_succeeded'
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_INDEX if the index has outdated documents to reindex', () => {
const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }];
const lastHitSortValue = [123456];
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
outdatedDocuments,
lastHitSortValue,
});
const newState = model(state, res) as ReindexSourceToTempIndex;
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_INDEX');
expect(newState.outdatedDocuments).toBe(outdatedDocuments);
expect(newState.lastHitSortValue).toBe(lastHitSortValue);
});
it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({
outdatedDocuments: [],
lastHitSortValue: undefined,
});
const newState = model(state, res) as ReindexSourceToTempClosePit;
expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT');
expect(newState.sourceIndexPitId).toBe('pit_id');
});
});
describe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', () => {
const state: ReindexSourceToTempClosePit = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT',
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
sourceIndexPitId: 'pit_id',
targetIndex: '.kibana_7.11.0_001',
tempIndexMappings: { properties: {} },
};
it('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT -> SET_TEMP_WRITE_BLOCK if action succeeded', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'> = Either.right({});
const newState = model(state, res) as ReindexSourceToTempIndex;
expect(newState.controlState).toBe('SET_TEMP_WRITE_BLOCK');
expect(newState.sourceIndex).toEqual(state.sourceIndex);
});
});
describe('REINDEX_SOURCE_TO_TEMP_INDEX', () => {
const state: ReindexSourceToTempIndex = {
...baseState,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX',
outdatedDocuments: [],
versionIndexReadyActions: Option.none,
sourceIndex: Option.some('.kibana') as Option.Some<string>,
sourceIndexPitId: 'pit_id',
targetIndex: '.kibana_7.11.0_001',
lastHitSortValue: undefined,
};
it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.right(
'bulk_index_succeeded'
);
const newState = model(state, res);
expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK');
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is left target_index_had_write_block', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({
it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left target_index_had_write_block', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({
type: 'target_index_had_write_block',
});
const newState = model(state, res);
expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK');
const newState = model(state, res) as ReindexSourceToTempRead;
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is left index_not_found_exception', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({
it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left index_not_found_exception for temp index', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({
type: 'index_not_found_exception',
index: '.kibana_7.11.0_reindex_temp',
index: state.tempIndex,
});
const newState = model(state, res);
expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK');
const newState = model(state, res) as ReindexSourceToTempRead;
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK when response is left wait_for_task_completion_timeout', () => {
const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({
message: '[timeout_exception] Timeout waiting for ...',
type: 'wait_for_task_completion_timeout',
});
const newState = model(state, res);
expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK');
expect(newState.retryCount).toEqual(1);
expect(newState.retryDelay).toEqual(2000);
});
});
describe('SET_TEMP_WRITE_BLOCK', () => {
const state: SetTempWriteBlock = {
...baseState,

View file

@ -227,7 +227,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
return {
...stateP,
controlState: 'WAIT_FOR_YELLOW_SOURCE',
sourceIndex: source,
sourceIndex: Option.some(source) as Option.Some<string>,
sourceIndexMappings: indices[source].mappings,
};
} else if (indices[stateP.legacyIndex] != null) {
@ -303,7 +303,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
}
} else if (stateP.controlState === 'LEGACY_SET_WRITE_BLOCK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
// If the write block is sucessfully in place
// If the write block is successfully in place
if (Either.isRight(res)) {
return { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' };
} else if (Either.isLeft(res)) {
@ -431,14 +431,14 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
return {
...stateP,
controlState: 'SET_SOURCE_WRITE_BLOCK',
sourceIndex: Option.some(source) as Option.Some<string>,
sourceIndex: source,
targetIndex: target,
targetIndexMappings: disableUnknownTypeMappingFields(
stateP.targetIndexMappings,
stateP.sourceIndexMappings
),
versionIndexReadyActions: Option.some<AliasAction[]>([
{ remove: { index: source, alias: stateP.currentAlias, must_exist: true } },
{ remove: { index: source.value, alias: stateP.currentAlias, must_exist: true } },
{ add: { index: target, alias: stateP.currentAlias } },
{ add: { index: target, alias: stateP.versionAlias } },
{ remove_index: { index: stateP.tempIndex } },
@ -466,32 +466,61 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else if (stateP.controlState === 'CREATE_REINDEX_TEMP') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP' };
return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT' };
} else {
// If the createIndex action receives an 'resource_already_exists_exception'
// it will wait until the index status turns green so we don't have any
// left responses to handle here.
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP') {
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK',
reindexSourceToTargetTaskId: res.right.taskId,
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
sourceIndexPitId: res.right.pitId,
lastHitSortValue: undefined,
};
} else {
// Since this is a background task, the request should always succeed,
// errors only show up in the returned task.
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK') {
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_READ') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
if (res.right.outdatedDocuments.length > 0) {
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX',
outdatedDocuments: res.right.outdatedDocuments,
lastHitSortValue: res.right.lastHitSortValue,
};
}
return {
...stateP,
controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT',
};
} else {
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
const { sourceIndexPitId, ...state } = stateP;
return {
...state,
controlState: 'SET_TEMP_WRITE_BLOCK',
sourceIndex: stateP.sourceIndex as Option.Some<string>,
};
} else {
throwBadResponse(stateP, res);
}
} else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
if (Either.isRight(res)) {
return {
...stateP,
controlState: 'SET_TEMP_WRITE_BLOCK',
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
};
} else {
const left = res.left;
@ -510,28 +539,11 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
// we know another instance already completed these.
return {
...stateP,
controlState: 'SET_TEMP_WRITE_BLOCK',
controlState: 'REINDEX_SOURCE_TO_TEMP_READ',
};
} else if (isLeftTypeof(left, 'wait_for_task_completion_timeout')) {
// After waiting for the specificed timeout, the task has not yet
// completed. Retry this step to see if the task has completed after an
// exponential delay. We will basically keep polling forever until the
// Elasticeasrch task succeeds or fails.
return delayRetryState(stateP, left.message, Number.MAX_SAFE_INTEGER);
} else if (
isLeftTypeof(left, 'index_not_found_exception') ||
isLeftTypeof(left, 'incompatible_mapping_exception')
) {
// Don't handle the following errors as the migration algorithm should
// never cause them to occur:
// - incompatible_mapping_exception the temp index has `dynamic: false`
// mappings
// - index_not_found_exception for the source index, we will never
// delete the source index
throwBadResponse(stateP, left as never);
} else {
throwBadResponse(stateP, left);
}
// should never happen
throwBadResponse(stateP, res as never);
}
} else if (stateP.controlState === 'SET_TEMP_WRITE_BLOCK') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -609,7 +621,7 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
controlState: 'OUTDATED_DOCUMENTS_SEARCH',
};
} else {
throwBadResponse(stateP, res);
throwBadResponse(stateP, res as never);
}
} else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') {
const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
@ -647,10 +659,10 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
} else {
const left = res.left;
if (isLeftTypeof(left, 'wait_for_task_completion_timeout')) {
// After waiting for the specificed timeout, the task has not yet
// After waiting for the specified timeout, the task has not yet
// completed. Retry this step to see if the task has completed after an
// exponential delay. We will basically keep polling forever until the
// Elasticeasrch task succeeds or fails.
// Elasticsearch task succeeds or fails.
return delayRetryState(stateP, res.left.message, Number.MAX_SAFE_INTEGER);
} else {
throwBadResponse(stateP, left);

View file

@ -6,13 +6,13 @@
* Side Public License, v 1.
*/
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { UnwrapPromise } from '@kbn/utility-types';
import { pipe } from 'fp-ts/lib/pipeable';
import type { UnwrapPromise } from '@kbn/utility-types';
import type {
AllActionStates,
ReindexSourceToTempState,
ReindexSourceToTempOpenPit,
ReindexSourceToTempRead,
ReindexSourceToTempClosePit,
ReindexSourceToTempIndex,
MarkVersionIndexReady,
InitState,
LegacyCreateReindexTargetState,
@ -27,18 +27,16 @@ import type {
UpdateTargetMappingsState,
UpdateTargetMappingsWaitForTaskState,
CreateReindexTempState,
ReindexSourceToTempWaitForTaskState,
MarkVersionIndexReadyConflict,
CreateNewTargetState,
CloneTempToSource,
SetTempWriteBlock,
WaitForYellowSourceState,
TransformRawDocs,
} from './types';
import * as Actions from './actions';
import { ElasticsearchClient } from '../../elasticsearch';
import { SavedObjectsRawDoc } from '..';
export type TransformRawDocs = (rawDocs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>;
type ActionMap = ReturnType<typeof nextActionMap>;
/**
@ -56,26 +54,43 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
INIT: (state: InitState) =>
Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]),
WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) =>
Actions.waitForIndexStatusYellow(client, state.sourceIndex),
Actions.waitForIndexStatusYellow(client, state.sourceIndex.value),
SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) =>
Actions.setWriteBlock(client, state.sourceIndex.value),
CREATE_NEW_TARGET: (state: CreateNewTargetState) =>
Actions.createIndex(client, state.targetIndex, state.targetIndexMappings),
CREATE_REINDEX_TEMP: (state: CreateReindexTempState) =>
Actions.createIndex(client, state.tempIndex, state.tempIndexMappings),
REINDEX_SOURCE_TO_TEMP: (state: ReindexSourceToTempState) =>
Actions.reindex(
REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) =>
Actions.openPit(client, state.sourceIndex.value),
REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) =>
Actions.readWithPit(
client,
state.sourceIndex.value,
state.sourceIndexPitId,
state.unusedTypesQuery,
state.batchSize,
state.lastHitSortValue
),
REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: (state: ReindexSourceToTempClosePit) =>
Actions.closePit(client, state.sourceIndexPitId),
REINDEX_SOURCE_TO_TEMP_INDEX: (state: ReindexSourceToTempIndex) =>
Actions.transformDocs(
client,
transformRawDocs,
state.outdatedDocuments,
state.tempIndex,
Option.none,
false,
state.unusedTypesQuery
/**
* Since we don't run a search against the target index, we disable "refresh" to speed up
* the migration process.
* Although any further step must run "refresh" for the target index
* before we reach out to the OUTDATED_DOCUMENTS_SEARCH step.
* Right now, we rely on UPDATE_TARGET_MAPPINGS + UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK
* to perform refresh.
*/
false
),
SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) =>
Actions.setWriteBlock(client, state.tempIndex),
REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK: (state: ReindexSourceToTempWaitForTaskState) =>
Actions.waitForReindexTask(client, state.reindexSourceToTargetTaskId, '60s'),
CLONE_TEMP_TO_TARGET: (state: CloneTempToSource) =>
Actions.cloneIndex(client, state.tempIndex, state.targetIndex),
UPDATE_TARGET_MAPPINGS: (state: UpdateTargetMappingsState) =>
@ -89,16 +104,20 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
outdatedDocumentsQuery: state.outdatedDocumentsQuery,
}),
OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) =>
pipe(
TaskEither.tryCatch(
() => transformRawDocs(state.outdatedDocuments),
(e) => {
throw e;
}
),
TaskEither.chain((docs) =>
Actions.bulkOverwriteTransformedDocuments(client, state.targetIndex, docs)
)
// Wait for a refresh to happen before returning. This ensures that when
// this Kibana instance searches for outdated documents, it won't find
// documents that were already transformed by itself or another Kibana
// instance. However, this causes each OUTDATED_DOCUMENTS_SEARCH ->
// OUTDATED_DOCUMENTS_TRANSFORM cycle to take 1s so when batches are
// small performance will become a lot worse.
// The alternative is to use a search_after with either a tie_breaker
// field or using a Point In Time as a cursor to go through all documents.
Actions.transformDocs(
client,
transformRawDocs,
state.outdatedDocuments,
state.targetIndex,
'wait_for'
),
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases(client, state.versionIndexReadyActions.value),

View file

@ -132,7 +132,7 @@ export type FatalState = BaseState & {
export interface WaitForYellowSourceState extends BaseState {
/** Wait for the source index to be yellow before requesting it. */
readonly controlState: 'WAIT_FOR_YELLOW_SOURCE';
readonly sourceIndex: string;
readonly sourceIndex: Option.Some<string>;
readonly sourceIndexMappings: IndexMapping;
}
@ -158,21 +158,29 @@ export type CreateReindexTempState = PostInitState & {
readonly sourceIndex: Option.Some<string>;
};
export type ReindexSourceToTempState = PostInitState & {
/** Reindex documents from the source index into the target index */
readonly controlState: 'REINDEX_SOURCE_TO_TEMP';
export interface ReindexSourceToTempOpenPit extends PostInitState {
/** Open PIT to the source index */
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT';
readonly sourceIndex: Option.Some<string>;
};
}
export type ReindexSourceToTempWaitForTaskState = PostInitState & {
/**
* Wait until reindexing documents from the source index into the target
* index has completed
*/
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK';
readonly sourceIndex: Option.Some<string>;
readonly reindexSourceToTargetTaskId: string;
};
export interface ReindexSourceToTempRead extends PostInitState {
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_READ';
readonly sourceIndexPitId: string;
readonly lastHitSortValue: number[] | undefined;
}
export interface ReindexSourceToTempClosePit extends PostInitState {
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT';
readonly sourceIndexPitId: string;
}
export interface ReindexSourceToTempIndex extends PostInitState {
readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX';
readonly outdatedDocuments: SavedObjectsRawDoc[];
readonly sourceIndexPitId: string;
readonly lastHitSortValue: number[] | undefined;
}
export type SetTempWriteBlock = PostInitState & {
/**
@ -302,8 +310,10 @@ export type State =
| SetSourceWriteBlockState
| CreateNewTargetState
| CreateReindexTempState
| ReindexSourceToTempState
| ReindexSourceToTempWaitForTaskState
| ReindexSourceToTempOpenPit
| ReindexSourceToTempRead
| ReindexSourceToTempClosePit
| ReindexSourceToTempIndex
| SetTempWriteBlock
| CloneTempToSource
| UpdateTargetMappingsState
@ -324,3 +334,5 @@ export type AllControlStates = State['controlState'];
* 'FATAL' and 'DONE').
*/
export type AllActionStates = Exclude<AllControlStates, 'FATAL' | 'DONE'>;
export type TransformRawDocs = (rawDocs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>;

View file

@ -1917,10 +1917,7 @@ export class SavedObjectsRepository {
...(preference ? { preference } : {}),
};
const {
body,
statusCode,
} = await this.client.openPointInTime<SavedObjectsOpenPointInTimeResponse>(
const { body, statusCode } = await this.client.openPointInTime(
// @ts-expect-error @elastic/elasticsearch OpenPointInTimeRequest.index expected to accept string[]
esOptions,
{

View file

@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
import { Client } from 'elasticsearch';
import type { KibanaClient } from '@elastic/elasticsearch/api/kibana';
import { ToolingLog, REPO_ROOT } from '@kbn/dev-utils';
import {
// @ts-expect-error https://github.com/elastic/kibana/issues/95679
@ -140,7 +140,7 @@ export interface TestElasticsearchServer {
start: (esArgs: string[], esEnvVars: Record<string, string>) => Promise<void>;
stop: () => Promise<void>;
cleanup: () => Promise<void>;
getClient: () => Client;
getClient: () => KibanaClient;
getCallCluster: () => LegacyAPICaller;
getUrl: () => string;
}

View file

@ -35,7 +35,10 @@ export default function ({ getService, getPageObjects }) {
describe('context link in discover', () => {
before(async () => {
await PageObjects.timePicker.setDefaultAbsoluteRangeViaUiSettings();
await kibanaServer.uiSettings.update({ 'doc_table:legacy': true });
await kibanaServer.uiSettings.update({
'doc_table:legacy': true,
defaultIndex: 'logstash-*',
});
await PageObjects.common.navigateToApp('discover');
for (const columnName of TEST_COLUMN_NAMES) {

View file

@ -207,7 +207,7 @@
"fields": "[{\"name\":\"_id\",\"type\":\"string\",\"esTypes\":[\"_id\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"_index\",\"type\":\"string\",\"esTypes\":[\"_index\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"_score\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"_source\",\"type\":\"_source\",\"esTypes\":[\"_source\"],\"count\":0,\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"_type\",\"type\":\"string\",\"esTypes\":[\"_type\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"message\",\"type\":\"string\",\"esTypes\":[\"text\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"message.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"message\"}}},{\"name\":\"user\",\"type\":\"string\",\"esTypes\":[\"text\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"user.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"user\"}}}]",
"title": "test_index*"
},
"type": "test_index*"
"type": "index-pattern"
}
}
}